본문 바로가기
Android/RxJava

RxJava (1) - Reactive Programming 이란

by 태크민 2023. 6. 4.

Reactive Programing


RxJava를 시작하기 앞서, Reactive Programming이 무엇인지에 대해 알아보려 한다.

 

Reactive Programming이란 데이터 흐름과 전달에 관한 프로그래밍 패러다임이다.

 

우리는 주로 알고리즘 문제와 같이 절차를 명시하여 순서대로 실행되는 Imperative Programming(명령형 프로그래밍)을 한다. 반면, Reactive Programming이란 데이터의 흐름을 먼저 정의하고 데이터가 변경되었을 때 연관된 작업이 실행된다. 즉 프로그래머가 어떠한 기능을 직접 정해서 실행하는 것이 아닌, 시스템 이벤트가 발생했을 때 알아서 처리되는 것이다.

 

기존의  프로그래밍 방식을 Pull 방식, Reactive 프로그래밍 방식을 Push 방식이라고도 한다.

Pull 방식은 데이터를 사용하는 곳(Consumer)에서 데이터를 직접 가져와서 사용한다면, Push 방식은 데이터의 변화가 발생한 곳에서 새로운 데이터를 Consumer에게 전달한다.

따라서 Reactive 프로그래밍은 주변 환경과 끊임없이 상호작용을 한다. 다만 프로그램이 주도하는 것이 아니라 환경이 변하면 이벤트를 받아 동작함으로써 상호작용한다.

 

RxJava


RxJava는 ReactiveX의 Java 언어 라이브러리로, 2013년 2월 넷플릭스에서 처음 소개하였다. 2016년 10월 RxJava2를 발표하였으며 가장 최근인 2020년 2월 RxJava3를 배포한다.

 

  • ReactiveX는 관찰가능한 절차를 통해 비동기, 이벤트 기반 프로그램을 구성하기 위한 라이브러리이다.
  • Observer Pattern을 확장하며, Sequence를 조합할 수 있는 연산자를 지원한다.
  • low-level Thread, 동기화, Thread 안전성, non-blocking I/O에 관한 우려를 줄인다.

RxJava는 러닝 커브가 가파르다. 전통적인 스레드 기반의 프로그래밍을 사용하는 자바와 접근 방식이 다르기 때문이다. 여러 스레드를 사용하는 경우 예상치 못한 문제가 발생할 수도 있고 디버깅하기도 어렵다

 

따라서 RxJava는 함수형 프로그래밍 방식을 도입하였다. 함수형 프로그래밍은 Side Effect가 없는 순수 함수를 지향하기 때문에 Thread-Safe하다.

 

ReactiveX의 이러한 방식을 구성하게 해주는 핵심 요소가 바로 Observable과 Operator(연산자)인데, 다음 포스트에서 자시히 다룰 예정이다.

 

RxJava를 쓰는 이유

1. 자바가 동시성 처리를 하는데 번거로움이 있다. 다수의 비동기실행흐름을 생성하고 결과를 취합하여 최종리턴하는게 어렵다.

   -> completableFuture 도 가능하나 당시에는 제공되지 않았다. 

2. 비동기 흐름을 조합할 수 있는 방법을 제공한다. 

3. 콜백 지옥 상황을 개선할수있다.  -> 비동기 연산을 필터링, 변환, 조합해 해결한다. (map, filter, reduce)

 

Marble Diagram


Marble Diagram은 RxJava를 이해하는 핵심 도구이다. 데이터의 흐름과 연산자를 이해하는 데 큰 도움이 된다.

  • 가로 실선: Timeline
  • 모형(★, ●, ■, ◆, …): Observable에서 발행하는 데이터
  • 세로 실선(파이프): 데이터 발행의 완료를 의미함, 발행 완료 이후에는 더 이상 데이터 발행이 불가능하다.
  • 가운데 박스: 연산자, 이 박스를 거치면 데이터가 변한다.
  • X: 오류가 발행했음을 의미한다.
 예제를 해석하면, ★, ▼, .., ◆ 데이터가 차례로 들어와 ◆ 이후 데이터 발행이 완료되었다.
각 데이터는 flip 연산자를 거쳐 새롭게 변환되었으며(▼ -> ▲),
초록색 오각형 이후 어떠한 오류로 인해 데이터 발행이 비정상적으로 종료되었다.

 

 

Reactive Streams 


라이브러리나 프레임워크에 상관 없이 데이터 스트림을 비동기로 다룰 수 있는 공통 메커니즘으로, 이 메커니즘을 편리하게 사용할 수 있는 인터페이스를 제공

즉, Reactive Streams는 인터페이스만 제공하고 구현은 각 라이브러리와 프레임워크에서 한다.

 

Reactive Streams는 4개의 구성 요소(인터페이스)로 이뤄져 있다.

  • Publisher: 데이터를 생성하고 전달하는 생성자 인터페이스
  • Suscriber: 통지된 데이터를 전달 받아 처리하는 소비자 인터페이스
  • Subscription: 생성자에게 통지받을 데이터 개수를 요청하고, 구독을 해지하는 인터페이스
  • Processor: Publisher와 Subscriber를 둘 다 가지고 있는 인터페이스
 
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> { 
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

기본적인 흐름은

  1. Publisher의 subscribe 메소드를 이용해서 Subscriber의 구독 처리를 한다.
  2. Subscriber는 onSubscribe가 호출되고, Subscription을 통해 데이터를 요청하거나, 구독을 해지한다.
  3. 데이터를 요청하면 Subscriber에 onNext 메소드가 호출되고, Subscription을 통해 계속 데이터를 요청하거나, 구독을 해지 한다.
  4. 완료된 경우 Subscriber의 onComplete가 호출되고 에러인 경우 onError가 호출된다.
 
 

 

기본 구조

RxJava는 데이터를 만들고 통지하는 생산자와 통지된 데이터를 받아 처리하는 소비자로 구성됩니다.

RxJava에서 생산자와 소비자의 관계는 크게 두 가지로 나뉜다.

1.  Reactive Streams를 지원하는 Flowable(생산자)과 Subscriber(소비자)

2. Reactive Stream을 지원하지 않는 Obervable(생선자)와 Observer(소비자)

 

Flowable은 Reactive Streams의 생산자인 Publisher를 구현한 클래스고, Subscriber는 Reactive Streams의 클래스입니다. 그래서 기본적인 매커니즘은 Reactive Streams와 같습니다. 생산자인 Flowable로 구독 시작(onSubscribe), 데이터 통지(onNext), 에러 통지(onError), 완료 통지(onComplete)를 하고 각 통지를 받은 시점에 소비자인 Subscriber로 처리합니다. 그리고 Subscription으로 데이터 개수 요청과 구독 해지를 합니다.

 

반면, ObservableObserver는 Reactive Streams를 구현하지 않아서 Reactive Streams 인터페이스를 사용하지 않습니다. 하지만 기본적인 메커니즘은 Flowable과 Subscriber 구성과 거의 같습니다. 생산자인 Observable에서 구독 시작(onSubscribe), 데이터 통지(onNext), 에러 통지(onError), 완료 통지(onComplete)를 하면 Observer에서 이 통지를 받습니다.

 

다만, Observable과 Observer 구성은 통지하는 데이터 개수를 제어하는 배압 기능이 없기 때문에 데이터 개수를 요청하지 않습니다. 그래서 Subscription을 사용하지 않고, Disposable이라는 구독 해지 메서드가 있는 인터페이스를 사용합니다. 이 Disposable은 구독을 시작하는 시점에 onSubscribe 메서드의 인자로 Observer에 전달됩니다.

그래서 Observable과 Observer 간에 데이터를 교환할 때 Flowable과 Subscriber처럼 데이터 개수 요청은 하지 않고 데이터가 생성되자마자 Observer에 통지됩니다.

 

 

 

출처

[RxJava] RxJava 이해하기 - 1. Reactive Programming 이란 - HERSTORY (4z7l.github.io)

https://github.com/taeiim/Android-Study/blob/master/study/week12/RxJava/RxJava.md

https://www.yeh35.com/80b4acec-0117-4b3e-b583-e938b2e488cf

https://seosh817.tistory.com/2#google_vignette