[Reactive Programming] Sinks
스프링으로 시작하는 리액티브 프로그래밍(황정식 저 , 비제이퍼블릭)을 읽고 정리합니다.
📖 Part 02 . Project Reactor
🔸9. Sinks
🔹9.1 Sinks 란
앞서 Processor에 대해서 알아봤었다.
[Reactive Programming] 리액티브 시스템과 프로그래밍, 리액티브 스트림즈
스프링으로 시작하는 리액티브 프로그래밍(황정식 저 , 비제이퍼블릭)을 읽고 정리합니다. 📖Chapter 01 . 리액티브 프로그래밍(Reactive Programming)🔸1. 리액티브 시스템과 리액티브 프로그래밍🔹1.1
88dldl.tistory.com
Processor의 기능을 개선한 Sinks가 Reactor 3.4.0부터 지원되기 시작했고, Processor 관련 API는 3.5.0에 완전히 제거되었다.
Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an Sinks.EmitResult enum, allowing to atomically fail in case the attempted signal is inconsistent with the spec and/or the state of the sink.
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Sinks.html
Sinks 는 리액티브 스트림즈의 Signal 을 프로그래밍 방식으로 푸시할 수 있는 구조이며 Flux또는 Mono 의 의미 체계를 가진다.
지금까지는 Flux또는 Mono가 onNext와 같은 Signal을 내부적으로 전송해주는 방식이었는데, Sinks를 사용하면 프로그래밍 코드를 통해 명시적으로 Singal 을 전송할 수 있다.
Reactor에서 프로그래밍 방식으로 Signal을 전송하는 가장 일반적인 방법은 generate() Operator 나 create() Operator 등을 사용하는 방식인데, 이는 Reactor에서 Sinks를 지원하기 전부터 사용하던 방식이다.
일반적으로 generate() Operator 나 create() Operator 는 싱글스레드 기반에서 Signal을 전송하는데 사용하는 반면,
Sinks는 멀티스레드 방식으로 Signal을 전송해서 스레드 안전성을 보장하기 때문에 예기치 않은 동작으로 이어지는 것을 보장한다.
예제 코드를 살펴보자
@Slf4j
public class Example9_1 {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Flux
.create((FluxSink<String> sink) -> {
IntStream
.range(1, tasks)
.forEach(n -> sink.next(doTask(n)));
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(n -> log.info("# create(): {}", n))
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
private static String doTask(int taskNumber) {
// now tasking.
// complete to task.
return "task " + taskNumber + " result";
}
}
위 코드는 create() Operator 를 사용해서 프로그래밍 방식으로 Signal을 전송하는 방법이다.
create() Operator 가 처리해야할 작업의 개수만큼 doTask () 메서드를 호출해서 작업을 처리한 후, 결과를 리턴받는다.
⇒ 이 결과를 map() Operator를 사용해 추가 가공 처리 한 후 최종적으로 Subscriber에게 전달한다.
- 작업을 처리하는 스레드는 subscribeOn() Operator에서 지정하고
- 처리 결과를 가공하는 스레드는 publishOn() Operator에서 (13번 라인)
- 가공된 결과를 Subscriber에게 전달하는 스레드는 publishOn() Operator에서 (16번 라인) 지정한다.
결과적으로 총 3개의 스레드가 동시에 실행된다.
이처럼 create() Operator를 사용해서 프로그래밍 방식으로 Signal을 전송할 수 있으며 Reactor Sequence를 단계적으로 나누어서 여러개의 스레드로 처리할 수 있다.
그런데 위 코드에서 작업을 처리한 후, 그 결과값을 반환하는 doTask() 메서드가 싱글스레드가 아닌 여러개의 스레드에서 각각의 전혀 다른 작업들을 처리한 후 처리 결과를 반환하는 상황이 발생할 수도 있다.
⇒ 이런 상황에서 적절하게 사용할 수 있는 방식이 Sinks이다.
@Slf4j
public class Example9_2 {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxView = unicastSink.asFlux();
IntStream
.range(1, tasks)
.forEach(n -> {
try {
new Thread(() -> {
unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
log.info("# emitted: {}", n);
}).start();
Thread.sleep(100L);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
});
fluxView
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
}
private static String doTask(int taskNumber) {
return "task " + taskNumber + " result";
}
}
위 코드는 doTask() 메서드가 루프를 돌 때마다 새로운 스레드에서 실행된다.
그리고 doTask() 메서드의 작업 처리 결과를 Sinks를 통해서 Downstream 에 emit한다.
이처럼 Sinks는 프로그래밍 방식으로 Signal을 전송할 수 있고, 멀티스레드 환경에서 스레드 안전성을 보장받을 수 있는 장점이 있다.
+) 스레드 안전성(Thread Safety) :
- 함수나 변수 같은 공유 자원에 동시 접근할 경우에도 프로그램의 실행에 문제가 없음을 의미한다.
- Processor에서는 onNext, onError 메서드를 직접적으로 호출하면서 스레드 안전성이 보장되지 않을 수 있는데, Sinks의 경우 동시접근을 감지하고, 동시 접근하는 스레드 중 하나가 빠르게 실패함으로써 스레드 안전성을 보장한다.
🔹9.2 Sinks 종류 및 특징
🔺9.2.1 Sinks.One
Sinks.One() 메서드를 사용해서 한 건의 데이터를 전송하는 방법을 정의해둔 기능 명세라고 할 수있다.
public final class Sinks{
...
public static <T> Sinks.One<T> one() {
return SinksSpecs.DEFAULT_SINKS.one();
}
}
위 코드는 메서드 내부 코드의 구성 모습이다.
Sinks.One은 한 건의 데이터를 프로그래밍 방식으로 emit하는 역할을 하기도 하고, Mono 방식으로 Subscriber가 데이터를 소비할 수 있도록 해주는 Sinks 클래스 내부에 인터페이스로 정의된 Sinks의 스펙 또는 사양으로 볼 수 있다.
⇒ 즉, 해당 메서드를 호출하는 것은 한 건의 데이터를 프로그래밍 방식으로 emit하는 기능을 사용하고 싶으니 거기에 맞는 적당한 기능 명세를 달라고 요청하는 것과 같다.
@Slf4j
public class Example9_4 {
public static void main(String[] args) throws InterruptedException {
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello Reactor", FAIL_FAST);
//sinkOne.emitValue("Hi Reactor", FAIL_FAST);
mono.subscribe(data -> log.info("# Subscriber1 {}", data));
mono.subscribe(data -> log.info("# Subscriber2 {}", data));
}
}
- Sinks.One<String> sinkOne : Sinks.one() 메서드를 호출하면 Sinks.one이라는 기능 명세를 리턴한다.
- asMono() : emit한 데이터를 구독하여 전달받기 위해서 asMono()라는 메서드를 사용해서 Mono객체로 반환한다. 이렇게 변환된 Mono객체를 통해 emit된 데이터를 전달받을 수 있다.
- sinkOne.emitValue("Hello Reactor", FAIL_FAST); : Sinks.one 객체로 데이터를 emit할 수 있다.
- emitValue() 메서드의 두번째 파라미터는 emit도중에 에러가 발생했을때 어떻게 처리할 것인지에 대한 핸들러를 나타낸다.
public final class Sinks{
public interface EmitFailureHandler {
EmitFailureHandler FAIL_FAST = (signalType, emission) -> false;
...
}
...
}
내부적으로 살펴보면 FAIL_FAST 는 EmitFailureHandler 의 구현객체이다. 이 객체를 통해서 emit 중간에 발생한 에러에 대해 빠르게 실패처리 한다. (에러가 발생했을때 재시도를 하지 않고 즉시 실패 처리 한다는 의미이다)
위 코드를 실행해보면 로그는 다음과 같이 출력된다.
17:47:27.595 [main] INFO sample.spring.service.Example9_4 -- # Subscriber1 Hello Reactor
17:47:27.603 [main] INFO sample.spring.service.Example9_4 -- # Subscriber2 Hello Reactor
emit한 데이터가 두 구독자에게 모두 전달되었다.
주석을 해제하고 다시 실행해보면 다음과 같다.
17:49:30.232 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: Hi Reactor
17:49:30.232 [main] INFO sample.spring.service.Example9_4 -- # Subscriber1 Hello Reactor
17:49:30.232 [main] INFO sample.spring.service.Example9_4 -- # Subscriber2 Hello Reactor
위 결과를 통해 처음 emit한 데이터는 정상적으로 emit되지만 나머지 데이터들은 Drop된다는 것을 확인할 수 있다.
🔺9.2.2 Sinks.Many
public final class Sinks{
public static ManySpec many() {
return SinksSpecs.DEFAULT_SINKS.many();
}
}
Sinks.Many의 경우 ManySpec을 리턴한다.
public final class Sinks{
...
public interface ManySpec {
UnicastSpec unicast();
MulticastSpec multicast();
MulticastReplaySpec replay();
}
}
ManySpec 의 경우 총 3가지의 기능을 정의하는데, 이 세가지의 기능은 각각의 기능을 또다시 별도의 Spec으로 정의해두고 있다.
⏺️ unicast
@Slf4j
public class Example9_8 {
public static void main(String[] args) throws InterruptedException {
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
unicastSink.emitNext(3, FAIL_FAST);
//fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
}
}
해당 예제는 ManySpec의 구현 메서드인 unicast() 메서드를 호출했다.
unicast() 메서드를 호출하면 리턴값으로 UnicastSpec을 리턴하고 최종적으로 UnicastSpec에 정의된 기능을 사용한다.
UnicastSpec에 정의된 기능은 onBackpressureBuffer() 메서드를 호출함으로써 사용하게 된다.
+) unicast 는 One to One 방식이고 따라서 unicast의 의미는 UnicastSpec의 기능을 단 하나의 Subscriber에게만 데이터를 emit하는 것이다.
17:58:15.603 [main] INFO sample.spring.service.Example9_8 -- # Subscriber1: 1
17:58:15.608 [main] INFO sample.spring.service.Example9_8 -- # Subscriber1: 2
17:58:15.609 [main] INFO sample.spring.service.Example9_8 -- # Subscriber1: 3
실행결과는 다음과 같다.
주석을 제거하고 다시 실행해보면,
17:59:02.850 [main] INFO sample.spring.service.Example9_8 -- # Subscriber1: 1
17:59:02.858 [main] INFO sample.spring.service.Example9_8 -- # Subscriber1: 2
17:59:02.858 [main] INFO sample.spring.service.Example9_8 -- # Subscriber1: 3
17:59:02.882 [main] ERROR reactor.core.publisher.Operators -- Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
Caused by: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
at reactor.core.publisher.SinkManyUnicast.subscribe(SinkManyUnicast.java:426)
at reactor.core.publisher.Flux.subscribe(Flux.java:8848)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8969)
at reactor.core.publisher.Flux.subscribe(Flux.java:8813)
at reactor.core.publisher.Flux.subscribe(Flux.java:8737)
at reactor.core.publisher.Flux.subscribe(Flux.java:8680)
at sample.spring.service.Example9_8.main(Example9_8.java:25)
IllegalStateException가 발생했고, 단 하나의 Subscriber를 허용한다고 보여준다.( sinks only allow a single Subscriber )
⏺️ multicast
@Slf4j
public class Example9_9 {
public static void main(String[] args) {
Sinks.Many<Integer> multicastSink =
Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
multicastSink.emitNext(3, FAIL_FAST);
}
}
multicastSpec의 기능은 하나 이상의 Subscriber에게 데이터를 emit하는 것이다.
실행해보면 다음과 같다.
18:02:43.233 [main] INFO sample.spring.service.Example9_9 -- # Subscriber1: 1
18:02:43.243 [main] INFO sample.spring.service.Example9_9 -- # Subscriber1: 2
18:02:43.244 [main] INFO sample.spring.service.Example9_9 -- # Subscriber1: 3
18:02:43.244 [main] INFO sample.spring.service.Example9_9 -- # Subscriber2: 3
로그를 통해 알 수 있듯, Sinks는 기본적으로 Hot Publisher로 동작하며, 특히 onBackpressureBuffer() 메서드는 warm up 의 특징을 가지는 Hot Sequence로 동작하기 때문에 첫번째 구독이 발생한 시점에 Downstream 쪽으로 데이터가 전달되는 것이다.
+) warm up의 의미 : 최초 구독이 발생하기 전까지 데이터의 emit이 발생하지 않는 것
⏺️ replay
@Slf4j
public class Example9_10 {
public static void main(String[] args) {
Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
replaySink.emitNext(4, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
}
}
replay() 메서드를 호출하면 리턴값으로 multicastReplaySpec을 리턴하고, 이 multicastReplaySpec의 구현 메서드 중 하나인 limit() 메서드를 호출한다.
multicastReplaySpec에는 emit된 데이터를 다시 replay해서 구독 전에 이미 emit된 데이터라도 Subscriber가 전달받을 수 있게 하는 다양한 메서드들이 정의되어있다.
대표적으로 all()이라는 메서드가 있는데 처음 emit된 데이터부터 모든 데이터들이 Subscriber에게 전달된다.
limit() 메서드는 emit된 데이터 중에서 파라미터로 입력한 개수만큼 가장 나중에 emit된 데이터부터 Subscriber에게 전달하는 기능을 한다.
18:08:36.352 [main] INFO sample.spring.service.Example9_10 -- # Subscriber1: 2
18:08:36.358 [main] INFO sample.spring.service.Example9_10 -- # Subscriber1: 3
18:08:36.358 [main] INFO sample.spring.service.Example9_10 -- # Subscriber1: 4
18:08:36.359 [main] INFO sample.spring.service.Example9_10 -- # Subscriber2: 3
18:08:36.359 [main] INFO sample.spring.service.Example9_10 -- # Subscriber2: 4
실행 결과를 보면 첫번째 Subscriber의 입장에서는 구독시점에 이미 세개의 데이터가 emit되었기 때문에 마지막 2개를 뒤로 되돌린 숫자가 2므로 2부터 전달된다.
두번째 Subscriber 는 구독 전에 숫자 4의 데이터가 한번 더 emit되었기 때문에, 구독 시점에 마지막 2개를 뒤로 돌린 숫자가 3이므로 3부터 전달된다.