리액티브 프로그래밍

[Reactive Programming] Backpressure란? Reactor에서의 Backpressure 방식

88dldl 2025. 5. 3. 16:42

스프링으로 시작하는 리액티브 프로그래밍(황정식  , 비제이퍼블릭)을  읽고 정리합니다.

 

 

📖 Part 02 . Project Reactor

🔸8. Backpressure

🔹8.1 Backpressure 란?

Backpressure 는 우리말로 배압 또는 역압이라고 불린다.

배압이라는 용어는 배관으로 유입되는 가스나 액체 등의 흐름을 제어하기 위해 역으로 가해지는 압력을 의미한다.

 

리액티브 프로그래밍에서의 Backpressure

publisher가 끊임없이 emit하는 무수한 데이터를 적절하게 제어하여 데이터 처리에 과부화가 걸리지 않도록 하는 것이다.

 

 

다음과 같은 상황을 생각해보자

 

emit된 데이터들은 Subscriber가 data1을 처리 완료하기 전까지 대기해야한다.

=> 이런 상황이되면 처리되지 않고 대기 중인 데이터가 지속적으로 쌓이게 되면 오버플로가 발생하거나 최악의 경우 시스템이 다운되는 문제가 생긴다.

 

이를 해결하기 위한 수단이 바로 Backpressure이다.

 

 

🔹8.2 Reactor 에서의 Backpressure 방식

🔺8.2.1 데이터 개수 제어

첫번째 방식은 Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청하는 것이다.

@Slf4j
public class Example8_1 {
    public static void main(String[] args) {
        Flux.range(1, 5)
            .doOnRequest(data -> log.info("# doOnRequest: {}", data))
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1);
                }

                @SneakyThrows
                @Override
                protected void hookOnNext(Integer value) {
                    Thread.sleep(2000L);
                    log.info("# hookOnNext: {}", value);
                    request(1);
                }
            });
    }
}

Reactor에서는 Subscriber가 데이터 요청 개수를 직접 제어하기 위해서 Subscriber 인터페이스의 구현 클래스인 BaseSubscriber를 사용할 수 있다.

  • hookOnSubscribe() 메서드는 Subscriber인터페이스에 정의된 onSubscribe() 메서드를 대신해 구독 시점에 request() 메서드를 호출해 최초 데이터 요청 개수를 제어한다.
  • hookOnNext () 메서드는 Subscriber 인터페이스에 정의된 onNext() 메서드를 대신해 Publisher 가 emit한 데이터를 전달받아 처리한 후에 Publisher 에게 또다시 데이터를 요청하는 역할을 한다.
    → 이때 역시 request 메서드를 호출해서 데이터 요청개수를 제어한다.

 

🔺8.2.2 Backpressure 전략 사용

두번쨰 방법은 Reactor에서 제공하는 Backpressure 전략을 사용하는 것이다.

전략 이름 동작 방식
IGNORE Backpressure 를 적용하지 않는다.
ERROR Downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, Exception을 발생시키는 전략
DROP Downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 Drop 시키는 전략
LATEST Downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 가장 최근에 emit된 데이터부터 버퍼에 채우는 전략
BUFFER Downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 데이터부터 Drop 시키는 전략

 

 

 

⏺️ IGNORE 전략

말 그대로 Backpressure 를 적용하지 않는 전략으로 IllegalStateException이 발생할 수도 있다.

 

 

 

⏺️ ERROR 전략

Downstream의 데이터 처리 속도가 느려서 Upstream 의 emit속도를 따라가지 못할 때, IllegalStateException을 발생시킨다.

 

이 경우에 Publisher는 Error signal 을 Subscriber에게 전송하고 삭제한 데이터는 폐기한다.

@Slf4j
public class Example8_2 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(1L))
            .onBackpressureError()
            .doOnNext(data -> log.info("# doOnNext: {}", data))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> {
                        try {
                            Thread.sleep(5L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));

        Thread.sleep(2000L);
    }
}

위 코드는 Subscriber가 전달받은 데이터를 처리하는데 0.005초의 딜레이를 넣어 emit하는 속도와 처리하는 속도의 차이를 적용해 테스트를 진행했다.

 

ERROR전략을 사용하기 위해 onBackpressureError() Operator를 사용했다.

 

다음 라인의 doOnNext() Operator는 Publisher가 emit한 데이터를 확인하거나 추가적인 동작을 정의하는 용도로 사용되는데 주로 디버깅 용도로 사용가능하다.

 

publishOn() Operator는 Reactor Sequence 중 일부를 별도의 스레드에서 실행할 수 있도록 해주는 Operator이다.

 

 

 

 

⏺️ DROP 전략

Downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 Drop 시키는 전략이다. Drop 된 데이터는 폐기된다.

 

그림을 보면 알 수 있듯 버퍼 밖에 있던 데이터 11,12,13 이 먼저 Drop 되고 있고, 버퍼가 빈 후에는 14부터 데이터가 입력되고 있다.

 

ERROR 전략에서는 onBackpressureError() Operator를 사용했었는데, Drop 전략에서는 onBackpressureDrop() Operator 를 사용한다.

 

onBackpressureError() 와 달리 onBackpressureDrop() Operator는 Drop된 데이터를 파라미터로 받을 수 있기 때문에 Drop된 데이터가 폐기되기 전에 추가 작업을 수행할 수 있다.

.onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))

 

 

 

 

⏺️ LATEST 전략

Downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 가장 최근에 emit된 데이터부터 버퍼에 채우는 전략이다.

 

Drop과 달리 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨두고 나머지 데이터를 폐기한다.

 

그림에서는 step4에서 가장 최근에 emit된 숫자 17 이외의 나머지 숫자들이 한번에 폐기되는 것처럼 표기했지만, 실제로는 데이터가 들어올 때마다 이전에 유지하고 있던 데이터가 폐기된다.

 

onBackpressureLatest() Operator를 사용한다.

 

 

 

 

 

⏺️  BUFFER 전략

컴퓨터 시스템에서 사용되는 버퍼의 일반적인 기능은 입출력을 수행하는 장치들간의 속도차이를 조절하기 위해 입출력 장치 중간에 위치해서 데이터를 어느정도 쌓아두었다가 전송하는 것이다.

 

BUFFER 전략도 비슷한 기능을 한다.

버퍼의 데이터를 폐기하지 않고 버퍼링하는 전략도 지원하고, 버퍼가 가득차면 버퍼 내의 데이터를 폐기하는 전략, 그리고 버퍼가 가득차면 에러를 발생시키는 전략도 지원한다.

 

그 중 폐기하는 전략을 살펴보자. 폐기하는 전략에는 DROP_LATEST DROP_OLDEST가 있다.

 

 

 

1) BUFFER DROP_LATEST

Downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 가장 최근에 버퍼 안에 채워진 데이터를 DROP하여 폐기한 후, 이렇게 확보된 공간에 emit된 데이터를 채우는 전략이다.

BUFFER DROP_LATEST 전략은 버퍼 오버플로가 발생했을 때, 버퍼 안에 가장 최근에 채워진 데이터가 Drop된다.

.onBackpressureBuffer(2,
      dropped -> log.info("** Overflow & Dropped: {} **", dropped),
      BufferOverflowStrategy.DROP_LATEST)

 

 

 

 

2) BUFFER DROP_OLDEST

Downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 가장 처음에 버퍼 안에 채워진 데이터를 DROP하여 폐기한 후, 이렇게 확보된 공간에 emit된 데이터를 채우는 전략이다.

Step3까지는 BUFFER DROP_LATEST과 동일하다. Step 4에서는 버퍼 제일 앞쪽에 있는 숫자 1 이 Drop된다.

.onBackpressureBuffer(2,
      dropped -> log.info(" Overflow & Dropped: {} ", dropped),
      BufferOverflowStrategy.DROP_OLDEST)