스케줄러
RxJava의 스케줄러는 RxJava의 코드가 어느 스레드에서 실행될 것인지 지정하는 역할을 한다. RxJava만 사용한다고 비동기 처리가 되는 것이 아니라, 스케줄러를 통해 스레드를 분리해주어야 비동기 작업이 가능한 것이다. 스케줄러의 지정은 RxJava의 subscribeOn과 observeOn 연산자를 통해 가능하다.
subscribeOn은 Observable이 데이터 흐름을 발생시키고 연산하는 스레드를 지정할 수 있고, observeOn은 Observable이 observer에게 알림을 보내는 스레드를 지정할 수 있다.
RxJava의 큰 장점은 특정 스케줄러를 사용하다가 다른 스케줄러로 변경하기 쉽다는 점이다.
(1) 세번째 subscribeOn 연산자에 의해 처음 데이터 발행은 파란색 스레드에서 일어난다.
(2) 첫번째 observeOn 연산자에 의해 그 다음의 map 연산이 주황색 스레드에서 일어난다.
(3) 마지막 observeOn 연산자에 의해 그 다음의 작업이 핑크색 스레드에서 일어난다.
그럼 두 연산차의 차이는 무엇일까?
subscribeOn은 여러번 호출되더라도 맨 처음의 호출만 영향을 주며 어디에 위치하든 상관없다.
observeOn은 여러번 호출될 수 있으며 이후에 실행되는 연산에 영향을 주므로 위치가 중요하다.
예제
ArrayList<MyShape> shapes = new ArrayList<>();
shapes.add(new MyShape("Red","Ball"));
shapes.add(new MyShape("Green","Ball"));
shapes.add(new MyShape("Blue","Ball"));
Observable.fromIterable(shapes)
.subscribeOn(Schedulers.computation()) // (A)
.subscribeOn(Schedulers.io()) // (B)
// 1. 현재 스레드(main)에서 Observable을 구독
.doOnSubscribe(data -> MyUtil.printData("doOnSubscribe"))
// 2. (A)에 의해 computation 스케줄러에서 데이터 흐름 발생, (B)는 영향 X
.doOnNext(data -> MyUtil.printData("doOnNext", data))
// 3. (C)에 의해 map 연산이 new thread에서 실행
.observeOn(Schedulers.newThread()) // (C)
.map(data -> {data.shape = "Square"; return data;})
.doOnNext(data -> MyUtil.printData("map(Square)", data))
// 4. (D)에 의해 map 연산이 new thread에서 실행
.observeOn(Schedulers.newThread()) // (D)
.map(data -> {data.shape = "Triangle"; return data;})
.doOnNext(data -> MyUtil.printData("map(Triangle)", data))
// 5. (E)에 의해 new thread에서 데이터 소비(subscribe)
.observeOn(Schedulers.newThread()) // (E)
.subscribe(data -> MyUtil.printData("subscribe", data));
[실행결과]
main | doOnSubscribe |
RxComputationThreadPool-1 | doOnNext | MyShape{color='Red', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Green', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Blue', shape='Ball'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Red', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Green', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Blue', shape='Square'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Blue', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Blue', shape='Triangle'}
예제에서 사용한 MyShape와 MyUtil 클래스이다.
class MyShape{
String color;
String shape;
MyShape(String color, String shape) {
this.color = color;
this.shape = shape;
}
@Override
public String toString() {
return "MyShape{" +
"color='" + color + '\'' +
", shape='" + shape + '\'' +
'}';
}
}
class MyUtil {
static void printData(String message) {
System.out.println(""+Thread.currentThread().getName()+" | "+message+" | ");
}
static void printData(String message, Object obj) {
System.out.println(""+Thread.currentThread().getName()+" | "+message+" | " +obj.toString());
}
}
스케줄러의 종류
스케줄러는 RxJava의 Schedulers 클래스의 정적 팩토리 메소드를 통해 생성할 수 있으며 총 5개의 스케줄러가 존재한다.
각각을 정리해보면 다음과 같다.
스케줄러 | 생성 방법 | 내용 |
SINGLE | Schedulers.single() | 단일 스레드를 생성해 계속 재사용 |
COMPUTATION | Schedulers.computation() | 내부적으로 스레드 풀 생성, 스레드 개수=프로세서 개수 |
IO | Schedulers.io() | 필요할때마다 스레드를 계속 생성 |
TRAMPOLINE | Schedulers.trampoline() | 현재 스레드에 무한한 크기의 대기 큐 생성 |
NEW_THREAD | Schedulers.newThread() | 매번 새로운 스레드 생성 |
RxJava에서는 이 중 Computation, IO, Trampoline 세 가지의 스케줄러를 권장한다.
Single Thread Scheduler
Single 스레드 스케줄러는 단일 스레드를 계속 사용한다. RxJava 내부에서 스레드를 별도로 생성하며, 한 번 생성된 스레드로 여러 작업을 처리한다. 비동기 처리를 지향하면 Single 스레드 스케줄러를 사용할 일은 거의 없다.
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
class SingleSchedulerExample {
fun emit() {
val numbers = Observable.range(100, 5)
val chars = Observable.range(0, 5)
.map(CommonUtils()::numberToAlphabet)
numbers.subscribeOn(Schedulers.single())
.subscribe { data -> Log.it(data) }
chars.subscribeOn(Schedulers.single())
.subscribe { data -> Log.it(data) }
CommonUtils.sleep(500)
}
}
fun main() {
val demo = SingleSchedulerExample()
demo.emit()
}
[실행결과]
RxSingleScheduler-1 | value = 100
RxSingleScheduler-1 | value = 101
RxSingleScheduler-1 | value = 102
RxSingleScheduler-1 | value = 103
RxSingleScheduler-1 | value = 104
RxSingleScheduler-1 | value = A
RxSingleScheduler-1 | value = B
RxSingleScheduler-1 | value = C
RxSingleScheduler-1 | value = D
RxSingleScheduler-1 | value = E
Computation Thread Scheduler
Computation 스레드 스케줄러는 CPU에 대응하는 계산용 스케줄러이다. IO 작업을 하지 않고 일반적인 계산/연산 작업을 할 때 사용한다. 내부적으로 스레드 풀을 생성하고 생성된 스레드를 이용한다. 기본적으로 스레드의 개수는 프로세서의 개수와 같다.
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit
class ComputationSchedulerExample {
fun emit() {
val orgs = arrayOf("1", "3", "5")
val source = Observable.fromArray(*orgs)
.zipWith(
Observable.interval(100, TimeUnit.MILLISECONDS),
BiFunction { a: String, _: Long -> a }
)
// 구독 #1
source.map { item -> "<<$item>>" }
.subscribeOn(Schedulers.computation())
.subscribe { data -> Log.it(data) }
// 구독 #2
source.map { item -> "##$item##" }
.subscribeOn(Schedulers.computation())
.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = ComputationSchedulerExample()
demo.emit()
}
[실행결과]
RxComputationThreadPool-3 | value = <<1>>
RxComputationThreadPool-4 | value = ##1##
RxComputationThreadPool-3 | value = <<3>>
RxComputationThreadPool-4 | value = ##3##
RxComputationThreadPool-3 | value = <<5>>
RxComputationThreadPool-4 | value = ##5##
데이터와 시간을 합성할 수 있는 zipWith를 사용하여 interval을 통해 시간 간격으로 데이터를 발행한다.
동시 실행을 위해서 첫 번째 구독과 두 번째 구독사이에 sleep을 제거했다.
여러 번 실행하다 보면 아래와 같은 결과를 얻기도 한다.
[실행결과]
RxComputationThreadPool-3 | value = <<1>>
RxComputationThreadPool-3 | value = ##1##
RxComputationThreadPool-3 | value = <<3>>
RxComputationThreadPool-3 | value = ##3##
RxComputationThreadPool-3 | value = <<5>>
RxComputationThreadPool-3 | value = ##5##
첫번 째 구독과 두번째 구독이 거의 동시에 이루어지기 때문에 RxJava 내부에서 동일한 스레드에 작업을 할당했기 때문이다.
IO Thread Scheduler
파일 입출력 등의 IO 작업을 하거나 네트워크 요청 처리 시에 사용하는 스케줄러이다. Computation 스케줄러는 CPU 개수 만큼 스레드를 생성하지만, IO 스케줄러는 필요할 때마다 스레드를 계속 생성한다.
입출력 작업은 비동기로 실행되지만 결과를 얻기까지 대기 시간이 길다.
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.io.File
class IOSchedulerExample {
fun emit() {
val root = "c:\\"
val files = File(root).listFiles()
val source = Observable.fromArray(*files)
.filter { f -> !f.isDirectory }
.map { f -> f.absolutePath }
.subscribeOn(Schedulers.io())
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(500)
}
}
fun main() {
val demo = IOSchedulerExample()
demo.emit()
}
[실행결과]
RxCachedThreadScheduler-1 | value = c:\agentlog.txt
RxCachedThreadScheduler-1 | value = c:\hiberfil.sys
RxCachedThreadScheduler-1 | value = c:\pagefile.sys
RxCachedThreadScheduler-1 | value = c:\swapfile.sys
C 드라이브 루트 디렉터리인 root로 File 객체를 생성하여 listFiles 메서드를 호출하여 파일 목록을 File 배열로 받는다. 그리고 디렉터리는 제외하고 파일들만 필터링한다.
Trampoline Thread Scheduler
트램펄린 스케줄러는 새로운 스레드를 생성하지 않고 사용하고 있는 현재 스레드에 무한한 크기의 대기 큐를 생성한다.
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
class TrampolineSchedulerExample {
fun emit() {
val orgs = arrayOf("1", "3", "5")
val source = Observable.fromArray(*orgs)
// 구독 #1
source.subscribeOn(Schedulers.trampoline())
.map { data -> "<<$data>>" }
.subscribe { data -> Log.it(data) }
// 구독 #2
source.subscribeOn(Schedulers.trampoline())
.map { data -> "##$data##" }
.subscribe { data -> Log.it(data) }
CommonUtils.sleep(500)
}
}
fun main() {
val demo = TrampolineSchedulerExample()
demo.emit()
}
[실행결과]
main | value = <<1>>
main | value = <<3>>
main | value = <<5>>
main | value = ##1##
main | value = ##3##
main | value = ##5##
출력 결과를 보면 새로운 스레드를 생성하지 않고 메인 스레드에서 모든 작업을 실행하는 것을 볼 수 있다.
큐에 작업을 넣은 후 1개씩 꺼내어 동작하므로 첫 번째 구독과 두 번째 구독의 실행 순서가 바뀌는 경우가 발생하지 않는다.
New Thread Scheduler
New Thread 스케줄러는 다른 스케줄러와 달리 요청을 받을 때마다 매번 새로운 스레드를 생성한다.
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
class NewThreadSchedulerExample {
fun emit() {
val orgs = arrayOf("1", "3", "5")
Observable.fromArray(*orgs)
.doOnNext { data -> Log.it("Original data : $data") }
.map { data -> "<<$data>>" }
.subscribeOn(Schedulers.newThread())
.subscribe { data -> Log.it(data) }
CommonUtils.sleep(500)
Observable.fromArray(*orgs)
.doOnNext { data -> Log.it("Original data : $data") }
.map { data -> "##$data##" }
.subscribeOn(Schedulers.newThread())
.subscribe { data -> Log.it(data) }
CommonUtils.sleep(500)
}
}
fun main() {
val demo = NewThreadSchedulerExample()
demo.emit()
}
[실행결과]
RxNewThreadScheduler-1 | value = Original data : 1
RxNewThreadScheduler-1 | value = <<1>>
RxNewThreadScheduler-1 | value = Original data : 3
RxNewThreadScheduler-1 | value = <<3>>
RxNewThreadScheduler-1 | value = Original data : 5
RxNewThreadScheduler-1 | value = <<5>>
RxNewThreadScheduler-2 | value = Original data : 1
RxNewThreadScheduler-2 | value = ##1##
RxNewThreadScheduler-2 | value = Original data : 3
RxNewThreadScheduler-2 | value = ##3##
RxNewThreadScheduler-2 | value = Original data : 5
RxNewThreadScheduler-2 | value = ##5##
뉴 스레드 스케줄러를 사용했기 때문에 첫 번째 Observable은 RxNewThreadScheduler-1에서 실행되었고, 두 번째 Observable은 RxNewThreadScheduler-2에서 실행된 것을 볼 수 있다.
각 스레드에서 값이 처리되는 것을 기다려야 하기 때문에 sleep를 제거하면 발행이 되지 않거나, 값이 뒤섞이게 된다.
출처
19. RxJava - 스케줄러 종류 (tistory.com)
[RxJava] RxJava 이해하기 - 5. 스케줄러 - HERSTORY (4z7l.github.io)
[RxJava] RxJava 이해하기 - 6. subscribeOn과 observeOn의 차이 - HERSTORY (4z7l.github.io)
'Android > RxJava' 카테고리의 다른 글
RxJava + Retrofit으로 Http 통신하기 (0) | 2023.06.20 |
---|---|
RxJava (12) - Backpressure와 Flowable (0) | 2023.06.18 |
RxJava (10) - Observable 디버깅하기 (0) | 2023.06.18 |
RxJava (9) - Observable 오류 처리하기 (0) | 2023.06.18 |
RxJava (8) - Observable 결합하기 (0) | 2023.06.18 |