Reactive하게 생각하기
Reactive Programming 패러다임이란 사용자가 어떤 행동을 했을 때 그 행동에 대한 데이터를 기반으로 적절하게 반응하는 것이다. RxJava2는 Reactive Programming의 개념을 Java로 사용할 수 있도록 구현해 놓은 것이다.
RXJAVA의 3O
RxJava의 핵심 개념인 3O에 대해 먼저 알아보자. 3O란 각각 Observable, Observer, Operator를 말한다.
Reactive의 세계에서는 모든 것이 한방향으로 흐르는 스트림(Stream)이라는 강이 있다. 우리는 이 강을 따라서 지속적으로 데이터를 가공하고 정련해서 데이터를 생산/수정할 수 있다.
Observable
이 개념에 대해서 한마디로 표현하자면, 지속적으로 흐름을 파악할 수 있는 데이터를 의미한다. Observe=보다, 관찰하다. + able= 가능한의 합성어이기도 하다.
조금 더 이해하기 쉽게 하기 위해서, 실제 세계에서 Observable이 어떻게 사용되는지 예시를 들어보겠다. 강연을 보러갔을 때, 우리는 강연자의 말에 지속적으로 관심을 갖고 듣는다. 강연자는 여러 사람들에게 지속적으로 정보를 제공해 주고, 우리는 그 정보를 가공하고 정련해서 우리의 것으로 만든다. 이 때 "강연자는 Observable 하다"라고 할 수 있다.
즉, Observable을 이용해 데이터를 회수하고 변환하는 메커니즘을 정의하고, Observer는 이를 구독해 데이터가 준비되면 이에 반응한다.
이 패턴을 Oberver Pattern이라고 하며, Reactive Programming은 이 Oberver Pattern에 기반을 둔다.
- Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행(emit)한다.
- 데이터를 발행할 때마다 구독하고 있는 모든 Observer가 알림을 받는다.
- Observer는 수신한 데이터를 가지고 어떠한 일을 한다.
어떻게 Subscribe를 하는가?
.Observable이 데이터를 발행하고 알림(Event)을 보내면 Observer는 Observable을 구독(Subscribe)해 데이터를 소비(Consume)한다. 실제로는 Obsesrvable이 데이터 흐름을 정의하고 알림을 보낸 뒤 Observer가 Subscribe를 해야 데이터가 발행되고 소비된다.
그럼 Subscribe를 어떻게 할까? 코드를 통해 알아보자.
Observable 생성하기
RxJava에서 연산자(Operator)를 통해 기존 데이터를 참조, 변형하여 Observable을 생성할 수 있다.
생성 연산자
생성 연산자의 역할은 데이터의 흐름을 만드는 것이다.
간단하게 Observable (Observable, Single, Maybe 객체 등을) 만드는 것
Observable을 생성할 때는 인스턴스를 직접 만들지 않고 정적 팩토리 함수를 호출한다.
1. create()연산자
Observable을 생성하지만 just()와 다르게 프로그래머가 직접 onNext, onComplete, onError를 호출해야한다.
Observable.create()를 사용하면 Emitter를 이용하여 직접 아이템을 발행하고, 아이템 발행의 완료나 오류(Complete/Error)의 알림을 직접 설정할 수 있다.
Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}).subscribe(System.out::println);
[실행결과]
1
2
3
2. just() 연산자
Observable.just()를 사용하면 함수에 인자로 넣은 데이터를 차례로 발행한다. 인자로 10개까지 전달할 수 있다. 자동으로 onNext, onComplete 혹은 onError가 호출된다. 데이터가 그대로 발행되므로 다르게 변경하고 싶으면 map과 같은 연산자를 통해 변환해야한다.
//그대로 발행
Observable.just(1, 2, 3)
.subscribe(System.out::println);
// 변환하고 싶은 경우
Observable.just(1, 2, 3)
.map(x -> x * 10)
.subscribe(System.out::println);
[실행결과]
1
2
3
[실행결과]
10
20
30
위에서 언급했지만, RX Java에서는 기본적으로 null을 허용하지 않아서 just의 인자로 null을 발행하면 오류가 발생한다. 만약 아무런 아이템을 발행하지 않는 빈 Observable을 만들고 싶다면 observable.empty()연산자를 사용한다.
create() vs just()
- just: onNext, onComplete, onError 커스텀함 필요없음
- create: 개발자가 직접 콜백을 설정
3. fromXXX() 연산자
여러 데이터를 다뤄야 하는 경우 사용한다. 정의된 메소드의 종류는 다음과 같으며 특정 타입의 데이터를 oBSERVABLE로 바꿔주는 메소드이다.
예를들어, 배열, 리스트 등의 자료구조나 Future, Callable, Publisher 등은 from으로 시작하는 연산자를 통해 간단히 Observable로 변환할 수 있다.
fromArray() 연산자
배열의 아이템을 Observable로 바꿀 때에는 frmArray() 연산자를 이용하여 아이템을 순차적으로 발행한다.
String[] itemArray = new String[]{"Morning", "Afternoon", "Evening"};
Observable source = Observable.fromArray(itemArray);
source.subscribe(System.out::println);
[실행결과]
Morning
Afternoon
Evening
fromIterable() 연산자
ArrayList, HashSet과 같은 Iterable 자료 구조 클래스는 fromIterable() 연산자를 사용해 변환한다.
ArrayList itemList = new ArrayList<String>();
itemList.add("Morning");
itemList.add("Afternoon");
itemList.add("Evening");
Observable source = Observable.fromIterable(itemList);
source.subscribe(System.out::println);
[실행결과]
Morning
Afternoon
Evening
fromFuture() 연산자
fromFuture() 연산자는 Future 인터페이스를 지원하는 모든 객체를 ObservableSource로 변환하고 Future.get() 메소드를 호출한 값을 반환한다. 그럼 이 Future 인터페이스는 어디에 쓰냐하면, 바로 비동기적인 작업의 결과를 구할 때 사용한다. 보통 Executor Service를 통해 비동기 작업을 할 때 사용된다. Emitter는 Observable 내부에서 Future.get() 메소드를 호출하고, Future의 작업이 끝나기 전까지 스레드는 블로킹된다. RxJava에서는 Executor를 직접 다루기보다는 스케줄러를 사용하는 것을 권장한다.
Future<String> future = Executors.newSingleThreadExecutor()
.submit(() -> {
Thread.sleep(5000);
return "This is the future";
});
Observable source = Observable.fromFuture(future);
source.subscribe(System.out::println); //블로킹되어 기다림
This is the future
위는 단순히 출력된 것 처럼 보이지만 5초간 블로킹되어 기다린 후 출력된 것이다. 위에선 Executor를 사용했지만 Rx에서는 스케쥴러를 사용하는 것을 권장한다.
fromPublisher() 연산자
Publisher는 잠재적인 아이템 발행을 제공하는 생산자로 Subscriber로부터 요청을 받아 아이템을 발행한다. fromPublisher() 연산자는 Publisher를 Observable로 변환해준다.
Publisher<String> publisher = Subscriber -> {
subscriber.onNext("Morning");
subscriber.onNext("Afternoon");
subscriber.onNext("Evening");
subscriber.onComplete();
};
Observable<String> source = Observabler.fromPublisher(publisher);
source.subscribe(System.out::println);
Morning
Afternoon
Evening
fromCallable() 연산자
Callable 인터페이스는 비동기적인 실행 결과를 반환한다는 점이 Runnable과 다르다.
fromCallable() 연산자를 통해 Callable을 Observable로 변환하고 비동기적으로 아이템을 발행할 수 있다.
Callable<String> callable = () -> "RxJava is cool";
Observable source = Observable.fromCallable(callable);
source.subscribe(System.out::println);
RxJava is cool
4. interval() 연산자
일정 시간 간격으로 데이터 흐름을 생성하는 연산자다.
구독한 시간을 기준으로 그 시간부터 주어진 시간 간격으로 0부터 1씩 증가하는 Long객체를 발행한다.
- 함수 원형 두가지
함수 원형 | 설명 | 비고 |
interval(long period, TimeUnit unit) | 일정 시간(period)쉬었다가 데이터 발행 | |
interval(long initialDelay, long period, TimeUnit unit) | 동작은 같고 최초 지연 시간(initialDelay)을 조절 | 보통 초기 지연시간 없이(initialDelay를 0으로) 바로 데이터를 발행하기 위해 사용 |
public class IntervalOperator
{
public static void main(String[] args) throws InterruptedException
{
Observable<Long> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(data -> (data + 1) * 100)
.take(5);
source.subscribe(System.out::println);
Thread.sleep(1000);
}
}
[실행결과]
100
200
300
400
500
Observable인 source 변수는 100ms 간격으로 0부터 데이터를 발행한 후 map(0함수를 호출하여 입력값에 1을 더하고 100을 곱한다.
따라서 100, 200, 300...등의 데이터를 발행한다. take()는 최초 5개의 데이터만 취급한다.
5. timer() 연산자
interval()과 유사하지만 한번만 실행되는 연산자이다.
일정 시간이 지난 후, 한 개의 데이터를 발행하고 onComplete() 이벤트 발행
public class TimerOperator
{
public static void main(String[] args) throws InterruptedException
{
Obseravle<String> source = Observable.timer(300L, TimeUnit.MILLISECONDS)
.map(notUsed -> {
return new SimpleDataFormat("yyyy/MM/dd HH:mm:ss").format(new Date());
}).subscribe(Log::it);
}
}
[실행결과]
2023/06/14 15:18:59
300ms가 지난 후 현재 시간을 출력하도록 구현한 코드이다.
6. range() 연산자
주어진 값 (n)부터 m개의 Integer 객체를 발행하는 연산자이다.
interval(), timer()는 Long 객체를 발행했지만 range()는 Integer 객체를 발행하는 것이 다름
interval(), timer()와 다르게 스케줄러에서 실행되지 않고 현재 스레드에서 실행되는 특징이 있다.
public class rangefun {
public static void main(String[] args){
Observable<Integer> source = Observable.range(1,10).filter(num-> num%2 == 0);
source.subscribe(it -> System.out.println(it));
}
}
[실행결과]
2
4
6
8
10
7. intervalRange() 연산자
interval() + reange()를 혼합해놓은 연산자이다.
- interval()처럼 일정시간 간격으로 값을 출력하지만
- range()처럼 시작 숫자(n)로 부터 m개만큼의 값만 생성하고 onComplete이벤트가 발생
- 즉, interval() 처럼 무한히 데이터 흐름을 발행하지 않음
- 리턴 타입은 interval()함수와 동일하게 Long타입
public class intervalRangefun {
public static void main(String[] args){
Observable<Long> source= Observable.intervalRange(1,
5,
100L,
100L,
TimeUnit.MILLISECONDS);
source.subscribe(it -> System.out.println(it));
}
}
[실행결과]
1
2
3
4
5
8. defer() 연산자
'defer'의 사전적 정의는 '미루다, 연기하다'이다. 그와 비슷하게, defer()연산자는 누군가 구독(subscribe)할 때까지 Observable 데이터 발행을 미루게한다. 즉, 어떤 Observer가 subscribe()를 호출할 때 데이터를 발행하기 시작하는 것이다.
데이터 발행 방식의 차이를 명확하게 이해하기 위해 현재 시각을 출력하는 동작과 함께 just()와 비교해보도록 하자. 쓰레드를 5초동안 정지했고, just()를 통해 생성된 Observable이 발생하는 데이터와 defer()를 통해 생성된 Observable이 발행되는 데이터를 자세히보자.
public class deferfun {
String getCurrentTime() {
SimpleDateFormat timeFormat = new SimpleDateFormat("mm:ss.SSS", Locale.KOREA)
return timeFormat.format(System.currentTimeMillis())
}
public static void main(String[] args){
Observable justStram = Observable.just(getCurrentTime());
Observable deferStream = Observable.defer{
Observable.just(getCurrentTime))
}
Log.d("[1] : ${getCurrentTime()}");
Thread.sleep(5000);
Log.d("[2] : ${getCurrentTime()}");
Log.d("=========================================");
justStream.subscribe(Log::i);
deferStream.subscribe(Log::i);
}
}
[실행결과]
[1] : 15:44.640
[2] : 15:49.645
=========================================
just : 15:44.572
defer : 15:49.655
just(0를 통해 생성된 녀석은 Observable이 생성됐을 때의 시간을 발행했고, defer()를 통해 생성된 녀석은 구독을 시작했을 때의 시간을 발행했다.
따라서 구독 당시 가장 최신 데이터를 받아보고 싶을 때 defer()를 사용하면 된다.
8. refeat() 연산자
단순 반복 실행을 한다. 해당 연산자는 서버와 통신을 하면 해당 서버가 잘 살아있는지 확인하는 코드로 주로 활용한다.
public class repeatfun {
public static void main(String[] args){
String[] number = {"1", "2", "3"};
Observable<String> source = observable.fromArray(number)
.repeat(3)
.doOnComplete(() -> Log.d("onComplete"))
.subscribe(Log::i);
}
}
[실행결과]
1
2
3
1
2
3
1
2
3
onComplete
Observable의 데이터 발행
Observable이 데이터를 발행 한 후 보내는 알림에는 세 가지 종류가 있다.
// Emitter를 통해 알림을 보낸다고 생각하면 된다
public interface Emitter<@NonNull T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
- onNext(): 하나의 소스 Observable에서 Observer까지 한번에 하나씩 순차적으로 데이터를 발행한다.
- onComplete(): 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 onNext()를 더 호출하지 않음을 나타낸다.
- onError(): 오류가 발생했음을 Obserer에 전달한다.
위 이벤트들은 Emitter라는 인터페이스에 의해 선언된다.
Emit 1. 방출하다 2. 내뿜다 3. 발산하다.
하지만 데이터나 오류 내용을 발행할 때 null을 발행할 수 없다.
예제 코드 1 (Observable.create)
//Observable 생성
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
// 데이터 흐름 정의
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
// onComplete() 이후의 데이터는 발행되지 않음
emitter.onNext(3);
}
});
// subscribe 함수를 통해 실제로 데이터를 발행하여 소비함
observable.subscribe(
// onNext
new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println("onNext : " + integer);
}
},
// onError
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
System.out.println("onError : " + throwable);
}
},
// onComplete
new Action() {
@Override
public void run() throws Throwable {
System.out.println("onComplete");
}
}
);
[실행결과]
onNext : 1
onNext : 2
onComplete
인터페이스를 확인하기 위해 익명객체를 생성하였지만, 람다식을 활용하면 코드가 훨씬 더 간결해진다!
예제 코드 2 (Observable.just)
Observable<String> observable = Observable.just("Hello", "World");
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 구독 준비가 완료되었을 때 실행되는 코드
}
@Override
public void onNext(String value) {
// Observable이 아이템을 방출할 때 실행되는 코드
System.out.println(value);
}
@Override
public void onError(Throwable e) {
// 오류가 발생했을 때 실행되는 코드
}
@Override
public void onComplete() {
// Observable이 완료되었을 때 실행되는 코드
}
});
+ (추가)
subcribe()시 Emitter, Observer 사용 차이
안드로이드에서 RxJava를 사용할 때 Observable.subscribe()를 호출할 때 Emitter와 Observer의 차이는 다음과 같다.
- Observer:
- Observer는 일반적으로 subscribe() 메서드를 호출할 때 전달되는 객체이다.
- Observer는 onNext(), onError(), onComplete()와 같은 세 가지 메서드를 구현한다.
- onNext()는 Observable이 아이템을 방출할 때 호출되며, onError()는 Observable에서 오류가 발생했을 때 호출되며, onComplete()는 Observable이 완료되었을 때 호출된다.
- Emitter:
- Emitter는 Observable을 생성하는 데 사용된다. Observable.create() 메서드를 사용하여 Observable을 만들 때 Emitter를 사용할 수 있다.
- Emitter를 통해 onNext(), onError(), onComplete() 메서드를 호출하여 Observable이 생성된다.
- Emitter를 사용하면 Observable의 동작을 더욱 세밀하게 제어할 수 있다. 예를 들어, backpressure(배압)을 처리할 수 있고, Observable의 상태를 변경할 수 있다.
따라서, 일반적으로는 Observer를 사용하여 Observable을 구독하고, Observable을 생성할 때는 Emitter를 사용한다. 하지만 일부 고급 사용 사례에서는 Observer와 Emitter를 혼합하여 사용할 수도 있다.
출처
RxJava로 Android 개발하기#2 : 네이버 블로그 (naver.com)
[Android] RxJava Observable 옵저버블 (yena.io)
[RxJava] RxJava 이해하기 - 2. Observable - HERSTORY (4z7l.github.io)https://velog.io/@haero_kim/RxJava-Observable-%EC%83%9D%EC%84%B1%ED%95%98%EA%B8%B0-%EA%B3%A0%EA%B8%89%ED%8E%B8
https://github.com/taeiim/Android-Study/blob/master/study/week12/RxJava/RxJava.md (도움 많이됨)
https://velog.io/@haero_kim/series/Reactive-Programming (도움 많이됨)
'Android > RxJava' 카테고리의 다른 글
RxJava (6) - Observable 변형하기 (0) | 2023.06.18 |
---|---|
RxJava (5) - Cold Observable vs Hot Observable (0) | 2023.06.18 |
RxJava (4) - Single, Maybe, Completable (0) | 2023.06.18 |
RxJava (3) - Disposable 다뤄보기 (0) | 2023.06.18 |
RxJava (1) - Reactive Programming 이란 (0) | 2023.06.04 |