본문 바로가기
Android/RxJava

RxJava (8) - Observable 결합하기

by 태크민 2023. 6. 18.

Observable 결합하기

여러 개의 Observable 데이터 스트림을 하나의 Observable로 만들 수 있다. 여러 데이터를 이용하여 가공해서 사용하는 경우, 혹은 HTTP 통신의 응답들을 한 번에 묶어서 받고 싶은 경우 등에 사용하게 된다. 이번 포스팅에선 이러한 동작을 하는 Observable 결합 연산자들 중 몇 가지를 알아보고자 한다.

 

설명만 들어서는 헷갈릴 수 있는 파트다. 마블 다이어그램과 예제 소스코드를 함께 보며 천천히 이해해보자.

 

 

conbineLast() 메소드

해당 메소드는 두 Observable 중 하나에서 데이터를 발행하려고 할 때, 나머지 한 Observable이 가장 최근에 발행한 데이터를 가져와 지정해준 함수를 거쳐 새로운 하나의 데이터로 발행하게 한다.

fun main() {
    val intStream: Observable<Int> = Observable.create { 
        Thread {
            for (i in 1..5) {
                it.onNext(i)
                Thread.sleep(1000)
            }
        }.start()
    }

    val strStream: Observable<String> = Observable.create {
        Thread {
            Thread.sleep(500)
            it.onNext("A")
            Thread.sleep(700)
            it.onNext("B")
            Thread.sleep(100)
            it.onNext("C")
            Thread.sleep(700)
            it.onNext("D")
        }.start()
    }

    Observable.combineLatest(
	intStream, strStream
    ) { num, str -> num.toString() + str }
        .subscribe(System.out::println)
}
[실행결과]
1A
2A
2B
2C
3C
3D
4D
5D

결과를 보면, 두 쓰레드 내에서 데이터가 발행되고 있는데, strStream의 쓰레드가 시작되고 0.5초 후에 A를 발행했고, 그전에는 이미 inStream의 쓰레드가 1을 발행했다. 이 때 combineLatest()로 만들어진 Observable은 각각의 최신 발행 데이터인 1과 A를 합쳐 1A를 발행하는 것이다. (해당 동작은 num.toString() + str 에 정의되어 있다.)

 

Thread.sleep()을 통해 구현한 '데이터 발행 시점'을 하나씩 살펴보며 어떠한 방식으로 동작되는지 살펴보는 것을 권장한다. 항상 가장 최근에 발행된 데이터끼리 합쳐지는 것을 이해했다면 성공이다.

 

merge() 메소드

무작정 합치고 보는 녀석이다. 여러 Observable에서 발행되는 모든 데이터들을 한 스트림으로 합쳐 발행해주는 녀석이다. 말 그대로 merge 한다. 합쳐진 데이터 스트림의 순서는 각각이 발행된 순서이다.

fun main() {
    val streamA = Observable.create<Int> {
        Thread {
            Thread.sleep(100)
            it.onNext(1)
            Thread.sleep(100)
            it.onNext(2)
            Thread.sleep(100)
            it.onNext(3)
        }.start()
    }

    val streamB = Observable.create<Int> {
        Thread {
            Thread.sleep(250)
            it.onNext(100)
            Thread.sleep(250)
            it.onNext(200)
            Thread.sleep(250)
            it.onNext(300)
        }.start()
    }

    Observable.merge(streamA, streamB)
        .subscribe(System.out::println)
}
[실행결과]
1
2
100
3
200
300

결과를 보면 발행 시간 순으로 모든 데이터들이 한 스트림으로 합쳐져서 발행되는 것을 확인할 수 있다.

🤚🏻  그런데 merge()로 데이터를 하나씩 발행하다가 에러가 발생하면 어떻게 될까?
정답: 즉시 데이터 스트림의 데이터 발행이 중단된다.

그럼 만약 원인이 사소한 오류라고 한다면, 나머지 데이터에 대하여 손실이 발생한다. 이럴 때를 대비하여 mergeDelayError라는 것을 제공해준다.

아래 마블 다이어그램을 보면 알 수 있듯, 에러가 발생한 데이터 제외 스트림 상 모든 데이터를 발행하고 난 뒤에, 발생했던 에러를 onError()로 발행하게 된다.

 

zip() 메소드

여러 Observable을 하나로 결합하되, 지정해준 함수를 거쳐 하나의 데이터로 발행한다. combineLates()가 항상 최신 발행 데이터끼리 합쳐서 발행하는 역할을 했다면, zip()은 여러 개의 Observable 들에서 발행하는 데이터들을 항상 1:1로 매핑하는 것을 보장하며 데이터를 발행하게 된다.

fun main() {
    val intStream = Observable.create<Int> {
        Thread {
            for (i in 1..5) {
                it.onNext(i)
                Thread.sleep(1000)
            }
        }.start()
    }

    val strStream = Observable.create<String> {
        Thread {
            Thread.sleep(500)
            it.onNext("A")
            Thread.sleep(700)
            it.onNext("B")
            Thread.sleep(100)
            it.onNext("C")
            Thread.sleep(700)
            it.onNext("D")
        }.start()
    }

    Observable.zip(
        intStream, strStream
    ) { num: Int, str: String -> num.toString() + str }
        .subscribe(System.out::println)
}
[실행결과]
1A
2B
3C
4D

출력 결과를 보면 알 수 있듯, strStream이 발행하는 데이터는 4개이고, inStream이 발행하는 데이터는 5개이다. 항상 1:1로 매핑되는 것을 보장하기 때문에, 총 4번 데이터가 발행되는 것을 확인할 수 있다.

 

오늘은 여러 Observable을 한 개의 Observable으로 결합하는 동작을 수행하는 연산자 몇 가지를 알아보았다.

 

출처

https://velog.io/@haero_kim/RxJava-Observable-%EA%B2%B0%ED%95%A9%ED%95%98%EA%B8%B0