ChannelFlow
ChannelFlow는 여러 코루틴에서 동시에 데이터를 보낼 수 있도록 설계된 Flow입니다.
channelFlow는 내부적으로 channel을 생성하는 ProducerScope를 파라미터로 받아 동작하며,
이를 통해 SendChannel을 사용하여 데이터를 송신하는 coldFlow를 생성할 수 있습니다.
즉, send 및 trySend를 사용하여 channelFlow 내부의 channel을 통해 값을 송신할 수 있습니다.
ChannelFlow는 CannelFlowBuider를 통해 Flow를 생성하며, 내부 구현은 다음과 같습니다.
기본적으로 버퍼가 포함된 채널이 사용되며, 기본 버퍼 크기는 64개입니다.
또한, onBufferOverFlow의 기본 값은 BufferOverflow.SUPSEND로 설정되어 있습니다. 따라서, channel의 데이터가 64개가 넘어가면 send가 일시 중단(suspend)되어 블로킹됩니다.
만약 버퍼크기 및 백프레셔 동작을 조정하고 싶다면 buffer 연산자를 활용하여 설정을 변경할 수 있습니다.
ChannelFlow와 일반 Flow의 차이점
Flow는 콜드 스트림이므로 구독 시점 이후에 데이터를 방출합니다. 즉 생산과 소비가 순차적으로 이루어지며, 구독자가 있어야 데이터가 생산됩니다.
하지만, ChannelFlow는 콜드 스트림인 Flow와 핫스트림인 Channel의 하이브리드입니다. 구독 시점 이후 부터 동작하는 것은 동일하지만, 생산과 소비의 동기화는 이뤄지지 않습니다.
따라서 ChannelFlow를 사용하면, 생산은 생산대로, 소비는 소비대로 독립적으로 병렬로 이루어지므로 데이터를 더 빠르게 조회할 수 있습니다.
다음은 channelFlow를 사용하여 생산과 소비를 병렬로 처리하는 예시입니다.
fun foo(): Flow<Int> = channelFlow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
send(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
launch {
foo().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println("#1: $value")
}
}
launch {
foo().collect { value ->
delay(1000) // pretend we are processing it for 300 ms
println("#2: $value")
}
}
}
}
#1: 1
#1: 2
#1: 3
#2: 1
#2: 2
#2: 3
기존적으로 Flow의 빌더 블록(flow {})의 수신 객체는 FlowCollector입니다. 따라서, flow {} 내부에서는 launch {}를 이용해 자식 코루틴을 생성할 수 없습니다.
또한, Flow는 동일한 순차적 데이터 흐름과 쓰레드 안정성을 보장하기 위해 동시에 emit하는 것을 금지하고 있습니다. 그래서 새로운 CoroutineScope를 생성할 수도 없습니다.
반면, ChannelFlow는 내부에서 ProducerScope를 제공하여 여러 컨텍스트에서 동시에 데이터를 생성하고 보낼 수 있도록 설계되었습니다. ProducerScope는 CoroutineScope를 상속하기 때문에, channelFlow 내부에 서는 launch {}를 사용하여 자식 코루틴을 병렬로 실행할 수 있습니다. (또한, 새로운 CoroutineScope를 생성해도 에러가 발생하지 않습니다.)
다음은 ChannelFlow를 활용하여 데이터를 병렬로 생성하는 예제입니다.
suspend fun main() {
val flow = channelFlow<String> {
launch {
for(i in 0 until 10) {
delay(100)
send("launch#1 $i")
}
}
launch {
for(i in 0 until 10) {
delay(100)
send("launch#2 $i")
}
}
}
flow.onEach {
println("$it")
}.collect()
println("finished")
}
[출력 결과]
launch#1 0
launch#2 0
launch#1 1
launch#2 1
launch#1 2
launch#2 2
launch#2 3
launch#1 3
launch#1 4
launch#2 4
..
..
finished
위 예제에서는 launch {}를 사용하여 두 개의 코루틴이 동시에 실행되므로, launch#1과 launch#2가 번갈아 가며 실행되는 것을 확인할 수 있습니다.
ChannelFlow에서 awaitClose 활용
앞선 예제에서는 데이터를 send하면 자동으로 Channel이 닫히지만,
일정한 주기마다 지속적으로 데이터를 보내야 하는 경우에는 Channel이 종료되지 않도록 유지해야 합니다.
이런 상황은 안드로이드의 Button.setOnClickListener와 같은 콜백 이벤트 처리에서 자주 발생합니다.
이 때는 awaitClose()를 활용하여 Channel이 cancel 또는 close될 때까지 유지할 수 있습니다.
suspend fun main() {
val flow = channelFlow<String> {
launch {
for(i in 0 until 10) {
delay(100)
send("launch#1 $i")
}
}
launch {
for(i in 0 until 10) {
delay(100)
send("launch#2 $i")
}
}
awaitCloseWait()
}
flow.onEach {
println("$it")
}.collect()
println("finished")
}
[출력 결과]
launch#1 0
launch#2 0
launch#1 1
launch#2 1
launch#1 2
launch#2 2
launch#2 3
launch#1 3
launch#1 4
launch#2 4
..
..
launch#1 10
launch#2 10
위 코드에서는 awaitClose()가 호출됨으로써 Channel이 명시적으로 close될 때까지 실행이 유지됩니다.
즉, 프로그램이 종료되지 않고 데이터를 계속해서 생성할 수 있습니다.
awaitClose의 위치에 대한 주의점
awaitClose()는 suspend 함수이므로 코드 중간이 아닌, 반드시 하단에 선언해야합니다.
만약 awaitClose()를 launch {} 블록 사이에 배치한다면, 첫 번째 launch {}에서 실행이 중단되어 두번 째 launch {} 가 실행되지 않는 문제가 발생할 수 있습니다.
예를 들어, 다음과 같은 코드는 잘못된 예제입니다.
channelFlow {
launch {
send("Hello")
}
awaitClose()
launch { // 실행되지 않음
send("World")
}
}
위 코드에서는 awaitClose()가 중간에 호출되었기 때문에 그 이후의 코드가 실행되지 않습니다.
따라서, awaitClose는 반드시 마지막에 배치해야 합니다.
awaitClose는 주로 콜백 해제 및 리소스 정리 용도로 사용됩니다.
실제 cancel 또는 close될 때 awaitClose 블록 {}안에 있는 코드가 실행되기 때문에, 이를 활용하여 콜백 해제 등을 수행하여 메모리 누수를 방지할 수 있습니다.
예를 들어, 안드로이드에서 ButtonSetOnClickListener를 Flow로 변환할 때 다음과 같이 사용할 수 있습니다.
fun View.clicks(): Flow<Unit> = callbackFlow {
setOnClickListener {
trySend(Unit) // 클릭 이벤트 발생 시 값 전송
}
awaitClose { setOnClickListener(null) } // 리스너 해제
}
이 코드에서 awaitClose { setOnClickListener(null) }를 호출하여, Flow가 종료될 때 리스너를 해제 하고,
메모리 누수를 방지할 수 있습니다.
CallbackFlow
Callback함수를 Flow로 변환할 때, ChannelFlow 보다 CallbackFlow를 많이 사용합니다.
그 이유는 callbackFlow가 콜백 해제를 강제할 수 있도록 설계되어 있기 때문입니다.
callbackFlow는 channelFlow의 자식 클래스이며, 기능적으로는 동일합니다.
하지만, callbackFlow는 반드시 awaitClose()를 호출해야 하며, 그렇지 않으면 IllegalStateException이 발생하는 차이가 있습니다.
즉, callbackFlow를 사용하면 awaitClose()를 강제함으로써 콜백을 안전하게 해제할 수 있도록 보장할 수 있습니다.
반면, channelFlow는 awaitClose()를 호출하지 않아도 정상적으로 동작하기 때문에 콜백을 해제하지 않은 채 Flow가 종료될 위험이 있습니다.
이러한 이유로 콜백 기반 API를 Flow로 변환할 때는 callbackFlow를 사용하는 것이 더 안전한 선택입니다.
다음은 callbackFlow를 이용하여 콜백함수를 flow로 변환하는 예제입니다.
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback { // Implementation of some callback interface
override fun onNextValue(value: T) {
// To avoid blocking you can configure channel capacity using
// either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
trySendBlocking(value)
.onFailure { throwable ->
// Downstream has been cancelled or failed, can log here
}
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
}
api.register(callback)
awaitClose { api.unregister(callback) }
}
위 코드에서 awaitClose { }를 호출하여 Flow가 종료될 때 자동으로 API 콜백을 해제 할 수 있도록 보장하고 있습니다.
awaitClose를 사용하지 않는 다면 앞서 말한 바와 같이 IllegalStateException이 발생합니다.
싱글 샷 콜백의 경우 suspendCancellableCoroutine 사용
cabllbackFlow는 멀티샷 콜백 API를 Flow로 변환할 때 적합하지만,
단 한번만을 반환하는 싱글샷 API 변환에는 적절하지 않습니다.
이 경우 suspendCancellableCoroutine을 사용하는 것이 좋습니다.
suspend fun getSingleShotData(api: SingleShotApi): Data =
suspendCancellableCoroutine { continuation ->
val callback = object : SingleShotCallback {
override fun onSuccess(data: Data) {
continuation.resume(data)
}
override fun onFailure(error: Throwable) {
continuation.resumeWithException(error)
}
}
api.requestData(callback)
continuation.invokeOnCancellation { api.cancelRequest(callback) }
}
이 방식은 단 한 번의 결과만 필요할 때 최적화된 방법이며, 불필요한 Flow 생성 없이 값을 안전하게 처리할 수 있습니다.
channelFlow와 callbackFlow는 각각 어떤 상황에 사용하면 좋나요?
언제 channelFlow를 써야 할까?
✅ 콜백 기반 함수 변환이 아닌 다른 비동기 스트림이 필요할 때 channelFlow를 사용합니다.
✅ channelFlow는 내부적으로 ProducerScope를 제공하여 여러 개의 launch {} 블록에서 데이터를 동시에 보낼 수 있습니다.
✅ 따라서 여러 개의 코루틴에서 데이터를 동시에 방출해야 하는 경우에 유용합니다.
✅ channelFlow는 awaitClose() 없이도 사용할 수 있기 때문에, 콜백 해제와 관계없는 데이터 스트림을 만들 때 적합합니다.
언제 callbackFlow를 써야 할까?
✅ 콜백 기반 함수를 Flow로 변환할 때 callbackFlow를 사용해야 합니다.
✅ callbackFlow는 awaitClose()를 반드시 호출해야 하므로 콜백 해제를 강제할 수 있어 안전합니다.
✅ 따라서 안드로이드 UI 이벤트, 버튼 클릭, 네트워크 응답 리스너 등의 콜백을 Flow로 변환할 때 적합합니다.
정리: channelFlow vs callbackFlow
✅ 콜백 기반 함수를 Flow로 변환해야 한다면? → callbackFlow
✅ 콜백 해제를 강제해야 한다면? → callbackFlow
✅ 여러 개의 launch {}에서 동시에 데이터를 방출해야 한다면? → channelFlow
✅ 콜백이 없는 일반적인 데이터 스트림을 만들고 싶다면? → channelFlow
💡 한 마디로 정리하자면:
- 🔹 콜백이 있는 API → callbackFlow (콜백 해제 강제)
- 🔹 콜백이 없는 병렬 데이터 생성 → channelFlow (여러 launch {} 가능)
일반 flow를 사용해서 콜백함수를 Flow로 변환하면 안되나요?
일반적인 flow 빌더를 사용하면 데이터를 순차적으로 방출할 수 있으며, emit()을 호출하여 값을 내보냅니다. 하지만 emit()은 suspend 함수이므로 중단 가능한 코루틴 블록 안에서만 실행될 수 있습니다.
따라서 콜백 함수 내에서 emit을 할 수는 없습니다.
반면, callbackFlow는 비동기 콜백 기반 API를 Flow로 변환하는 데 최적화된 방식입니다. trySend()를 사용하여 값을 전달할 수 있으며, 이는 suspend 함수가 아니므로 비동기 콜백안에서도 값을 보낼 수 있습니다.
또한 awaitClose {}를 활용하여 콜백을 해제하는 정리 작업을 자동으로 수행할 수 있어, 메모리 누수를 방지하는 데 유용합니다.
또한, 일반 flow 빌더는 한 번 실행된 후 종료되는 작업을 처리하는 데 적합하지만, callbackFlow는 지속적으로 데이터가 발생하는 이벤트 스트림을 처리하는 데 유리합니다.
콜백을 Flow로 변환하려면 callbackFlow를 사용하는 것이 더 적절한 선택이 될 수 있습니다.
Hot Flow를 Cold Flow로 변환하는 방법
Hot Flow를 Cold Flow로 변환하는 방법은 새로운 Cold Flow를 생성하여 Hot Flow의 데이터를 emit하도록 하는 것입니다.
이를 위해 flow {} 또는 callbackFlow {}를 사용하여 Cold Flow를 만들고, Hot Flow의 데이터를 수집하여 emit할 수 있습니다.
1. callbackFlow / channelFlow를 사용하여 Cold Flow로 변환
SharedFlow 또는 StateFlow는 Hot Observable이므로, 이를 callbackFlow를 사용해서 Cold Flow로 변환할 수 있습니다.
fun main() = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(replay = 3)
// Cold Flow 변환: 구독할 때마다 처음부터 다시 시작
val coldFlow = callbackFlow<Int> {
sharedFlow.collect { value ->
send(value) // Hot Flow 데이터를 Cold Flow로 전달
}
}
// 데이터 방출
launch {
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
}
// 첫 번째 구독자
launch {
delay(1000)
println("구독자 1 시작")
coldFlow.collect { println("구독자 1 수신: $it") }
}
// 두 번째 구독자 (다시 처음부터 수신)
launch {
delay(2000)
println("구독자 2 시작")
coldFlow.collect { println("구독자 2 수신: $it") }
}
}
<실행결과>
구독자 1 시작
구독자 1 수신: 1
구독자 1 수신: 2
구독자 1 수신: 3
구독자 2 시작
구독자 2 수신: 1
구독자 2 수신: 2
구독자 2 수신: 3
✅ 설명
- callbackFlow 내부에서 collect를 사용하여 SharedFlow의 값을 send를 통해 Cold Flow로 전달합니다.
2. flow 내부에서 collect 사용하여 변환
Hot Flow(SharedFlow / StateFlow)를 flow {} 내부에서 collect하여 Cold Flow로 변환할 수도 있습니다.
fun main() = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(replay = 3)
// Cold Flow 변환: 구독할 때마다 처음부터 다시 시작
val coldFlow = flow {
sharedFlow.collect { value ->
emit(value) // Hot Flow 데이터를 Cold Flow로 전달
}
}
// 데이터 방출
launch {
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
}
// 첫 번째 구독
launch {
delay(1000)
println("구독자 1 시작")
coldFlow.collect { println("구독자 1 수신: $it") }
}
// 두 번째 구독자 (다시 처음부터 수신)
launch {
delay(2000)
println("구독자 2 시작")
coldFlow.collect { println("구독자 2 수신: $it") }
}
print("")
}
<실행결과>
구독자 1 시작
구독자 1 수신: 1
구독자 1 수신: 2
구독자 1 수신: 3
구독자 2 시작
구독자 2 수신: 1
구독자 2 수신: 2
구독자 2 수신: 3
✅ 설명
- flow {} 내부에서 collect를 사용하여 SharedFlow를 Cold Flow처럼 동작하게 만듭니다.
- flow {} 블록을 사용하여 hotFlow.collect { emit(it) } 형태로 Cold Flow를 만듦
- hotFlow의 데이터가 갱신될 때마다 새로운 값을 emit
참고자료
https://tourspace.tistory.com/433
[Coroutine] Callback을 Flow로 변환 - callbackFlow
coroutine을 정리하면서 같이 정리했던 flow 부분에 유용한 변화들이 발생하여 추가된 flow들에 대해서 추가적인 정리를 하려고 합니다. 기존 글을 작성했을 당시 (2019.11) coroutine이 v1.3.2 정도였는데,
tourspace.tistory.com
https://velog.io/@choius323/callbackFlow%EC%9D%98-awaitClose
callbackFlow의 awaitClose
callbackFlow 사용 callback 안에서 flow를 전달해야 할 때 사용하며 사용하며, send 대신 trySend를 사용한다. awaitClose() 를 사용하지 않으면 오류가 발생한다.
velog.io
https://medium.com/@syedamariarasheed/channel-flow-callback-flow-20b79007eeac
Channel Flow & Callback Flow
Flow
medium.com
'Android > Flow' 카테고리의 다른 글
[Android] SharedFlow와 StateFlow (0) | 2025.02.17 |
---|---|
[Android] Cold Flow와 Hot Flow (1) | 2025.02.17 |
[Android] 코루틴 Flow를 활용한 성능 개선 - 다중 요청 처리 (flatMapMerge) (0) | 2025.02.16 |
[Android] 플로우(Flow)의 Operator (0) | 2025.02.15 |
[Android] 플로우(Flow)란? - Deep Dive (1) | 2025.02.15 |