Backpressure와 Flowable
RxJava에는 Backpressure라는 개념과 이를 처리하는 Flowable class가 존재한다.
Backpressure가 무엇이고 Flowable은 어떻게 쓰는 것인지 알아보자.
배압(Backpressure)
배압이란 데이터 생산과 소비가 불균형적일 때 일어나는 현상이다. 만약 10,000개의 데이터를 0.1초마다 발행하고, 소비는 10초마다 한다면 소비와 관계없이 데이터는 스트림에 계속 쌓이게 된다. Observable이 데이터를 발행하는 속도를 Observer의 소비 속도가 따라가지 못하는 것이다. 이는 결국 메모리가 overflow되고 OutOfMemoryError로 이어져 앱이 터질 것이다. 이러한 현상을 배압(Backpressure)이라고 하며 RxJava에서는 배압 현상을 제어할 수 있는 방법을 제공한다.
Flowable
기존의 Observable이 배압 현상을 제어하지 못하는 반면, Flowable은 배압 현상을 스스로 제어할 수 있다. 다음의 두 코드를 살펴보자.
Observable
Observable.range(1, 10000)
.doOnNext(integer -> System.out.println("Emit Data : "+integer))
.observeOn(Schedulers.io())
.subscribe(integer -> {
System.out.println("Consume Data : "+integer);
Thread.sleep(100);
});
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 9998
Emit Data : 9999
Emit Data : 10000
Consume Data : 2
Consume Data : 3
Consume Data : 4
...
Flowable
Flowable.range(1, 10000)
.doOnNext(integer -> System.out.println("Emit Data : "+integer))
.observeOn(Schedulers.io())
.subscribe(integer -> {
System.out.println("Consume Data : "+integer);
Thread.sleep(100);
});
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 126
Emit Data : 127
Emit Data : 128
Consume Data : 2
Consume Data : 3
Consume Data : 4
...
두 예제 모두 10,000개의 데이터를 발행하면서, 소비는 100ms의 delay를 두었다.
Observable을 사용한 경우에는 데이터 발행과 소비가 균형적으로 일어나지 않으며 데이터는 소비와 상관없이 스트림에 계속 쌓이게 된다.
반면 Flowable을 사용한 경우에는 데이터가 일정량 누적되면 데이터를 더이상 발행하지 않는 것을 확인할 수 있다.
이와 같이, Flowable은 스트림에 끊임없이 쌓이는 데이터의 양을 제어할 수 있는 Observable의 또다른 형태이다.
When to use Observable? When to use Flowable?
그렇다면 언제 Observable을, 언제 Flowable을 사용해야할까? RxJava Wiki에 Observable과 Flowable을 선택하는 기준이 포스트되어있다.
Observable을 사용해야하는 경우
- 1,000개 미만의 데이터 흐름이 발생하는 경우
- 적은 데이터 소스만을 활용하여 OutOfMemoryException이 발생할 확률이 적은 경우
- 마우스 이벤트나 터치 이벤트와 같은 GUI 프로그래밍을 하는 경우 (초당 1,000회 이하의 이벤트는 Observable의 sample()이나 debounce()로 핸들링 가능)
- 동기적인 프로그래밍이 필요하지만 플랫폼에서 Java Streams을 지원하지 않는 경우
Flowable을 사용해야하는 경우
- 10,000개 이상의 데이터 흐름이 발생하는 경우
- 디스크에서 파일을 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
- JDBC에서 데이터베이스를 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
- 네트워크 IO 실행 시
- Blocking/Pull-based 방식을 사용하고 있는데 나중에 Non-Blocking 방식의 Reactive API/드라이버에서 데이터를 가져올 일이 있는 경우
배압 전략
Flowable에도 배압을 제어하지 못해 MissingBackpressureException이 발생할 수 있는 예외상황1이 존재한다. 따라서 Flowable에 배압 전략을 명시함으로써 배압을 제어할 수 있다. 5가지의 배압 전략이 존재하며 각각의 내용은 다음과 같다.
📌 1. Flowable과 interval()을 같이 사용하는 경우. interval 연산자는 스케줄러와 관계없이 시간에 의존해 데이터를 발행하므로 에러가 발생한다.
이름 | enum | 내용 |
MISSING | BackpressureStrategy.MISSING | 배압 전략을 구현하지 않음 |
ERROR | BackpressureStrategy.ERROR | 소비 속도가 발행 속도를 따라가지 못하는 경우 MissingBackpressureException 발생 |
BUFFER | BackpressureStrategy.BUFFER | 데이터를 소비할 때까지 데이터를 버퍼에 넣어둠. 무한한 크기의 큐이지만 OOME이 발생할 수 있음. |
DROP | BackpressureStrategy.DROP | 소비 속도가 발행 속도를 따라가지 못하는 경우 발행된 데이터를 모두 버림 |
LATEST | BackpressureStrategy.LATEST | 구독자가 데이터를 받을 준비가 될 때까지 최신 데이터만 유지하고 나머지는 버림 |
예제
create() 연산자를 통해 Flowable을 생성하는 경우 배압 전략을 명시해주어야 한다.
Flowable.create(emitter -> {
for (int i = 0; i < 10000; i++) emitter.onNext(i);
emitter.onComplete();
}, BackpressureStrategy.DROP)
.observeOn(Schedulers.io())
.subscribe();
배압 제어 연산자
RxJava의 연산자 중에는 생성된 Flowable에 배압 전략을 적용할 수 있는 3가지 연산자를 제공한다.
onBackPressureBuffer()
BackpressureStrategy.BUFFER 전략을 적용한다. 매개변수로 버퍼의 용량, 버퍼 overflow 발생 시의 동작 등을 함께 전달할 수 있다.
onBackPressureDrop()
BackpressureStrategy.DROP 전략을 적용한다. 매개변수로 데이터를 버릴 때의 동작을 정의할 수 있다.
onBackPressureLatest()
BackpressureStrategy.LATEST 전략을 적용한다.
예제
Flowable.range(1, 1000)
.onBackpressureLatest()
.doOnNext(integer -> System.out.println("Emit Data : "+integer))
.observeOn(Schedulers.io())
.subscribe(integer -> {
System.out.println("Consume Data : "+integer);
Thread.sleep(100);
});
Thread.sleep(100*1000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Consume Data : 1
Emit Data : 5
...
Emit Data : 128
Consume Data : 2
Consume Data : 3
...
Consume Data : 95
Consume Data : 96
Emit Data : 1000
Consume Data : 97
Consume Data : 98
...
Consume Data : 128
Consume Data : 1000
Flowable vs Observable
1. Reactive Streams 인터페이스 구현 유무
- Flowable은 Reactive Streams 인터페이스를 구현합니다.
Package를 보면 Reactive Streams인 것을 확인할 수 있습니다.
- Observable은 Reactive Streams 인터페이스를 구현하지 않습니다.
ObservableSource의 Package를 보면 RxJava 3.x에서 독자적으로 제공하고있는 인터페이스인 것을 확인할 수 있습니다.
2. 데이터 처리
- Flowable은 Subscriber라는 구독자로 데이터를 처리합니다.
- Observable은 Observer라는 구독자로 데이터를 처리합니다.
3. 구독 해지
- Flowable은 Subscription으로 구독을 해지합니다.
- Observable은 Disposable로 구독을 해지합니다.
Flowable.subscribe()에서 통지된 데이터가 3일 때 구독을 해지하는 과정
Observable.subscribe()에서 통지된 데이터가 3일 때 구독을 해지하는 과정
Flowable은 Subscription, Observable은 Disposable을 사용하여 구독을 해지하지만 결과는 같습니다.
4. 배압(Back Pressure) 기능
- Flowable은 배압 기능이 있기 때문에 Subscription으로 전달 받는 데이터의 개수를 제어할 수 있습니다.
- Observable은 배압 기능이 없기 때문에 데이터의 개수를 제어할 수 없습니다.
* 배압이라는 것은 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달받아 처리하는 속도보다 빠를 때 균형을 맞추기 위해 데이터의 통지량을 제어하는 기능을 의미합니다.
출처 : https://www.inflearn.com/course/자바-리액티브프로그래밍-1
- 소스 코드
생산자 측은 RxComputationThreadPool-2에서 데이터를 통지하는 것을 확인할 수 있고, 첫번째 데이터가 처리될 때 소비자 처리 대기 중 로그가 찍히고 Thread가 1초동안 멈추게 됩니다.
소비자 측은 RxComputationThreadPool-1에서 데이터를 처리하는 것을 확인할 수 있고, 생산자는 127까지 통지하고 더이상 통지하지 않습니다. 소비자 측에서는 생산자 측에서 통지한 데이터 0만 처리한 후 MissinBackpressureException이 발생한 것을 확인할 수 있습니다. 생산자 측에서 통지하는 속도가 소비자 측에서 처리하는 속도보다 월등히 빨라 Exception이 발생한 것을 의미합니다.
'Android > RxJava' 카테고리의 다른 글
[Android] RxJava 병렬 처리 (flatmap, merge, zip) (1) | 2023.09.26 |
---|---|
RxJava + Retrofit으로 Http 통신하기 (0) | 2023.06.20 |
RxJava (11) - 스케줄러 (0) | 2023.06.18 |
RxJava (10) - Observable 디버깅하기 (0) | 2023.06.18 |
RxJava (9) - Observable 오류 처리하기 (0) | 2023.06.18 |