Channel 이란
Channel은 코루틴 간의 데이터를 전송하기 위한 통신 수단으로, stream처럼 데이터를 주고받을 수 있는 인터페이스입니다.
Channel의 구조는 BlockingQueue와 비슷하며, 동일하게 Thread-Safe 한 구조를 가지고 있습니다.
차이점이 있다면 BlockingQueue는 값을 전달하는 과정에서 스레드를 block하지만, Channel은 스레드를 block하지 않고 suspending 하는 특징이 있습니다.
Channel은 데이터를 제공하는 채널인 SendChannel 과 데이터를 소비하는 채널인 ReceiveChannel 로 이루어져 있습니다.
Channel은 송신자인 prooducer와 수신자인 consumer 사이에서 데이터를 주고 받는 중간 다리 역할을 하며,
큐(Queue)와 유사한 형태로 존재합니다.
FIFO(First In, First Out) 방식으로 작동하며, Channel 에서 첫번째로 제공된 데이터는 항상 첫번째로 소비 되는 것이 보장됩니다.
또한, Channel은 송신자와 수신자의 수에는 제한이 없고, 송신된 모든 데이터는 단 한번만 한번만 수신할 수 있다는 특징이 있습니다.
한마디로 말하면, Channel은 코루틴 간 값을 전달할 수 있는 효율적인 통신 수단입니다.
하지만 Channel만이 코루틴 간 값을 전달하는 유일한 방법은 아닙니다. 단일 값을 비동기적으로 전달하기 위해 Deferred를 활용할 수도 있습니다.
아래에서 Deferred를 활용한 방식에 대해 살펴보겠습니다.
Deferred를 통한 코루틴 간 값 전달
코루틴에서는 값과 관련된 객체로 Deferred가 있습니다.
이는 async를 설명할 때 자주 등장하는 개념으로, 결과값을 가질 수 있는 Job입니다.
그렇다면 Deferred를 이용해 코루틴 간 값을 전달할 수 있을까요? 물론 가능합니다.
fun main() = runBlocking<Unit> {
val producer: Deferred<List<Int>> = async {
println("producer: ${Thread.currentThread().name}")
val data = mutableListOf<Int>()
for (i in 1..5) {
delay(300L)
data.add(i)
println("Send: $i")
}
data
}
launch {
println("consumer: ${Thread.currentThread().name}")
val data = producer.await()
for (value in data) {
println("Receive: $value")
}
}
}
실행 결과는 다음과 같습니다.
producer: main @coroutine#2
consumer: main @coroutine#3
Send: 1
Send: 2
Send: 3
Send: 4
Send: 5
Receive: 1
Receive: 2
Receive: 3
Receive: 4
Receive: 5
producer 코루틴은 async를 사용하여 Deferred 객체를 생성하고, 300ms마다 숫자를 생성하여 리스트에 추가합니다.
작업이 완료되면 해당 리스트를 Deferred 객체에 저장하고 반환합니다.
consumer 코루틴은 launch를 사용하여 시작되며, producer.await()를 통해 Deferred 객체에서 값을 받습니다.
이런 방식으로 두 코루틴 간 값 전달이 가능합니다.
하지만 이러한 방식에는 한계점이 있습니다. 마치 연속적인 데이터를 전송하는 것처럼 보이지만 결국 하나의 값(List)만을 전달합니다.
즉, 한 번에 하나의 결과만 처리하는 형태로, 연속적인 데이터 처리에는 적합하지 않습니다.
채널 기본 동작 방식
이런 한계를 극복할 수 있는 것이 채널입니다.
채널은 코루틴 간 값을 전달하기 위한 큐(Queue)와 같은 역할을 합니다.
이러한 특성 덕분에 연속적인 데이터를 처리할 수 있고, 생산자 소비자 패턴(Producer-Consumer Pattern)을 구현하기에 적합합니다.
채널은 SendChannel<E>과 ReceiveChannel<E> 인터페이스를 상속받는 인터페이스입니다. 따라서 송신과 수신 기능을 모두 지원합니다.
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
SendChannel은 값을 전송(send, trySend)하거나, 채널을 닫을(close) 수 있습니다.
public interface SendChannel<in E> {
public suspend fun send(element: E)
public fun trySend(element: E): ChannelResult<Unit>
public fun close(cause: Throwable? = null): Boolean
...
}
ReceiveChannel은 값을 수신(receive, tryReceive)하거나, 채널을 취소(cancel)할 수 있습니다.
public interface ReceiveChannel<out E> {
public suspend fun receive(): E
public fun tryReceive(): ChannelResult<E>
public fun cancel(cause: CancellationException? = null)
...
}
send()와 trySend(), receive와 tryReceive의 차이는 무엇인가요?
send()와 trySend(), receive()와 tryReceive()는 채널에서 데이터를 송신하거나 수신할 때 사용하는 메서드들로, 각각의 동작 방식과 사용 목적이 다릅니다.
send()와 trySend()의 차이
send()
- send()는 비동기(suspend) 함수입니다.
- 채널이 가득 찬 경우(예: 버퍼가 꽉 찬 상태) 다른 코루틴이 값을 소비할 때까지 suspend됩니다.
- 이는 값이 반드시 전송되어야 하는 상황에서 사용됩니다.
trySend()
- trySend()는 동기적 메서드로, suspend하지 않습니다.
- 채널이 가득 찬 경우 즉시 실패하며, 전송 시도 결과를 Result.success 또는 Result.failure로 반환합니다.
- 이는 값 전송이 꼭 필요하지 않은 상황에서 사용됩니다.
요약하면, send()는 값 전송을 보장해야 하는 상황에서, trySend()는 전송 실패를 허용할 수 있는 상황에서 적합합니다.
receive()와 tryReceive()의 차이
receive()
- receive()는 비동기(suspend) 함수로, 채널에 값이 없으면 값이 도착할 때까지 suspend됩니다.
- 반드시 값을 수신해야 하는 상황에서 사용됩니다.
tryReceive()
- tryReceive()는 동기적 메서드로, 채널에서 값을 수신하려고 시도합니다.
- 값이 없는 경우 즉시 실패하며, 결과를 Result.success 또는 Result.failure로 반환합니다.
- 값 수신이 꼭 필요하지 않은 상황에서 적합합니다.
※ trySend()와 tryReceive()는 제한된 크기의 버퍼를 가진 채널에서만 사용할 수 있습니다.
이는 Rendezvous Channel(버퍼가 없는 채널)에서는 동작하지 않기 때문입니다.
close와 cancel의 차이는 무엇인가요?
채널에는 닫기(close)와 취소(cancel) 메서드가 존재, 서로 비슷해 보이지만 다르므로 차이를 알아야 합니다
두 메서드의 차이점
close()
- 채널을 닫는 동작을 수행하며, close를 호출하기 전, 이미 보낸 요소들의 전송이 보장됩니다.
- 개념적으로 close는 채널에 특별한 close 토큰을 보내는 것과 같습니다.
close 토큰을 받는 즉시 반복이 중지되므로 close 되기 전에 이전에 보낸 모든 요소가 수신을 하고 종료될 수 있다는 의미입니다. - 데이터 발행이 완료된 후 채널을 닫고 싶을 때 주로 사용됩니다.
단, 한 번 닫힌 채널에 데이터를 더 보내려고하면 "ClosedSendChannelException"이 발생합니다.
fun main() {
runBlocking {
val channel = Channel<Int>()
launch {
for (i in 0 until 10) {
channel.send(i)
}
}
for (i in channel) {
if (i == 5) channel.close() // 채널 닫기
println(i)
}
println("end")
}
}
<결과>
0
1
2
3
4
5
end
Exception in thread "main" kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
이 문제는 채널이 닫힌 후에도 송신자가 데이터를 보내려고 하기 때문에 발생합니다.
cancel()
- 채널을 즉시 취소하며, 버퍼링된 전송 요소들도 모두 제거합니다.
- 따라서, 이미 보낸 요소도 버려질 수 있습니다.
fun main() {
runBlocking {
val channel = Channel<Int>()
val job = launch {
for (i in 0 until 10) {
channel.send(i)
delay(100)
}
}
launch {
for (i in channel) {
if (i == 5) {
channel.cancel() // 채널 취소
println("Channel cancelled")
}
println(i)
}
}
job.join()
}
}
<결과>
0
1
2
3
4
5
Channel cancelled
위 코드에서 cancel()이 호출되면, 채널은 즉시 종료되고 버퍼링된 데이터도 모두 삭제됩니다.
정리하자면,
- close()는 데이터 전송을 보장하며, 송신이 완료된 후 안전하게 채널을 닫고 싶을 때 사용합니다.
- cancel()은 즉각적인 중단이 필요하거나 에러 상황에서 작업을 종료할 때 사용됩니다.
Channel 예제
가장 기본적인 채널을 만들어보겠습니다.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
println("producer: ${Thread.currentThread().name}")
for (i in 1..5) {
delay(300L)
channel.send(i)
println("Send: $i")
}
}
launch {
println("consumer: ${Thread.currentThread().name}")
repeat(5) {
println("Receive: ${channel.receive()}")
}
}
}
실행 결과는 다음과 같습니다.
producer: main @coroutine#2
consumer: main @coroutine#3
Send: 1
Receive: 1
Send: 2
Receive: 2
Send: 3
Receive: 3
Send: 4
Receive: 4
Send: 5
Receive: 5
채널을 생성한 후에 300ms마다 send() 함수를 통해 값을 5번 전송합니다.
값이 전송이 될 때마다 수신이 바로 이루어지며, 수신 측에서는 전송 횟수만큼 5번 반복하여 모든 값을 받아오고 있습니다. 이 과정에서 송신 측과 수신 측은 1:1로 대응하는 페어 관계를 유지해야 합니다.
하지만 이 경우는 전송하는 요소(element)가 몇 개인지 수신하는 쪽에서 미리 알아야 하는 문제가 있습니다.
만약 횟수가 일치하지 않으면 일시 중단(suspend)되게 됩니다.
예를 들어, 수신 측에서 4번만 값을 반복하면 송신 측의 5번째 send()는 수신 측의 receive()가 호출될 때까지 일시 중단 상태에 놓입니다.
이로 인해 송신과 수신 간의 페어 관계가 깨지며, 해당 코루틴이 종료되지 않아서 프로그램도 대기 상태에 빠지게 됩니다.
반대로, 수신 측에서 6번을 반복하면 송신 측 send()는 5번만 호출되었기 때문에 6번째 receive()가 send()를 만나기 전까지 일시 중단됩니다. 결국 이 경우에도 프로그램은 계속 대기하는 상태에 이릅니다.
위 내용을 정리하면 다음과 같습니다.
- 송신 측에서 보낸 값이 수신되지 않으면 대기 상태가 된다. ( send()가 receive()를 만나지 못하면 suspend 된다.)
- 수신 측에서는 값을 받지 않으면 대기 상태가 된다. ( receive()가 send()를 만나지 못하면 suspend 된다.)
이런 실수를 방지하기 위해서 채널에서 값을 모두 전송한 후에 채널을 닫는 방법이 있습니다.
다음 예제를 보면 close() 함수를 통해서 채널을 닫고 있습니다. 실행 결과는 바로 앞선 예제와 동일합니다.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
println("producer: ${Thread.currentThread().name}")
for (i in 1..5) {
delay(300L)
channel.send(i)
println("Send: $i")
}
channel.close()
}
launch {
println("consumer: ${Thread.currentThread().name}")
for (value in channel) {
println("Receive: $value")
}
}
}
채널을 생성한 후, 300ms마다 send() 함수를 통해 값을 전송합니다. 수신 측에서는 for 문을 사용하여 채널의 값을 받고 있습니다.
여기서 횟수를 지정하지 않고 for 문으로 값을 받아올 수 있는 이유는 값을 모두 보낸 후에 close() 함수로 채널을 닫았기 때문입니다.
채널을 닫으면 채널이 닫혔다는 정보를 마지막에 보내는 방식으로 동작합니다.
따라서 해당 정보를 받으면 for 문의 반복이 멈추게 됩니다. 또한 이것으로 채널이 닫히기 전까지 전송된 모든 값들이 수신되었다는 것을 알 수 있습니다.
앞서 이야기했듯이 close() 함수를 통해 적절히 채널을 닫지 않는다면 수신하는 측에서는 값이 들어오기를 계속 기다리게 됩니다.
또한 for문 대신 consumeEach함수를 사용할 수 있습니다.
consumeEach는 내부적으로 for문을 사용하여, 채널이 close()될 때까지 값을 반복적으로 수신합니다.
따라서, for문과 마찬가지로 채널의 모든 값을 소비할 때 사용할 수 있습니다.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
println("producer: ${Thread.currentThread().name}")
for (i in 1..5) {
delay(300L)
channel.send(i)
println("Send: $i")
}
channel.close()
}
launch {
println("consumer: ${Thread.currentThread().name}")
channel.consumeEach { element ->
println("Receive: $element")
}
}
}
close 호출을 통해 for문 또는 consumeEach가 이를 감지하고 정상적으로 수신을 할 수 있게 되었습니다.
하지만 개발자가 close() 호출을 깜빡하고 잊는다면 어떻게 될까요?
이 경우, 수신측은 계속 값을 기다리며 무한 대기 상태에 빠질 수 있습니다.
또한 값 전송 중 예외가 발생해 코루틴이 비정상적으로 종료되면, close()가 호출되지 않기 때문에 수신 측은 끝나지 않은 채 계속 대기 상태로 남게 됩니다.
이러한 문제를 방지하기 위해 produce를 사용할 수 있습니다.
produce 함수는 코루틴이 정상 종료(finished), 중단(stopped), 취소(cancelled)될 때 자동으로 채널을 닫아 수신 측이 무한 대기 상태에 빠지지 않도록 보장합니다.
produce 함수의 내용은 추후 아래에서 살펴보도록 하겠습니다.
Capacity 정책
Coroutine 채널에서 제공하는 Capacity 정책은 총 4가지(RENDEZVOUS, CONFLATED, UNLIMITED, BUFFERED)입니다.
기본 Capacity 정책은 RENDEZVOUS, 기본 BufferOverflow 정책은 SUSPEND로 설정됩니다.
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {..}
CONFLATED -> {..}
UNLIMITED -> {..}
BUFFERED -> {..}
else -> {..}
}
RENDEZVOUS
RENDEZVOUS 는 Buffer 가 존재하지 않는 채널입니다.
데이터를 소비해야만 다음 데이터를 생산할 수 있는 구조로, 생산자(sender)와 소비자(receiver)가 직접 교차하며 데이터 생산과 소비가 이루어집니다.
즉, 수신 측에서 receive를 호출하기 전까지 send는 대기(suspend) 상태가 됩니다.
앞서 보여드린 예제도, 기본 Capacity정책인 RENDEZVOUS를 사용하고 있었기 때문에, 데이터가 하나씩 교차하면서 생산과 소비가 이루어졌던 것입니다.
// 버퍼가 없는 채널. 수신자가 receive 를 호출하기 전까지 send 를 호출해도 대기한다
val rendezvousChannel = Channel<Int>()
CoroutineScope(Dispatchers.Default).launch {
repeat(10) {
rendezvousChannel.send(it)
println("$rendezvousChannel send $it")
delay(10)
}
rendezvousChannel.close()
}
private suspend fun <E> consumeAll(channel: Channel<E>) {
for (data in channel) {
println("[$channel] consuming data: $data")
}
}
consumeAll(rendezvousChannel)
위 코드를 실행하면 produce 와 consume 이 섞이는 현상을 확인할 수 있습니다.
UNLIMITED
UNLIMITED 는 버퍼 크기에 제한이 없는 무한한 채널입니다.
따라서, send가 절대 suspend 되지 않고, 데이터를 소비 여부와 관계없이 계속해서 생산할 수 있습니다.
하지만, 데이터를 과도하게 생산하면 자칫하면 메모리 부족(OutOfMemory) 현상이 발생될 위험이 있습니다.
참고로 Channel.BUFFERED의 기본 크기는 64이며, JVM의 kotlinx.coroutines.channels.defaultBuffer 속성을 설정하여 변경할 수 있습니다.
val unlimitedChannel = Channel<Int>(Channel.UNLIMITED)
CoroutineScope(Dispatchers.Default).launch {
repeat(10) {
unlimitedChannel.send(it)
println("$unlimitedChannel send $it")
delay(10)
}
unlimitedChannel.close()
}
delay(500)
consumeAll(unlimitedChannel)
위와 같은 예제 코드를 만들고 돌려보면 어떻게 될까요?
실행해보면, 데이터의 소비 유무와 상관없이 무조건 생산되는 것을 볼 수 있습니다.
BUFFERED
BUFFERED 는 크기가 고정된 buffer를 사용하는 채널입니다.
Buffer가 가득 차기 전까지는 데이터를 지속적으로 생산할 수 있지만, 가득 차면 생산자는 suspend 상태에 들어갑니다. 그리고 수신자가 데이터를 소비할 때 생산을 재개합니다.
현업에서 가장 자주 사용되는 채널 유형 중 하나로, 데이터 생산과 소비 속도를 조절할 수 있어 리액티브 프로그래밍의 배압(Backpressure) 개념과 유사한 역할을 수행합니다.
val arrayChannel = Channel<Int>(5)
CoroutineScope(Dispatchers.Default).launch {
repeat(10) {
arrayChannel.send(it)
println("$arrayChannel send $it")
delay(10)
}
arrayChannel.close()
}
delay(500)
consumeAll(arrayChannel)
위 코드는 버퍼 크기를 5로 설정한 채널에서 10개의 데이터를 생산하고 순차적으로 소비하는 예제입니다.
실행 결과, 5개의 데이터를 먼저 버퍼에 저장한 후 소비가 시작되면 동시에 새로운 데이터가 생산되는 모습을 확인할 수 있습니다.
CONFLATED
CONFLATED 는 buffer 가 1개인 채널로 유지하지만, 이미 생산된 데이터가 있는 경우 데이터를 override 하는 특징이 있습니다.
즉, 지나간 element를 저장하지 않으며, 새로운 element가 기존의 element를 replace합니다.
그러므로 앞서 보냈던 element들은 소실되고, 가장 최신의 데이터만 유지됩니다.
val conflatedChannel = Channel<Int>(Channel.CONFLATED)
CoroutineScope(Dispatchers.Default).launch {
repeat(10) {
conflatedChannel.send(it)
println("$conflatedChannel send $it")
delay(10)
}
conflatedChannel.close()
}
delay(500)
consumeAll(conflatedChannel)
데이터 생산자는 데이터를 10개를 보내고, 500ms 후에 소비한다고 가정하면
이전에 생산된 데이터는 모두 읽지 못하고, 마지막에 생산된 데이터만 소비하는 것을 볼 수 있네요
요약하면 아래와 같습니다.
분류 | 설명 |
RENDEZVOUS | Buffer 가 존재하지 않는 Channel |
UNLIMITED | Buffer 가 무한대인 Channel |
BUFFERED | Buffer 의 크기를 한정해서 사용하는 Channel |
CONFLATE | Buffer 의 크기는 1이고, 마지막에 제공된 데이터만 보장하는 Channel |
BufferOverflow 정책
Channel 에서 Buffer 를 어떻게 사용할 수 있는지를 보았다면 Overflow 정책은 어떤게 있을까요?
Buffer 가 넘치게 되는 경우 취할 수 있는 행동은 크게 3가지 종류가 있습니다.
종류 | 설명 |
SUSPEND | 버퍼가 가득차면, send메소드를 suspend 합니다. ( default ) |
DROP_OLDEST | 버퍼가 가득차면, 가장 오래된 데이터를 제거하고 새로운 데이터를 받아들인다. ( queue ) |
DROP_LATEST | 버퍼가 가득차면, 가장 최신의 데이터를 제거하고 새로운 데이터를 받아들인다. ( stack ) |
capacity가 Channel.CONFLATED인 채널은 버퍼 크기가 1이며, onBufferOverflow가 DROP_OLDEST인 채널과 유사합니다.
즉, 가장 오래된 데이터가 제거되면서 최신 데이터만 유지되는 방식입니다.
이 정책들을 어떤 데이터를 유지할지, 어떤 데이터를 제거할지에 대한 요구사항에 따라 선택할 수 있습니다.
예를 들어 SUSPEND는 데이터 손실을 방지할 때 유리하고, DROP_OLDEST는 실시간 데이터 스트림에서 최신 정보만 필요할 때 유용합니다.
On undelivered element handler
Channel function의 파라미터인 onUndeliveredElement 에 대해 알아야합니다. 이는 element가 어떠한 이유로 처리되지 않았을 때 호출됩니다.
대부분의 경우 이는 channel이 closed or cancelled 되었음을 의미합니다. 하지만 send , receive , receiveOrNull , or hasNext 가 error를 throw 했을 때도 호출됩니다. 일반적으로 channel에서 보낸 resource를 close 하는데 사용됩니다.
val channel = Channel<Resource>(capacity) { resource ->
resource.close()
}
// or
// val channel = Channel<Resource>(
// capacity,
// onUndeliveredElement = { resource ->
// resource.close()
// }
// )// Producer code
val resourceToSend = openResource()
channel.send(resourceToSend)// Consumer code
val resourceReceived = channel.receive()
try {
// work with received resource
} finally {
resourceReceived.close()
}
팬아웃(Fan-out)
여러 개의 코루틴이 하나의 채널을 통해 값을 수신할 수 있습니다. 앞서 말했듯이 채널은 여러 개의 송수신자를 가질 수 있기 때문입니다.
하지만 전송한 모든 값은 한 번만 수신할 수 있기 때문에, 수신자는 번갈아 가며 값을 수신하게 됩니다.
즉, 채널을 통해 전송된 값은 모든 수신자에게 동일하게 전달되지 않고 나누어 수신됩니다.
![](https://blog.kakaocdn.net/dn/cpoeuc/btsMeVx31Mc/Haf2oOKWe15CxFYeXUyPO0/img.png)
이것을 확인해 보기 위해서 앞선 예제에 수신을 위한 코루틴을 하나 더 추가해보겠습니다.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
println("producer: ${Thread.currentThread().name}")
for (i in 1..5) {
delay(300L)
channel.send(i)
println("Send: $i")
}
channel.close()
}
launch {
println("consumer1: ${Thread.currentThread().name}")
for (value in channel) {
println("Receive1: $value")
}
}
launch {
println("consumer2: ${Thread.currentThread().name}")
for (value in channel) {
println("Receive2: $value")
}
}
}
실행 결과는 다음과 같습니다.
producer: main @coroutine#2
consumer1: main @coroutine#3
consumer2: main @coroutine#4
Send: 1
Receive1: 1
Send: 2
Receive2: 2
Send: 3
Receive1: 3
Send: 4
Receive2: 4
Send: 5
Receive1: 5
수신하는 코루틴을 하나 더 추가했습니다. 결과를 보면 Receiver1과 Receiver2가 값을 나눠서 받고 있는 것을 확인할 수 있습니다.
팬인(Fan-in)
여러 개의 코루틴이 하나의 채널을 통해서 값을 송신할 수도 있습니다.
![](https://blog.kakaocdn.net/dn/TWx88/btsMfPqdEfh/FQ8aA1oXBnjhbvRzdpVef0/img.png)
팬아웃과 반대로 이번에는 송신하는 코루틴을 하나 더 추가하여 2개로 만들고, 수신하는 코루틴은 그대로 1개로 유지하도록 해보겠습니다. Channel_ex02.kt을 다음과 같이 수정하면 됩니다.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
val producer1 = launch {
println("producer1: ${Thread.currentThread().name}")
for (i in 1..5) {
delay(300L)
channel.send(i)
println("Send1: $i")
}
}
val producer2 = launch {
println("producer2: ${Thread.currentThread().name}")
for (i in 1..5) {
delay(600L)
channel.send(i * 10)
println("Send2: ${i * 10}")
}
}
launch {
println("consumer: ${Thread.currentThread().name}")
for (value in channel) {
println("Receive: $value")
}
}
producer1.join()
producer2.join()
channel.close()
}
실행 결과는 다음과 같습니다.
producer1: main @coroutine#2
producer2: main @coroutine#3
consumer: main @coroutine#4
Send1: 1
Receive: 1
Send2: 10
Receive: 10
Send1: 2
Receive: 2
Send1: 3
Receive: 3
Send2: 20
Receive: 20
Send1: 4
Receive: 4
Send1: 5
Receive: 5
Send2: 30
Receive: 30
Send2: 40
Receive: 40
Send2: 50
Receive: 50
송신하는 코루틴은 기존과 동일하게 1~5를 순차적으로 보내는 것과 10을 곱해서 10~50을 순차적으로 보내는 것으로 총 2개입니다. 수신하는 코루틴은 여전히 1개입니다.
결과를 보면 하나의 채널을 통해서 2개의 송신자에서 보낸 값을 1개의 수신자에서 열심히 받고 있는 것을 알 수 있습니다.
채널을 편리하게 만드는 방법: produce()
produce() 함수는 내부적으로 ProducerScope를 구현하는 코루틴을 생성하여 반환합니다.
ProducerScope는 CoroutineScope와 SendChannel을 함께 상속받은 인터페이스입니다.
따라서 produce 빌더를 사용하여 코루틴을 생성하면 CoroutineScope와 SendChannel의 기능을 모두 사용할 수 있습니다. 참고로 produce() 함수로 생성한 채널은 ReceiveChannel입니다.
produce 빌더를 통해 생성된 코루틴은 이유에 관계없이 코루틴이 종료될 때 채널을 닫습니다.
따라서 채널을 사용할 때 적절히 닫지 않으면 무한 대기가 발생할 수 있는 문제점에서 벗어날 수 있습니다.
Channel_ex02.kt와 동일한 동작을 하는 예제를 produce() 함수를 사용해 구현하면 다음과 같습니다.
fun main() = runBlocking<Unit> {
val channel = produce<Int> {
println("producer: ${Thread.currentThread().name}")
for (i in 1..5) {
delay(300L)
send(i)
println("Send: $i")
}
}
launch {
println("consumer: ${Thread.currentThread().name}")
for (value in channel) {
println("Receive: $value")
}
}
}
파이프라인
파이프라인은 어떤 채널에서 수신한 값을 다른 채널로 전송하면서 (무한한) 값의 스트림을 생성할 수 있는 패턴입니다.
이 과정을 확인하기 위해서 producer1이라는 채널을 생성하여 1~5라는 값을 순차적으로 전송합니다.
그리고 그 값은 producer2라는 채널에서 수신되고, producer2는 해당 값에 10을 곱한 값을 다시 전송합니다.
결과적으로 그 값은 consumer 코루틴에서 수신하게됩니다.
코드는 다음과 같습니다.
fun main() = runBlocking<Unit> {
val producer1 = produce<Int> {
println("producer1: ${Thread.currentThread().name}")
for (i in 1..5) {
delay(300L)
send(i)
println("Send1: $i")
}
}
val producer2 = producer2(producer1)
launch {
println("consumer: ${Thread.currentThread().name}")
for (value in producer2) {
println("Receive2: $value")
}
}
}
fun CoroutineScope.producer2(numbers: ReceiveChannel<Int>) = produce {
println("producer2: ${Thread.currentThread().name}")
for (value in numbers) {
println("Receive1: $value")
send(value * 10)
println("Send2: ${value * 10}")
}
}
실행 결과는 다음과 같습니다.
producer1: main @coroutine#2
producer2: main @coroutine#3
consumer: main @coroutine#4
Send1: 1
Receive1: 1
Send2: 10
Receive2: 10
Send1: 2
Receive1: 2
Send2: 20
Receive2: 20
Send1: 3
Receive1: 3
Send2: 30
Receive2: 30
Send1: 4
Receive1: 4
Send2: 40
Receive2: 40
Send1: 5
Receive1: 5
Send2: 50
Receive2: 50
CoroutineScope의 확장 함수 producer2(numbers: ReceiveChannel<Int>)의 파라미터 타입을 보면 ReceiveChannel<Int>입니다.
produce() 함수로 생성한 채널은 ReceiveChannel이기 때문에 producer1을 인자로 남겨주는 것에 문제가 없는 것입니다.
아주 복잡한 과정은 아니지만 아래 그림을 보면 좀 더 명확합니다.
![](https://blog.kakaocdn.net/dn/cymb9J/btsMe4V0w32/y6KDJ6wChUE3zJ9IwGtR2K/img.png)
Channel과 Flow는 차이가 무엇인가요?
Channel이 flow와 다른 점은 직접 데이터를 생산하는 것이 아닌 전달의 개념이라는 것이 다른 점입니다.
Flow는 직접 값을 반환하지만, Channel은 값이 이동하는 통로입니다. 값을 받는 입장에서는 비슷해 보일 수 있지만, 데이터를 직접 만드는지 단순히 전달할 뿐인지의 차이가 있습니다.
또한, Flow는 데이터의 반환이 flow 블록 안에서만 이루어지지만, Channel은 참조할 수만 있다면 어디에서든 값을 보낼 수 있습니다.
마지막으로, Flow는 누군가가 값을 기다리고 있는지와 상관없이 값을 반환할 수 있지만, Channel은 receiver가 있어야만 값을 전달할 수 있습니다.
본질적으로 생산과 전달의 차이라고 봅니다.
Flow는 데이터 스트림을 생성하고 이를 소비하는 방식입니다. 반면, Channel은 데이터 전달에 중점을 둡니다.
Channel을 flow로 변환할때, receiveAsFlow와 consumeAsFlow의 차이는 무엇인가요?
receiveAsFlow와 consumeAsFlow는 Kotlin의 Channel을 Flow로 변환하는 데 사용되는 두 함수입니다.
이 두 함수의 주요 차이점을 아래와 같이 정리할 수 있습니다.
1. 채널 소비 & 재사용 여부
consumeAsFlow
- 채널을 소비하며, Flow의 수집이 끝나거나 에러가 발생하면 채널이 자동으로 닫힙니다.
- Flow 수집 후 채널이 닫히기 때문에 재사용할 수 없습니다.
suspend fun main() {
// 버퍼가 없는 채널. 수신자가 receive 를 호출하기 전까지 send 를 호출해도 대기한다
runBlocking {
val channel = Channel<Int>(Channel.BUFFERED)
// 데이터를 채널에 보냄
channel.send(1)
channel.send(2)
// consumeAsFlow를 사용하여 Flow로 변환
val flow = channel.consumeAsFlow()
// Flow를 수집 (테스트를 위해 2개만 수집하고 collect 중단)
flow.take(2).collect { value ->
println("Collected: $value")
}
println("Is channel closed? ${channel.isClosedForSend}") // true
channel.send(3)
}
}
<결과>
Collected: 1
Collected: 2
Is channel closed? true
Exception in thread "main" kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
receiveAsFlow
- 채널을 소비하지 않으며, Flow 수집 후에도 채널은 계속 열려 있습니다.
- Flow 수집 후에도 채널이 열려 있으므로 재사용할 수 있습니다.
- 채널을 직접 닫아야 하며, 닫지 않으면 자원 누수가 발생할 수 있습니다.
suspend fun main() {
// 버퍼가 없는 채널. 수신자가 receive 를 호출하기 전까지 send 를 호출해도 대기한다
runBlocking {
val channel = Channel<Int>(Channel.BUFFERED)
// 데이터를 채널에 보냄
channel.send(1)
channel.send(2)
// consumeAsFlow를 사용하여 Flow로 변환
val flow = channel.receiveAsFlow()
// Flow를 수집 (테스트를 위해 2개만 수집하고 collect 중단)
flow.take(2).collect { value ->
println("Collected: $value")
}
println("Is channel closed? ${channel.isClosedForSend}") // false
channel.send(3)
}
}
<결과>
Collected: 1
Collected: 2
Is channel closed? false
2. 수집자(Collector) 제한
consumeAsFlow
- Flow는 단일 수집자만 허용합니다. 여러 수집자가 동시에 수집을 시도하면 에러가 발생합니다.
suspend fun main() {
// 버퍼가 없는 채널. 수신자가 receive 를 호출하기 전까지 send 를 호출해도 대기한다
runBlocking {
val receiveChannel = Channel<Int>(Channel.BUFFERED)
// 데이터를 채널에 보냄
receiveChannel.send(1)
receiveChannel.send(2)
receiveChannel.send(3)
receiveChannel.send(4)
// receiveAsFlow로 Flow를 생성
val flow = receiveChannel.consumeAsFlow()
// 첫 번째 수집자
launch {
flow.collect { value ->
println("Collector 1: $value")
delay(1000)
}
}
// 두 번째 수집자
launch {
flow.collect { value ->
println("Collector 2: $value")
delay(1000)
}
}
}
}
<결과>
Collector 1: 1
Exception in thread "main" java.lang.IllegalStateException: ReceiveChannel.consumeAsFlow can be collected just once
receiveAsFlow
- 여러 수집자가 데이터를 수집할 수 있습니다. 단, 각 데이터는 단일 수집자에게만 전달됩니다(팬아웃 방식).
suspend fun main() {
// 버퍼가 없는 채널. 수신자가 receive 를 호출하기 전까지 send 를 호출해도 대기한다
runBlocking {
val receiveChannel = Channel<Int>(Channel.BUFFERED)
// 데이터를 채널에 보냄
receiveChannel.send(1)
receiveChannel.send(2)
receiveChannel.send(3)
receiveChannel.send(4)
// receiveAsFlow로 Flow를 생성
val flow = receiveChannel.receiveAsFlow()
// 첫 번째 수집자
launch {
flow.collect { value ->
println("Collector 1: $value")
delay(1000)
}
}
// 두 번째 수집자
launch {
flow.collect { value ->
println("Collector 2: $value")
delay(1000)
}
}
}
}
<결과>
Collector 1: 1
Collector 2: 2
Collector 1: 3
Collector 2: 4
3. 선택 기준
consumeAsFlow를 사용할 때:
- 채널 데이터를 한 번만 수집하면 충분할 때.
- 채널 정리를 자동으로 처리하고 싶을 때.
receiveAsFlow를 사용할 때:
- 여러 수집자가 필요하거나, Flow 수집 후 채널을 재사용해야 할 때.
- Activity 또는 Fragment와 같은 라이프사이클 변화로 인해 수집이 중단될 가능성이 있을 때.
예를 들어, repeatOnLifecycle을 사용하여 Flow의 collect를 수집하는 경우, Activity가 백그라운드로 이동하면 collect가 중단될 수 있습니다. 이때, consumeAsFlow를 사용했다면 Flow 수집이 중단되면서 채널이 자동으로 닫히기 때문에, 포그라운드로 돌아왔을 때 채널을 재사용할 수 없습니다. 반면, receiveAsFlow를 사용하면 채널이 닫히지 않고 계속 열려 있으므로, 라이프사이클에 관계없이 채널을 재사용할 수 있습니다.
Channel을 데이터로 가져오는 방법은 어떤 것들이 있고, 그 중 flow를 통한 방법은 언제 사용하는 것이 좋을까요?
채널의 데이터를 가져오는 방법은 receive 이외에도 여러 가지 방법이 존재합니다.
1. receive를 통해서 하나의 값 받기
2. for-loop, consumeEach를 통해서 값 전체 하나씩 받기
3. receiveAsFlow, consumeAsFlow를 이용하여 데이터를 Flow 형식으로 받기
데이터를 가공하지 않고 그대로 사용한다면, consumeAsFlow를 굳이 사용할 필요가 없습니다. 만약, 데이터를 조작하기 위해
find, map, filter와 같은 메서드를 사용해야 한다면 consumeAsFlow를 사용해주면 됩니다.
정리
오늘은 Channel 의 종류에 대해 알아보았네요. 개념에 대해서는 딱 정리하고 가면 좋겠습니다.
- Channel 은 Coroutine 간에 데이터를 주고 받기 위해 만들어진 인터페이스다
- Channel 은 데이터를 제공하는 생산자, 데이터를 소비하는 소비자가 있다
- Channel은 송신자와 수신자의 수에 제한이 없고, 전송한 모든 값은 한 번만 수신할 수 있다.
- Channel 송신 측에서 보낸 값이 수신되지 않으면 대기 상태가 된다. (send()가 receive()를 만나지 못하면 suspend 된다.)
- Channel은 수신 측에서는 값을 받지 않으면 대기 상태가 된다. (receive()가 send()를 만나지 못하면 suspend 된다.)
- Channel은 채널에서 값을 모두 전송한 후에 채널을 닫아야 한다.
- Channel 은 Buffer 에 대한 정책을 다양하게 지정할 수 있다
- Channel 에서 BufferOverflow 발생시에 대한 정책을 다양하게 지정할 수 있다
- 팬아웃: 여러 개의 코루틴이 하나의 채널을 통해서 값을 수신할 수 있다.
- 팬인: 여러 개의 코루틴이 하나의 채널을 통해서 값을 송신할 수도 있다.
- produce 빌더를 통해 생성된 코루틴은 이유에 관계없이 코루틴이 종료될 때 채널을 닫는다.
참고자료
https://huisam.tistory.com/entry/coroutine-channel#Capacity%20%EC%A0%95%EC%B1%85-1
[Kotlin] Coroutine - 5. 코루틴의 Channel 의 모든 것
안녕하세요~! Coroutine 에서 제공하는 다양한 Interfa ce 중에서 Queue 와 비슷한 개념을 가진 Channel 에 대해서 알아볼려고 해요 Channel 이란 Channel 은 쉽게 말씀드리면 데이터를 stream 처럼 전송하기 위
huisam.tistory.com
https://brunch.co.kr/@mystoryg/237
코루틴 채널 (Channel)
코틀린 코루틴(14) | 코루틴 채널 (Channel) 채널 기본 개념 코루틴 간 값(Data)을 전달하려면 어떻게 해야 할까? 채널(Channel)을 사용하면 된다. 채널은 기본적으로 다음과 같은 구조를 가진다. 송신하
brunch.co.kr
https://jaejong.tistory.com/65
[Kotlin] 코루틴 #5 - Channel (채널)
코루틴 #5 - Channel (채널) 값의 흐름을 전송하는 Channel(채널)에 대해 작성 Coroutine 이전 글 코루틴 #1 - 기본 코루틴 #2 - CoroutineContext와 CoroutineScope란? 코루틴 #3 - suspend Function (중단함수) 코루틴 #4 - Co
jaejong.tistory.com
https://medium.com/@wind.orca.pe/channel-and-flow-kotlin-coroutines-b43f0854bd16
Channel and Flow — Kotlin Coroutines
Korean recap
medium.com
https://medium.com/@appdevinsights/difference-between-consumeasflow-and-receiveasflow-a3db5b9cdb6c
Difference between consumeAsFlow and receiveAsFlow
Both consumeAsFlow and receiveAsFlow are functions in Kotlin that convert a receive channel into a flow, which is a powerful tool for…
medium.com
'Android > Coroutine' 카테고리의 다른 글
[Android] 예외로인한 코루틴 종료 과정 살펴보기 (0) | 2025.02.10 |
---|---|
[Android] SupervisorJob, SupervisorScope에 대한 이해 (0) | 2025.02.09 |
[Android] Callback, Reactive 그리고 Coroutine (0) | 2025.02.06 |
[Android] 중단(suspend)함수를 비동기적으로 실행하는 방법도 있을까? (0) | 2025.02.05 |
[Android] 중단(suspend)함수란 무엇이고 어떻게 동작할까? (feat. delay()) (1) | 2025.02.05 |