Android/Flow

[Android] Cold Flow와 Hot Flow

태크민 2025. 2. 17. 15:39

생산자 소비자 패턴을 이용한 자료구조인 Observable이나 Flow는 안드로이드 개발자라면 접하게되는 자료구조입니다.
여기서 중요하게 여기지는 개념이 Cold Stream과 Hot Stream입니다. Cold Stream이냐 Hot Stream이냐에 따라서 동작방식이 완전히 다르기 때문에 분명이 알고 사용해야합니다.

 

우선 flow와 channel를 통해 간단하게 cold stream(flow)과 hot stream(channel)에 대해서 알아보겠습니다.

 


Cold Stream, Hot Stream은 CD Player와 Radio다.

Cold Stream과 Hot Stream의 차이점은 3가지로 말할 수 있습니다.

1. 데이터가 생성되는 위치
2. 생산자가 발행한 데이터를 동시에 여러 소비자들이 수신할 수 있는지 여부
3. 스트림이 데이터를 생산하는 시점

 

이해를 돕기위하여 CD Player와 Radio로 위의 3가지 개념을 대입해 Cold Stream과 Hot Stream을 설명하겠습니다.


Cold Stream

CD Player는 각 CD 내부에 음악 목록이 저장되어있습니다(데이터가 내부에서 생성). 사용자 마다 재생 버튼을 누를때 음악이 재생되기 시작하고(소비자가 소비를 시작 할 때 데이터 생산), 모든 사용자는 동일한 음악 목록을 CD에 저장하고 있지만 하나의 CD를 공유하고 있지는 않습니다.(하나의 생산자에는 하나의 소비자가 존재)

 

이러한 CD Player와 Cold Stream이 비슷하다고 한 이유를 살펴보죠.

 

Cold Stream은 데이터가 내부에서 생성됩니다.

flow builder인 flow, flowOf, asFlow로 데이터가 내부에서 생성되는지 확인해보겠습니다.

val coldStream: Flow<Int> = flow {
    //flow 내부에서 데이터 생성
    emit(1)
    emit(2)
}

/** or */

val coldStream: Flow<Int> = flowOf(1, 2)
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        //flow 내부에서 데이터 생성
        emit(element)
    }
}

/** or */

val data = listOf(1, 2)
val coldStream: Flow<Int> = data.asFlow()
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        //flow 내부에서 데이터 생성
        emit(value)
    }
}

 

위의 코드를 보면, 모두 flow 내부에서 데이터를 emit 해주는 것으로 확인됩니다.

 

Cold Stream은 소비자가 소비를 시작할 때 데이터를 생산합니다.

val coldStream = flow<Int> {  
    emit(1)
    emit(2)
}
    
// 이 시점에 Cold Stream의 데이터가 생산된다.
coldStream.collect {
    println(it)
}

 

flow는 소비를 시작하는 함수인 종단연산자(collect, fold, reduce, first등)가 호출되지 않으면 데이터를 생산하지 않습니다. 물론 중간연산자(map, onEach, filter 등)도 종단연산자가 호출돼야지 실행됩니다.

 

Cold Stream은 하나의 생산자에 하나의 소비자만 존재합니다. (UniCast)

val coldStream = flow<Int> {  
    emit(1)
    emit(2)
}
    
coldStream.collect {
    println(it) // 1 2 수신
}

coldStream.collect {
    println(it) // 1 2 수신
}

 

flow를 여러 곳에서 collect할 수 있습니다. 하지만 collect를 할때마다 flow의 block이 새롭게 실행되며 이것은 이전 구독과는 독립적입니다.

 

결론적으로,

1. 데이터가 내부에서 생성된다.
2. 데이터는 소비자가 소비를 시작할때 생산된다.
3. 하나의 생산자에는 하나의 소비자가 존재한다.

 

Cold Stream은 위의 3가지를 충족해야하며 Flow는 모두 부합되는 것으로 확인 되었으니 Cold Stream이라고 결론을 지을 수 있겠습니다.

 

이제부터는 Hot Stream에 대해 알아봅시다~!


Hot Stream

라디오는 방송국에서 프로그램을 제작(데이터가 외부에서 생성)하고 청취자들에게 동시에 송신합니다(하나의 생산자에 다수의 소비자가 존재). 뒤늦게 라디오 주파수를 맞춘 청취자들은 청취하기 시작한 시점부터 들을 수 있게 됩니다(생산자가 소비자의 소비를 신경쓰지 않고 생산).

 

이러한 라디오와 Hot Stream이 비슷하다고 한 이유를 살펴보죠.

CoroutineScope(Dispatchers.Default).launch {
    val channel = Channel<Int>()

    launch {
        // 외부에서 데이터를 생성
        channel.send(1)
    }

    launch {
        // 외부에서 데이터를 생성
        channel.send(2)
        channel.close()
    }

    channel.consumeEach {
        println(it)
    }
}

 

위의 코드를 보면 channel의 send메소드를 통해 데이터를 생성하는 걸 볼 수 있습니다. flow와 다르게 내부에서 미리 데이터를 생성한 것이 아니라, 외부에서 데이터를 생성해 주는 것을 볼 수 있습니다.

 

Hot Stream은 생산자가 소비자의 소비를 신경쓰지 않고 생산합니다.

val channel = Channel<Int>(2)

channel.trySend(1)
println(channel.isEmpty) // false

 

위에 보이는 것과 같이 trySend만 해주고 따로 receive를 하지 않았을 경우에 channel이 비어있지 않은 상황을 볼 수 있습니다. buffer의 용량을 2로 지정을 해둬서 그런거아니야? 라고 생각하실 수도 있겠는데요.
buffer의 용량을 랑데부(RENDEZVOUS), 즉 0으로 지정을 해도 내부 코드를 보면 Channel이 RendezvousChannel을 반환 받게 되고, RendezvousChannel는 AbstractSendChannel을 상속 받고 있는데, AbstractSendChannel 내부에 protected val queue = LockFreeLinkedListHead() 과 같이 Queue가 있다는 것을 확인하실 수 있으실 겁니다. send가 실행이되면 이곳에 저장되는 것을 내부 코드를 통해 확인했습니다.

 

Hot Stream은 하나의 생산자에 다수의 소비자가 구독할 수 있습니다.(MultiCast)

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceNumbers() = produce {
    println(coroutineContext)
    var count = 0
    while (true) {
        send(count++)
        delay(100)
    }
}

fun CoroutineScope.consumeNumbers(index: Int, receiveChannel: ReceiveChannel<Int>) = launch {
    receiveChannel.consumeEach {
        println("$index 가 ${it}을 수신했습니다.")
    }
}

fun main(): Unit = runBlocking {

    val receiveChannel = produceNumbers()

    val job = launch {
        repeat(5) { index ->
            // 5개의 소비자를 생성
            consumeNumbers(index, receiveChannel)
        }
    }

    delay(1000)
    job.cancel()
}

 

위의 코드는 Channel의 Fan-Out을 구현한 코드 입니다. 즉, 여러개의 소비를 두고 생산자에서 생산된 데이터를 분산시킨 코드입니다.

코드를 실행해보면 결과는 아래와 같습니다.

0 가 0을 수신했습니다.
0 가 1을 수신했습니다.
1 가 2을 수신했습니다.
2 가 3을 수신했습니다.
3 가 4을 수신했습니다.
4 가 5을 수신했습니다.
0 가 6을 수신했습니다.
1 가 7을 수신했습니다.
2 가 8을 수신했습니다.
3 가 9을 수신했습니다.

 

위의 결과로 알수 있는 사실은 각기 다른 코루틴에 존재하는 수신자가 하나의 데이터 stream을 소비하고 있고, 소비자가 소비를 시작한 시점부터 생산된 데이터를 소비 하는 것도 확인할 수 있습니다.

 

결론적으로,

 

1. 데이터가 외부에서 생성
2. 하나의 생산자에 다수의 소비자가 존재
3. 생산자가 소비자의 소비를 신경쓰지 않고 생산 (소비자는 소비를 시작한 시점부터 생산된 데이터를 소비하기 시작한다.)

 

Hot Stream은 위의 3가지를 충족해야하며 Channel 은모두 부합되는 것으로 확인 되었으니 Hot Stream이라고 결론을 지을 수 있겠습니다.

 

여기까지 Flow와 Channel을 통해 Cold Stream과 Hot Stream에 대해 알아봤습니다. 

 


참고자료

https://medium.com/@NickFan34818768/hot-vs-cold-flows-kotlin-coroutines-36853ce53352

 

Hot vs cold flows — Kotlin Coroutines

In Kotlin, a “hot flow” refers to a type of flow that produces values regardless of whether there are active collectors.

medium.com

https://medium.com/@apfhdznzl/flow%EC%99%80-channel-cold-stream%EA%B3%BC-hot-stream-c42c64cf4996

 

Flow와 Channel, Cold Stream과 Hot Stream

Cold Stream VS Hot Stream

medium.com