Asynchronous Flow
우리는 어떤 연산을 수행한 후 한 개의 값을 반환하는 중단 함수를 정의하고 이를 비동기로 수행 할 수 있습니다.
하지만 어떤 연산 후 두 개 이상의 값을 반환하는 중단함수는 어떻게 만들 수 있을까요?
코틀린 플로우(Kotlin Flow)를 이용하면 이를 수행할 수 있습니다.
코루틴 플로우를 사용하여 연속적인 데이터 스트림을 구현하기 위해서 필요한 것은 아래의 3가지가 있습니다.
Producer(생산자)
Intermediary(중간 연산자) - 선택사항
Consumer(소비자)
Android에서 클린아키텍쳐 계층의 Datasource는 일반적으로 UI 데이터 생산자입니다.
이때 사용자 인터페이스(UI)는 최종적으로 데이터를 표시하는 소비자입니다.
그렇지만 우리는 UI와 양방향적으로 상호작용하기 때문에 UI 레이어가 사용자 입력 이벤트의 생산자이고 계층 구조의 다른 레이어가 이 이벤트를 사용하기도 합니다.
다시 본론으로 넘어가서,
Flow는 비동기로 동작하면서 여러개의 값을 반환하는 Function은 만들 때 사용하는 Builder입니다
이것이 어떻게 가능한지 아래에서 다른 개념과 비교하 살펴보도록 하겠습니다.
다수의 값 나타내기 (Representing multiple values)
다수의 값은 코틀린에서 컬렉션을 통해 나타낼 수 있습니다.
예를 들어, 우리는 세개의 수를 요소로 갖는 리스트를 반환하는 foo() 라는 함수를 만들고 forEach 함수를 이용하여 이 리스트의 모든 수를 출력할 수 있습니다.
fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
foo().forEach { value -> println(value) }
}
출력 결과
1
2
3
시퀀스 (Sequences)
우리는 각각의 수에 CPU 연산이 요구되는 어떤 일련의 수들을 나타내고자 할 때 시퀀스를 이용할 수 있습니다 (아래 예에서는 각각의 연산에 100ms 의 시간이 소요된다고 가정).
fun foo(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
foo().forEach { value -> println(value) }
}
이 코드의 수행 결과는 이전과 동일합니다.
중단 함수 (Suspending functions)
하지만 위 예제와 같은 코드 블록은 블록을 실행하고 있는 메인 스레드를 정지 시킵니다.
이러한 연산들이 비동기 코드에서 실행될 때 우리는 함수 foo() 에 suspend 키워드를 붙여 함수를 중단함수로 정의할 수 있습니다.
그리고 이 함수를 코루틴 스코프에서 호출 하여 호출 스레드의 정지 없이 실행할 수 있고 그 결과를 리스트로 반환하도록 만들 수 있습니다.
suspend fun foo(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
foo().forEach { value -> println(value) }
}
플로우(Flows)
List<Int> 를 함수의 반환 타입으로 사용한다는 것은 결국 우리가 모든 연산을 수행 한 후 한번에 모든 값을 반환해야 함을 의미합니다.
비동기로 처리 될 값 들의 스트림을 나타내기 위해서 우리는 앞서 살펴본 동기적으로 처리되는 Senquence<Int> 타입에서 했던 것과 같이 Flow<Int> 타입을 사용할 수 있습니다.
fun foo(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value -> println(value) }
}
이 코드는 각각의 수를 출력하기 전에 메인 스레드를 정지하지 않고 100ms 를 기다렸다가 출력합니다.
이것은 메인스레드에서 수행되는 별도의 코루틴에서 매 100ms 마다 “I’m not blocked” 를 출력함으로써 확인 가능합니다.
출력 결과
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
이전의 예제들과 비교하면 Flow 를 사용한 위 코드는 다음과 같은 차이점들을 갖습니다.
- Flow 타입의 생성은 flow {} 빌더를 이용함
- flow { ... } 블록 안의 코드는 언제든 중단 가능
- foo() 함수는 더이상 suspend 로 마킹 되지 않아도 됨
- 결과 값들은 flow 에서 emit() 함수를 이용하여 방출 됨
- flow 에서 방출된 값들은 collect 함수를 이용하여 수집 됨
foo 함수의 flow { ... } 코드 블록에서 delay() 함수를 Thread.sleep 으로 변경하면 메인 스레드가 정지되는 것을 확인할 수 있습니다.
Flows are cold
플로우는 시퀀스와 유사하게 콜드 스트림(Cold Stream) 입니다.
다시 말해 flow {} 빌더 내부의 코드 블록은 플로우가 수집(collect) 되기 전까지는 실행되지 않습니다.
이것은 다음 예제를 보면 명확히 알 수 있습니다.
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
출력 결과
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
이것이 플로우를 반환하는foo() 함수가 suspend 로 표시되지 않는 핵심적인 이유입니다.
foo() 함수는 호출 시 바로 반환되며 그 무엇도 기다리지 않습니다.
또한, 플로우는 매번 수집(collect) 될 때마다 시작됩니다.
이러한 이유로 우리가 동일한 플로우에 대해서 매번 collect 를 다시 호출할 때 마다 출력결과에서 “Flow started” 를 보게 되는 것입니다.
플로우의 취소 (Flow Cancellation)
플로우는 코루틴의 일반적인 취소 매커니즘 준수하긴 하지만 플로우 인프라스트럭쳐 자체적으로 취소 지점을 제공하는 기능은 없습니다. 하지만 이것이 취소에 있어서 보다 명확함을 제공합니다.
일반적인 코루틴의 경우와 동일하게 플로우 컬렉션도 취소 가능한 중단함수(ex> delay())에서 중단 되었을 때 취소 가능하며, 다른 경우는 그렇지 못합니다.
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if(value == 2) cancel()
println(value)
}
println("Done")
}
출력 결과
Emitting 1
collect: 1
Emitting 2
collect: 2
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@7e0ea639
collect {}에서 value가 2가 수집될 때 취소(cancel)을 하였지만, 실제 취소는 delay() 중단함수에서 이루어 졌음을 확인할 수 있습니다.
다음 예제는 플로우가 withTimeoutOrNull 코드 블록에서 실행 중일 때 어떻게 취소되고 코드 실행을 멈추는지 보여줍니다.
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
출력 결과
Emitting 1
1
Emitting 2
2
Done
플로우 빌더 (Flow builders)
Flow에서 생산자는 데이터를 Flow에 입력하고 소비자는 Flow에서 데이터를 수집하는 역할을 가지고 있습니다.
Android에선 데이터 소스나 레포지토리가 전형적인 생산자이고, 최종적으로 화면에 데이터를 표시하는 UI가 소비자 역할을 합니다.
![](https://blog.kakaocdn.net/dn/y3qcV/btsMk0Fmg0F/oigjyuPI8vWwwsje2Ykab1/img.png)
Creating Flows
그렇다면 우선 Flow를 생성하는 방법을 알아보겠습니다. 대부분의 경우 Flow를 직접 만들 필요는 없으며, 데이터 소스 라이브러리는 보통 Flow와 통합되어 있습니다.
DataStore, Retrofit, Room, WorkManager 등이 존재합니다. 이들은 댐 역할을 수행하며 Flow를 사용하여 데이터를 제공하고, 개발자는 구현 방법을 몰라도 파이프에 연결만 하면 데이터를 사용할 수 있습니다.
Room을 예시로 들어보겠습니다.
![](https://blog.kakaocdn.net/dn/2iKzU/btsMi5aCIoy/IJPixpOcHu8Z5k598wfmek/img.png)
위 코드에서 List<Codelab> 타입의 Flow를 노출하여 데이터베이스 변경 사항을 알리고 있습니다. Room 라이브러리가 생산자 역할을 맡아 업데이트가 있을 때마다 쿼리 내용을 전달합니다
.
만약 flow를 직접 만들어야 한다면, 우리는 여러 가지 방법을 사용할 수 있습니다.
이전 예제들에서 살펴본 flow { ... } 빌더는 가장 기본적인 것입니다.
코루틴 프레임워크에는 플로우 정의를 돕기위한 다양한 다른 빌더들이 존재합니다.
- flowOf {} 빌더는 고정된 값들을 방출하는 플로우를 정의
- 다양한 컬렉션들과 시퀀스들은 .asFlow() 확장 함수를 통해 플로우로 변환 가능
flow()
flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
flowOf(Elements)
flowOf(1, 2, 3, 4, 5)
.collect {
delay(1000)
print(it)
} // 12345
asFlow()
// Convert an integer range to a flow
(1..3).asFlow()
.collect {
delay(1000)
value -> println(value)
}
flow 빌더 내부에서는 supend 함수를 호출 할 수 있습니다. 이는 flow가 코루틴 컨텍스트에서 실행되기 때문에 가능합니다.
플로우 중간 연산자 (Intermediate flow operators)
플로우는 여러분이 컬렉션이나 시퀀스에서 경험한 것과 같이 연산자로 변환될 수 있습니다.
중간 연산자는 업스트림 플로우에 적용되어 다운스트림 플로우를 반환합니다.
이 연산자들은 플로우 자체가 그렇듯 콜드(cold) 타입으로 동작합니다.
하지만, 이러한 연산자들의 호출은 그 자체로는 중단 함수가 아닙니다.
그러므로 새롭게 변형된 플로우를 즉시 반환합니다.
기본 연산자들은 map 이나 filter 와 같이 친숙한 이름들을 가지고 있습니다. 시퀀스와의 중요한 차이점은 이 연산자들로 수행되는 코드 블록에서 중단 함수들을 호출 할 수 있다는 점 입니다.
예를 들어, 요청된 플로우에 대해서 map 연산자를 이용하여 원하는 결과값으로 매핑할 수 있으며, 요청 작업이 긴 시간을 소모하는 중단 함수인 경우에도 성공적으로 동작합니다.
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
위 예제코드는 다음과 같은 결과를 출력하며, 각각의 라인은 매 초마다 출력됩니다.
response 1
response 2
response 3
업스트림(Upstream) : 현재 연산자를 기준으로 위에서 흘러들어오는 Stream
다운스트(Downstream) : 현재 연산자를 기준으로 아래로 흘러내려가는 Stream
변환 연산자 (Transform operator)
플로우 변환 연산자들 중에서 가장 일반적인 것은 transform 연산자 입니다.
이 연산자는 map 이나 filter 같은 단순한 변환이나 혹은 복잡한 다른 변환들을 구현하기 위해 사용됩니다.
transform 연산자를 사용하여 우리는 임의의 횟수로 임의의 값들을 방출할 수 있습니다.
예를 들어, transform 연산자를 사용하여 오래 걸리는 비동기 요청을 수행하기 전에 기본 문자열을 먼저 방출하고 요청에 대한 응답이 도착하면 그 결과를 방출할 수 있습니다.
아래에서는 하나의 아이템이 흘러나오면 transform에서 2개의 아이템을 만들어서 흘러보냅니다.
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
출력 결과
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
크기 제한 연산자 (Size-limiting operators)
take같은 크기 제한 중간 연산자는 정의된 제한치에 도달하면 실행을 취소합니다.
코루틴에서 취소는 언제나 예외를 발생시키는 방식으로 수행 되며, 이를 통해 try { ... } finally { ... } 같은 자원 관리형 함수들이 정상적으로 동작할 수 있게 합니다.
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
이 코드의 출력 결과는 numbers함수의 flow { ... } 코드 블록의 실행이 두번째 수를 방출하고 멈추었음을 명확히 보여줍니다.
출력 결과
1
2
Finally in numbers
플로우 종단 연산자 (Terminal flow operators)
플로우의 종단 연산자는 플로우 수집을 시작하는 중단 함수입니다.
collect 연산자가 가장 대표적이며,
다음과 같이 수집을 용이하게 해주기 위한 다른 종단 연산자들도 존재합니다.
- toList 나 toSet 같은 다양한 컬렉션으로의 변환
- first()를 이용하여 첫번째 값만 방출하며 플로우는 단일 값 만 방출함을 보장
- 플로우를 reduce 나 fold 를 이용하여 값으로 변환
예를 들어,
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
위의 예제의 경우 55를 출력합니다.
플로우는 순차적이다 (Flows are sequential)
어떤 플로우의 독립된 각각의 수집은 다중 플로우가 사용되는 특별한 연산자가 사용되지 않는 이상 순차적으로 수행됩니다.
수집(collect)은 종단 연산자를 호출한 코루틴에서 직접 수행되며 기본적으로 새로운 코루틴을 생성하지 않습니다.
각각의 방출된 값은 업스트림의 모든 중간 연산자들에 의해 처리되어 다운스트림으로 전달되며, 마지막으로 종단 연산자로 전달됩니다.
짝수를 찾아(filter) 문자열로 변환하는 다음 예를 살펴봅시다.
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
출력 결과
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
플로우 컨텍스트 (Flow context)
플로우의 수집(collect)은 항상 호출한 코루틴의 컨텍스트 안에서 수행됩니다.
예를 들어 foo 라는 플로우가 있을 때 다음과 같은 코드는 foo플로우의 구현 내용과는 별개로 이 코드의 작성자가 명시한 컨텍스트 상에서 수행됩니다.
withContext(context) {
foo.collect { value ->
println(value) // run in the specified context
}
}
이러한 플로우의 특성은 컨텍스트 보존(context preservation) 이라 불립니다.
그러므로 기본적으로 flow { ... } 빌더에 제공된 코드 블록은 플로우 수집 연산을 실행한 코루틴의 컨텍스트에서 수행됩니다. 예를들어 호출 스레드를 출력하며 3개의 수를 방출하는 foo() 라는 함수를 생각해 봅시다.
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> log("Collected $value") }
}
출력 결과
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
foo().collect() 가 메인 스레드에서 호출 되었기 때문에 foo 플로우의 코드 블록 또한 메인 스레드에서 호출 되었습니다.
이것이 빠른 실행을 보장하고, 호출자를 블록하지 않고 실행 컨텍스트에 관계 없이 비동기 작업을 수행하는 최선의 방법입니다.
withContext 를 통한 잘못 된 방출 (Wrong emission withContext)
하지만 오랫동안 수행되는 CPU 소모적인 작업들은 Dispatchers.Default 와 같이 별도의 스레드에서 수행될 필요가 있고, UI를 업데이트 하는 코드는 Dispatchers.Main 과 같은 UI를 위한 전용 스레드에서 수행될 필요가 있습니다.
보통 withContext 는 코틀린 코루틴을 사용하는 코드에서 컨텍스트를 전환하기 위해서 사용됩니다.
하지만 flow { ... } 빌더 내부의 코드는 컨텍스트 보존 특성을 지켜야하기 때문에, 다른 컨텍스트에서 값을 방출하는 것이 허용되지 않습니다.
다음 코드를 수행하면,
fun foo(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> println(value) }
}
다음과 같은 예외가 발생합니다.
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@428e044, BlockingEventLoop@79f430ab], but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@4a536f6b, DefaultDispatcher]. Please refer to 'flow' documentation or use 'flowOn' instead...
flowOn 연산자 (flowOn operator)
위 예외의 스택 트레이스를 보면 플로우 실행 컨텍스트를 변경할 수 있는 flowOn 연산자에 대해서 언급하고 있습니다. 다음의 예는 플로우에서 컨텍스트를 변경하기 위한 올바른 방법을 보여줍니다.
그리고 flowOn 연산자를 이용하는 다음 예제는 현재 스레드 이름을 출력해서 정상적으로 컨텍스트 전환이 동작하는지 보여 줍니다.
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
foo().collect { value ->
log("Collected $value")
}
}
출력 결과
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
메인 스레드에서 수집 연산을 수행할 때 flow { ... } 가 백그라운드 스레드에서 동작하는 모습을 확인해 봅시다.
여기서 한가지 더 확인해 보아야 할 것은 flowOn 연산자가 플로우의 기본적인 특성인 순차성을 일부 포기했다는 점 입니다.
위 예제에서 값 들의 수집이 특정 코루틴(“coroutine#1”) 에서 발생하고, 방출은 다른 스레드에서 실행되는 특정 코루틴(“coroutine#2”)에서 발생합니다.
또한, 아래와 같이 flowOn 연산자를 통해 각 연산에 대하여 디스패처를 지정해줄 수도 있습니다.
fun main() = runBlocking<Unit> {
foo()
.flowOn(Dispatchers.Default)
.onEach {
log("onEach: $it")
}
.flowOn(Dispatchers.IO)
.collect { value ->
log("collect: $value")
}
println("Collected in $time ms")
}
위 코드를 실행하면 각 연산에 대하여 아래와 같이 서로 다른 Coroutine Dispatcher에서 실행이됩니다.
foo() -> Dispatchers.Default
onEach {} -> Dispatchers.IO
collect {} -> runBlocking
따라서, flowOn 연산자는 컨텍스트 내에서 CoroutineDispatcher 를 변경해야 할 경우 업스트림 플로우를 위한 다른 코루틴을 생성합니다.
버퍼링 (Buffering)
플로우의 로직을 다른 코루틴에서 수행하는 것은 해당 플로우를 수집하는데 걸리는 전체 시간의 관점으로 보면 도움이 될 수 있습니다. 특히, 오래걸리는 비동기 연산이 관련되어 있다면 더욱 그렇습니다.
예를 들어, foo() 에 의한 방출된 플로우가 느릴 경우를 고려해 봅시다. 이것은 값을 방출하기까지 100ms 가 걸립니다. 그리고 수집하는 쪽도 느립니다. 방출 된 값을 처리하는데 300 ms 가 걸립니다.
이 경우 세개의 수를 처리하는데 얼마나 걸리는지 확인해 봅시다.
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
println(">>emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println("<<collect $value")
log(value)
}
}
println("Collected in $time ms")
}
위 예제는 다음과 같은 출력 결과를 보이며, 전체 수집 시간은 1200ms 가 걸렸습니다 (세 개 숫자 각각 400ms).
>>emitting 1
<<collect 1
>>emitting 2
<<collect 2
>>emitting 3
<<collect 3
Collected in 1220 ms
왜 이렇게 될까요? 바로 하나의 Coroutine 이 Produce 와 Consume 을 동시에 처리하기 때문입니다.
그래서, 우리는 플로우에 buffer 연산자를 사용함으로써 foo() 의 방출 코드가 수집 코드와 동시에 수행되도록 만들 수 있습니다.
val time = measureTimeMillis {
foo()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println("<<collect $value")
println(value)
}
}
println("Collected in $time ms")
위 예제에서 Channer 기반의 buffer를 사용하면 동일한 수들을 더 빠르게 처리합니다.
이것은 첫번째 수를 위해서만 100ms 를 기다리고 각각의 수 처리를 위해서 300ms 씩만 기다리도록 프로세싱 파이프라인을 효율화 함으로써 가능했습니다.
>>emitting 1
>>emitting 2
>>emitting 3
<<collect 1
<<collect 2
<<collect 3
Collected in 1071 ms
즉 위는 다음과 같이 동작하는 것입니다.
로직의 흐름
1초 후, 아이템 방출 ->
방출받은 아이템 수집 ->
collect에서 추가 처리(3초) ->
기다리는 동안, 1초 후 아이템 방출 ->
기다리는 동안, 1초 후 아이템 방출 ->
collect에서 추가 처리(3초) ->
collect에서 추가 처리(3초)
이것이 어떻게 가능한 것일까요?
원리는 간단합니다.
Channel buffer 라는 중간 데이터 스트림 관리자를 통하여 producer 는 produce 하는 자원으로 집중할 수 있고,
consumer 는 consume 하는 작업으로 집중할 수 있기 때문입니다.
물론 buffer의 함수 파라미터는 2가지인데요. 요구 사항에 맞는 방향으로 설정할 수 있습니다.
Parameter | 설명 |
capactity | buffer 의 size 를 몇으로 지정할 것인지 설정 단, default 는 64. |
onBufferOverFlow | Buffer 의 size 가 초과했을 때 데이터에 대한 보관을 어떻게 처리할 것인지 결정 단, default 는 SUSPEND SUSPEND - 일시중지 DROP_OLDEST - 오래된 데이터 drop DROP_LATEST - 최신 데이터 drop |
위 설정만 알고 있다면, Buffer 전략을 다양하게 처리할 수 있습니다.
정리하자면, buffer는 Flow의 방출시간 < collect의 내부 연산시간 일 때 사용하면, 전체 처리 시간을 단축할 수 있습니다.
이미 방출된 데이터를 버퍼에 저장해두고, 데이터 수집작업(collect)를 시작할 때, 버퍼에 저장된 데이터를 바로 수집할 수 있는것이죠.
즉, 데이터를 생산하는 쪽과 소비하는 쪽의 균형이 맞지않으면 버퍼를 사용해야 합니다.
📖 flowOn 연산자가 CoroutineDispatcher 를 변경할 경우 동일한 버퍼링 매커니즘을 사용함을 알아둡시다.
여기서 우린 buffer 연산자를 사용함으로써 실행 컨텍스트의 전환 없이 버퍼링을 수행했습니다.
flowOn은 dispatcher자체를 변경하는 거고, buffer() 함수의 경우에는 context를 변경하지 않는다는 차이점이 있습니다.
플로우 예외 (Flow exceptions)
플로우 수집은 방출 로직이나 연산자 안의 코드가 예외를 발생시키면 예외 발생 상태로 종료될 수 있습니다.
이러한 예외들을 다루기 위한 다양한 방법이 있습니다.
수집기의 try & catch
수집기(collector)는 예외를 다루기 위해 코틀린의 try/catch 블록을 사용할 수 있습니다.
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
위 코드는 다음의 출력결과에서 볼 수 있듯이 collect 종단 연산자에서 성공적으로 예외를 잡아내며 그 후에는 어떠한 값도 방출되지 않습니다.
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
모든 예외 처리(Everything is caught)
위 예제는 사실 방출 로직이나 중간 혹은 종단 연산자 등에서 발생하는 모든 예외를 잡아냅니다.
예를들어 방출된 수를 문자열로 변환 하도록 다음과 같이 코드를 변경하고, 해당 변환 코드에서 예외를 발생 시키도록 해 봅시다.
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
이 예외 또한 동일하게 처리되며 수집이 중단 됩니다.
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
예외 투명성 (Exception transparency)
그렇다면 방출 코드의 예외 처리 로직은 어떻게 캡슐화 할 수 있을까요?
플로우는 예외에 있어서 반드시 투명해야 합니다.
flow { ... } 빌더 코드 블록 안에서 try/catch 블록으로 예외를 처리한 후 값을 방출하는 것은 예외 투명성을 위반하는 것 입니다.
즉, emit 및 emitAll의 호출이 try { .. } catch { .. } 블록으로 래핑되지 않아야 한다는 것을 의미합니다.
예외 투명성의 요구 사항을 준수하지 않으면 collect { .. }의 예외로 인하여 코드에 대한 추론을 어렵게 만드는 이상한 동작이 발생할 수 있습니다. 왜냐하면 exception이 업스트림 flow에 의해 어떻게든 “caugth”되어 로컬 추론 능력을 제한할 수 있기 때문입니다.
우리는 예외 투명성을 지킴으로써 이전 예제에서 처럼 예외가 발생하는 수집기는 try/catch 를 이용하여 그 예외를 잡아 처리 할 수 있게됩니다.
방출 로직은 이러한 예외 투명성을 보존하기 위해서 catch 연산자를 사용할 수 있으며 이를 통해 그 예외 처리 로직의 캡슐화가 가능합니다. catch 연산자의 구현 블록은 예외를 분석하고 발생한 예외의 타입에 따라 각기 다른 대응이 가능합니다.
- throw 연산자를 통한 예외 다시 던지기
- catch 로직에서 emit 을 사용하여 값 타입으로 방출
- 다른 코드를 통한 예외 무시, 로깅, 기타 처리
예를들어 예외 발생 시 문자열을 방출하도록 해 봅시다.
catch()는 FlowCollector가 receiver로 들어오기 때문에 값을 emit()할 수 있습니다.
foo()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
이 예제의 출력 결과는 우리가 이전 예제처럼 코드 블록을 try/catch 로 감싸지 않았지만 동일합니다.
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
catch 예외 투명성 (Transparent catch)
예외 투명성을 지키는 catch 중간 연산자는 오직 업 스트림에서 발생하는 예외(catch 연산자 위의 모든 연산자)들에 대해서만 동작하며 다운 스트림에서 발생한 예외에 대해서는 처리하지 않습니다.
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
Emitting 1 1 Emitting 2 Exception in thread "main"
java.lang.IllegalStateException: Collected 2..
출력 결과를 보면 catch 연산자가 있음에도 “Caught … “ 메시지가 출력되지 않았음을 알 수 있습니다.
플로우 종료 (Flow completion)
플로우의 수집이 종료(정상 종료 혹은 예외 발생)되면 그 이후 동작을 수행해야 할 수 있습니다. 이미 눈치 채셨겠지만 이는 두 가지 방식으로 가능합니다.
: Imperative / Declarative
Imperative finally block
수집 시에 try/catch 에 추가적으로 수집 종료 시 실행할 코드를 finally 블록을 통해 정의할 수 있습니다.
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} finally {
println("Done")
}
}
출력 결과
1
2
3
Done
Declarative handling
선언적인 접근으로는 플로우에 onCompletion 중간 연산자를 추가해서 플로우가 완전히 수집되었을 때 실행 될 로직을 정의할 수 있습니다.
이전 예제는 onCompletion 연산자를 이용하여 다음과 같이 다시 작성할 수 있고 이는 동일한 출력 결과를 보입니다.
foo()
.onCompletion { println("Done") }
.collect { value -> println(value) }
onCompletion 을 사용 함으로써 얻을 수 있는 최대 이점은 람다에 nullable 로 정의되는 Throwable 파라미터를 통해 플로우 수집이 성공적으로 종료되었는지 혹은 예외가 발생 되었는지 알 수 있다는 것입니다.
다음 예제는 foo() 플로우가 숫자 1을 방출한 후 예외를 던집니다.
fun foo(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
출력 결과는 예상한 것과 동일합니다.
1
Flow completed exceptionally
Caught exception
단, onCompletion 연산자는 catch 와는 달리 예외를 처리하지는 않습니다.
위 예제 코드에서 볼 수 있듯이 예외는 여전히 다운 스트림으로 전달 됩니다. 결국 예외는 onCompletion 연산자를 거쳐 catch 연산자로 처리됩니다.
업 스트림 예외에 국한됨 (Upstream exceptions only)
catch 연산자와 동일하게 onCompletion 연산자도 업 스트림에서 전달되는 예외만 식별하고 처리할 수 있으며 다운 스트림의 예외는 알지 못합니다. 다음 코드를 실행해 봅시다.
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
출력 결과
1 Flow completed with null Exception in thread "main"
java.lang.IllegalStateException: Collected 2..
우리는 이 결과를 통해 completion 의 cause 가 null 이지만 수집은 예외로 인해 실패 했음을 알 수 있습니다.
Imperative vs declarative
이제 우리는 플로우를 수집하는 방법을 알고 그 종료와 예외를 명령적(Imperative) 혹은 선언적(declarative) 방식으로 다룰 수 있습니다.
여기서 나올 수 있는 질문 중 하나는 “어떠한 방식이 더 선호되며 그 이유는 무엇일까?” 입니다.
라이브러리로써 우리는 특정 접근이 더 낫다고 말할 수 없으며 두 가지 방식 모두 유효하기 때문에 여러분이 선호하는 코딩 스타일에 따라 선택해도 좋다고 생각합니다.
플로우 실행 (Launching flow)
어떤 소스로부터 발생하는 비동기 이벤트는 플로우를 통해 쉽게 표현할 수 있습니다.
이러한 경우를 위해서 우리는 일반적으로 들어오는 이벤트들에 대응하는 처리 코드를 addEventListener 를 통해 등록하고 이후 필요한 일을 진행해 가는 방식을 사용하곤 합니다.
플로우 에서는 onEach 연산자가 이 역할을 담당 합니다. 하지만 onEach 는 중간 연산자입니다.
우리는 플로우 수집을 시작 시키기 위해서 종단 연산자가 필요합니다. 그렇지 않고 단지 onEach 를 호출하는 것으로는 아무런 효과가 없습니다.
만일 우리가 onEach 연산자 이후에 collect 종단 연산자를 사용하면 그 이후 코드는 플로우가 수집될 때까지 대기하게 될 것입니다.
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
출력 결과
Event: 1
Event: 2
Event: 3
Done
launchIn 종단 연산자가 이부분에서 유용하게 사용될 수 있습니다.
collect 연산자를 launchIn 으로 변경함으로써 우리는 플로우의 수집을 다른 코루틴에서 수행할 수 있으며 이를 통해 이후에 작성된 코드들이 곧바로 실행되도록 할 수 있습니다.
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
출력 결과
Done Event: 1 Event: 2 Event: 3
launchIn 연산자에 꼭 필요한 파라미터는 플로우를 수집할 코루틴의 CoroutineScope 입니다.
위 예제에서 이 스코프는 runBlocking 코루틴 빌더로 부터 전달 되었습니다.
그로인해 플로우가 실행되는 동안 runBlocking 스코프는 그 자식 코루틴의 종료를 기다리게 되고 결국 메인 함수가 반환되어 프로그램이 종료되는 것을 방지합니다.
실제 애플리케이션에서 스코프는 제한된 생명 주기를 갖는 엔터티로부터 전달 될 수 있습니다.
이 엔터티의 생명 주기가 종료되면 그에 속한 스코프는 취소되며 이 스코프에 속한 플로우 또한 수집을 취소하게 됩니다.
이러한 방식으로 onEach { ... }.launchIn(scope) 는 addEventListener 와 동일하게 수행 됩니다. 하지만 여기에는 차이점이 존재하는데 그것은 더이상 removeEventListener 가 필요하지 않다는 것 입니다. 취소와 구조화된 동시성이 이것을 대신 수행해 주기 때문입니다.
launchIn 도 역시 Job 을 반환한다는 것을 기억하세요.
우리는 이를 통해 전체 스코프를 취소하거나 특정 Job 에 대한 조인 구문을 사용 할 필요 없이 그에 속한 플로우 수집 코루틴을 취소할 수 있습니다.
플로우와 리액티브 스트림 (Flow and Reactive Streams)
Reactive Stream 이나 Reactor 나 RxJava 같은 Reactive Framework 에 친숙한 분들은 아마 플로우의 디자인이 아주 친숙해 보일 것 입니다.
사실 플로우의 디자인은 리액티브 스트림과 그 안의 다양한 구현에 영향을 받았습니다.
하지만 플로우의 주요 목표는 구조화된 동시성을 따르며, 코틀린과 중단함수를 이용하여 가능한 한 단순화 된 디자인을 갖는 것입니다.
이 목표를 달성하는 것은 리액티브 개척자과 그들의 방대한 성과 없이는 불가능 했을 것입니다.
여러분은 그에 대한 전체 이야기를 여기에서 읽어볼 수 있습니다.
개념적으로 다르긴 하지만 플로우는 리액티브 스트림이며 여러분은 플로우를 리액티브 퍼블리셔로 변환하거나 그 반대로 변환할 수 있습니다. 그러한 변환기는 kotlinx.coroutines 에 의해 제공되며 그에 대응하는 리액티브 모듈에서 찾아볼 수 있습니다.
- kotlin-coroutines-reactive : Reactive Streams
- kotlin-coroutines-reactor : Project Reactor
- kotlin-coroutines-rx2 : RxJava2
통합 모듈은 플로우와의 상호 변환과 리액터 컨텍스트와의 통합, 다양한 리액티브 엔터티들과 중단 함수 친화적인 방법들을 포함합니다.
참고자료
코루틴 공식 가이드 자세히 읽기 — Part 9-A
Asynchronous Flow
myungpyo.medium.com
[정리] — 코틀린 Flow 사용하기 (Android Dev Summit 2021)
Android Dev Summit 2021
medium.com
https://medium.com/hongbeomi-dev/kotlin-coroutine-flow-ac07cfdca42d
Kotlin coroutine flow
코틀린 코루틴 플로우 공식 블로그 [번역]
medium.com
https://huisam.tistory.com/entry/coroutine3#google_vignette
[Kotlin] Coroutine - 3. 코루틴의 Flow 활용
안녕하세요 ^^ 우리는 이전 게시글들을 통하여 Coroutine 이 어떤 구조로 이루어져 있고, 어떤 방식으로 동작하게 되는 것인지를 알게 되었어요. [Kotlin] Coroutine - 2. CoroutineScope & Context & Dispathcer 을
huisam.tistory.com
'Android > Flow' 카테고리의 다른 글
[Android] 플로우(Flow)의 Operator (0) | 2025.02.15 |
---|---|
Coroutine Flow(4) - 총정리 (0) | 2023.07.03 |
Coroutine Flow(3) LiveData vs StateFlow (0) | 2023.07.03 |
Coroutine Flow(2) - StateFlow, SharedFlow (0) | 2023.06.30 |
Coroutine Flow(1) - Flow 란 (0) | 2023.06.27 |