본문 바로가기
Android/RxJava

RxJava (3) - Disposable 다뤄보기

by 태크민 2023. 6. 18.

subscribe()의 비밀

내용을 상기시켜보다. 우리는 subscribe()를 통해 Observable 데이터 스트림을 구독해볼 수 있었다. 그런데 이를 실험해본 사람이 있는지 모르겠는데, subscribe()는 사실 Disposable이라는 객체를 반환하기도 한다.

 

아래와 같이, Observable을 subscribe()로 구독하면, Disposasble 객체를 리턴받아 할당할 수 있다.

fun main() {
    val stream: Observable<String> = Observable.just("H43RO", "Velog", "Rx")
    val disposable: Disposable = stream.subscribe(System.out::println)
}

그럼 이녀석은 어디다 사용하는 것일까?

 

 

Disposable 개념

Disposable의 원래 뜻은 '처분 가능한' 정도로 해석할 수 있다. 어느 정도 일맥상통한다. Disposable 객체의 사용 목적을 이해하기 위해 한 번 이런 경우를 떠올려보자.

 

우리가 지금까지 들어왔던 예시에서는 Observable이 발행하는 데이터가 한정적이었기 때문에, onComplete()가 호출되기까지 별로 오래 걸리지 않아 데이터 발행 종료가 일어난다. 하지만 만약 무한정 데이터를 발행하거나 오랫동안 데이터를 발행하는 Observable의 경우에는 onComplete()가 호출되지 않거나 호출되기까지 오래 걸린다.

 

따라서 이들은 제대로 종료하지 않으면, 메모리 릭이 발생하고 만다. 아무도 데이터 스트림을 구독 안 하는데 메모리 어디선가 계속하여 데이터를 발행하고 있을 수 있기 때문이다.

더 이상 Observable이 발행하는 데이터의 구독이 필요하지 않을 땐, 이를 Dispose해줌으로써 메모리 릭을 방지해줄 수 있다. 이것을 하기 위해 바로 Disposable 객체를 사용하는 것이다.

 

Disposable.dispose() 메소드를 사용하면, 언제든지 아이템 발행을 중단시킬 수 있다. 1초에 한 번씩 무한정 데이터를 발행하는 Observable을 구독하고, Disposable을 통해 dispose()를 호출해보자.

fun main() {
    val stream: Observable<Long> = Observable.interval(1, TimeUnit.SECONDS)
    val disposable: Disposable = stream.subscribe(System.out::println)

    Thread.sleep(4000)
    disposable.dispose()
}
[실행결과]
0
1
2
3

이렇게 되면, 4초 후에 Observable 데이터 발행이 중단되고 모든 리소스가 폐기된다. Disposable.isDisposed()를 통해 리소스 폐기 여부를 체크할 수 있다.

근데 뭐 onComplete()가 호출된 이후라면 dispose()를 호출할 필요는 없다.

 

 

CompositeDisposable

그런데 만약 이러한 데이터 스트림을 여러 녀석들이 구독하고 있다면, 일일이 dispose()해주는 것은 여간 귀찮은 일이 아닐 것이다. 따라서 Disposable객체들은 한 번에 관리할 수 있는 CompositeDisposable 이라는 것을 제공해준다.

 

작동 원리는, CompositeDisposable에 Disposable 객체들을 추가해두고 필요에 따라 한번에 처리하는 방식이다.

fun main() {
    val stream: Observable<Long> = Observable.interval(1, TimeUnit.SECONDS)
    val disposableA: Disposable = stream.subscribe(System.out::println)
    val disposableB: Disposable = stream.subscribe(System.out::println)
    val disposableC: Disposable = stream.subscribe(System.out::println)
    val disposableD: Disposable = stream.subscribe(System.out::println)

    val compositeDisposable = CompositeDisposable()

    compositeDisposable.add(disposableA)
    compositeDisposable.add(disposableB)
    compositeDisposable.add(disposableC)
    compositeDisposable.add(disposableD)

    Thread.sleep(3000)

    compositeDisposable.dispose()
}
[실행결과]
0
0
0
0
1
1
1
1
2
2
2
2

실제로, 안드로이드 프로젝트를 진행할 때 이러한 Observable 데이터 스트림을 활용하는 상황이 있다면 무조건 CompositedDisposable을 통해 관리를 하곤 한다. 액티비티 라이플 사이클 콜백 메소드인 onDestroy()등에서 CompositeDisposable.dispose()를 해주게 되면, 메모리 릭을 방지할 수 있다. (또는 방지해야 한다.)