멀티 스레드 처리는 애플리케이션의 퍼포먼스에 큰 이점을 주지만 경쟁 상태(race condition)를 적절히 조절해 주지 않는다면 데이터의 손실을 야기할 수 있습니다.
- 경쟁 상태란 여러 개의 스레드가 하나의 공유/변경 가능한 자원에 접근하는 것을 말합니다.
대표적으로 코루틴에서, Dispatchers.Default 등의 멀티 스레드 dispatcher를 사용하면 여러 개의 코루틴을 동시에 실행할 수 있습니다. 이 과정에서 여러 동시성 문제가 발생할 수 있습니다.
따라서, 데이터 손실을 막기 위해서는 동기화(synchronization)를 통해 race condition을 제어하여 올바른 멀티 스레드 환경을 구축해야 합니다.
문제 정의
아래 코드는 Synchronization(이하 동기화) 없이 동시에 변수에 접근하여 수정하는 예제를 보여줍니다.
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 코루틴을 실행할 횟수
val k = 1000 // 각 코루틴마다 action을 수행할 횟수
val time = measureTimeMillis {
coroutineScope { // 코루틴!
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
개의 코루틴을 실행하여 각 코루틴마다 action을 massiveRun()함수는 nk번 실행하는 시간을 측정하는 함수입니다.
이제 shared mutable에 동기화 없이 접근해보겠습니다.
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
Completed 100000 actions in 26 ms
Counter = 51746
counter를 번 실행했지만 결과는 .
여러 스레드가 동기화 없이 마구잡이로 접근하니까 제대로 실행되지 않는 것입니다.
즉, 경쟁상태(RaceCondition)로 인해 여러 코루틴이 counter 값을 동시에 읽고 수정하며, 일부 수정이 덮어써지기 때문이죠
Thread-safe한 방법
AtomicInteger를 활용한 Thread-safe한 변수 관리
일반적인 해결법 중 하나로 thread-safe한 자료구조를 사용하는 방법이 있습니다.
Thread-safe란, 동시에 최대 하나의 스레드만 변수에 접근할 수 있도록 자체적으로 제어하는 변수를 의미합니다.
따라서 thread-safe한 변수를 사용하면 여러 개의 스레드가 동시에 접근해도 변수를 동기화할 수 있다.
예를 들어 코틀린에는 Int의 thread-safe 타입 AtomicInteger가 존재합니다. incrementAndGet 연산을 사용하면 AtomicInteger를 thread-safe하게 증가시킬 수 있습니다.
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
Completed 100000 actions in 25 ms
Counter = 100000
counter 값이 예상대로 100,000으로 출력됩니다.
AtomicInteger 외에도 AtomicReference<T>에 변수를 넣어 사용할 수 있습니다.
val counter = AtomicReference(0)
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100
val k = 1000
val time = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.getAndUpdate { it + 1 }
}
}
println("Counter = $counter")
}
하지만 AtomicInteger나 AtomicReference는 간단한 연산이나 상태 관리에는 적합하지만, 적용할 연산이 복잡한 경우를 처리할 수 없다.
코드를 근본적으로 thread-safe하게 만들어야 합니다.
Thread confinement: fine-grained
Thread confinement 는 하나의 스레드를 통해서만 변수에 접근할 수 있도록 하는 해결법을 말합니다. 예를 들어 안드로이드에서는 메인 스레드에서만 UI를 갱신할 수 있습니다.
위의 코드에 적용하려면 단일 스레드 문맥에서만 변수에 접근하도록 하면 됩니다.
val counterContext = newSingleThreadContext("CounterContext") // 단일 스레드 문맥
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 이 부분만 문맥을 counterContext로 바꿔서 실행
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
Completed 100000 actions in 1077 ms
Counter = 100000
문제는 해결했지만 실행 시간이 매우 느립니다. 값을 증가시킬 때마다 Dispatchers.Default 문맥을 counterContext로 바꿔서 실행하기 때문입니다.
문맥을 10만 번이나 바꾸기 때문에 느릴 수밖에 없는것이죠
Thread confinement: coarse-grained
Coarse-grained 방식은 실행 전체를 단일 스레드 문맥에서 수행하여, 불필요한 문맥 전환을 없애는 방식입니다.
사실상 위 예제에서 Dispatchers.Default은 사용되지 않는 문맥입니다. 애초에 실행 자체를 counterContext에서 하면 불필요한 문맥 교환을 막을 수 있을 것입니다.
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// 실행 자체를 단일 스레드 문맥에서 하면 된다.
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
Completed 100000 actions in 37 ms
Counter = 100000
모든 실행이 단일 스레드 Context에서 이루어지므로 Context Switching이 발생하지 않습니다.
따라서, 실행속도가 빨라지게됩니다.
Mutual exclusion
Mutual exclusion(상호 배제)이란 공유 변수에 접근하는 코드를 critical section 으로 보호하여 여러 스레드가 동시에 공유 자원에 접근하지 못하도록 막는 기법을 말합니다.
일반적으로는 synchronized나 Mutex를 사용하여 동시성 문제를 해결할 수 있습니다.
synchronized
synchronized는 Java 및 Kotlin에서 동기화 메서드 또는 블록을 제공하여 상호 배제를 구현합니다.
한 번에 하나의 스레드만 synchronized에 접근할 수 있도록 보장하며, 스레드가 synchronized을 참조하는 동안 다른 스레드는 대기하게 됩니다.
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// critical section
synchronized(this) {
counter++
}
}
}
println("Counter = $counter")
}
Completed 100000 actions in 37 ms
Counter = 100000
정상적으로 결과를 출력함을 확인할 수 있습니다.
하지만, synchronized는 아래와 같은 문제점이 있습니다.
- synchronized 블록 내부에서 중단 함수를 활용할 수 없다는 것입니다.
- synchronized 블록 내부에서 본인의 차례를 기다릴 때 쓰레드를 블록킹 합니다. -> 성능 저하
코루틴 환경에서 스레드를 블로킹(Blocking)하는 것은 지양해야 합니다. 따라서 블로킹 없이 중단(Suspend)하거나 충돌을 회피하는 방식을 사용하는 것이 좋습니다.
뮤텍스(Mutex)
뮤텍스(Mutex)는 동시성 문제를 방지하기 위해 공유 자원에 대한 접근을 제어하는 도구입니다.
코루틴 환경에서는 kotlinx.coroutines.sync.Mutex를 사용하여, 여러 코루틴이 동시에 하나의 공유 자원에 접근하지 못하도록 합니다.
뮤텍스는 lock과 unlock 함수를 사용하여 critical section 을 표현할 수 있습니다. Mutex.lock() 등은 모두 suspending 함수이기 때문에 스레드를 block하지 않습니다.
1. Lock과 Unlock을 직접 사용하는 방식
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// critical section
mutex.lock()
counter++
mutex.unlock()
}
}
println("Counter = $counter")
}
Completed 100000 actions in 249 ms
Counter = 100000
위 코드는 lock과 unlock를 직접사용하는 방식입니다.
하지만, lock()과 unlock() 사이에 예외가 발생하면, unlock()이 호출되지 않아 Deadlock(교착 상태)이 발생할 수 있습니다.
따라서, 다른 코루틴들이 해당 뮤텍스를 영원히 기다리게 됩니다.
2. withLock을 사용하여 안전하게 락 해제
Mutex.withLock 함수를 사용하면 자동으로 lock을 걸고 풀어줍니다.
블록 내부에서 예외가 발생하더라도 락을 반드시 해제하므로, 수동으로 락 해제를 관리할 때 발생할 수 있는 실수를 방지할 수 있습니다.
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// critical section
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
Completed 100000 actions in 245 ms
Counter = 100000
위와 같이 작성하면, 명시적으로 unlock()을 해주지 않아도 되며, 예외가 발생하더라도 락이 해제되어 Deadlock 문제를 막을 수 있습니다.
synchronized보다실행 시간이 느린 이유는 코루틴을 fine-grained 방식으로 실행했기 때문입니다.
하지만 특정 스레드에 종속되지 않은 변수를 주기적으로 갱신할 필요가 있는 경우 등, 특정한 상황에서는 사용해 볼 만 합니다.
주의사항
뮤텍스는 코루틴 환경에서 스레드 블로킹 없이 안전하게 동기화를 처리할 수 있는 유용한 도구이지만,
잘못된 사용 방식은 대기 상태(Deadlock)을 초래할 수 있습니다.
이를 방지하기 위해 아래와 같은 상황은 반드시 피해야합니다.
- withLock 함수 내 delay() 또는 yield()와 같은 중단함수 사용
- 코루틴이 락을 두번 통과하는 경우
1. withLock 내에서 중단 함수 사용
withLock 블록 내에서 delay()와 같은 중단 함수(suspend function)를 호출하면 무한 대기 상태(deadlock)에 빠질 수 있습니다.
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// critical section
mutex.lock()
counter++
delay(1000)
mutex.unlock()
}
}
println("Counter = $counter")
}
delay()가 호출되면, 현재 스레드가 반환되고, 이후 다른 스레드에서 코루틴이 재개됩니다.
뮤텍스는 락을 획득한 스레드에서만 해제할 수 있기 때문에, 이 상황에서는 락이 영원히 해제되지 않습니다.
결과적으로, 다른 코루틴들이 락을 기다리며 멈춰버립니다.
따라서, withLock 블록 내부에서는 delay()와 같은 중단 함수 호출을 피하는 것이 중요합니다.
2. 뮤텍스를 두 번 잠그기
동일한 코루틴이 뮤텍스를 중첩해서 잠그면 무한 대기 상태(deadlock)에 빠질 수 있습니다.
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// critical section
mutex.withLock {
mutext.withLock {
counter++
}
}
}
}
println("Counter = $counter")
}
뮤텍스는 이미 락을 획득한 상태에서 다시 잠그려고 하면 무한 대기 상태에 빠집니다.
이는 다음과 같은 비유로 이해할 수 있습니다.
- 열쇠가 문 안쪽에 있다: 첫 번째 mutex.withLock으로 문을 잠그고 안으로 들어감.
- 다른 문도 같은 열쇠를 요구한다.: 두 번째 mutex.withLock 호출이 내부에서 같은 열쇠를 요구.
- 열쇠가 문 안쪽에 있기 때문에 잠금을 해제할 수 없음: 결국 두 번째 호출이 영원히 대기하게 됨
따라서, 동일한 코루틴에서 여러 번 잠글 필요가 없도록 설계하고, 중첩된 WithLock 호출을 피해야합니다.
synchronized와 차이점은 뭐죠?
뮤텍스는 스레드를 블로킹(Blocking)하는 대신 코루틴을 중단(Suspend)시킨다는 것입니다.
synchronized는 스레드를 직접 블로킹하기 때문에, 대기 상태에서도 스레드가 비어 있는 상태로 유지되며, 이는 CPU 리소스를 낭비하게 됩니다.
Mutex는 락을 기다리는 코루틴을 중단(suspend)시켜, 해당 스레드를 다른 작업에 활용할 수 있도록 합니다.
결과적으로 시스템의 병렬 처리 능력이 향상됩니다.
Mutex는 코루틴 환경에서 락을 처리할 때 더 안전하고 가벼운 대안입니다. 스레드를 블로킹하지 않기 때문에 리소스 낭비를 줄이고, 코루틴의 비차단적 특성을 유지하며, 병렬 처리가 필요한 상황에서 더 나은 성능을 제공합니다.
세마포어 (Semaphore)
세마포어(Semaphore)는 동시에 여러 개의 접근을 허용하는 동기화 도구로, 동시 실행 가능 작업의 수를 제한할 때 사용됩니다.
이는 Mutex와 유사하게 작동하지만, Mutex는 한 번에 하나의 접근만 허용하는 반면, Semaphore는 지정된 개수의 접근을 허용합니다.
suspend fun main() = coroutineScope {
val semaphore = Semaphore(2)
repeat(5) {
launch {
semaphore.withPermit {
delay(1000)
print(it)
}
}
}
}
// 01
// (1초 후)
// 23
// (1초 후)
// 4
세마포어는 공유 상태로 인한 생기는 문제를 해결할 수 있는 없지만, 동시에 처리할 작업 수를 제한하여 시스템의 안정성을 유지하거나 과부하를 방지하는 데 유용합니다.
예를 들어, 네트워크 동시 요청을 제한하여 서버 과부하를 방지하려면 다음과 같이 사용할 수 있습니다
class LimitedNetworkUserRepository(
private val api: UserApi,
) {
// 동시 요청을 10개로 제한합니다.
private val semaphore = Semaphore(10)
suspend fun requestUser(userId: String) = semaphore.withPermit {
api.requestUser(userId)
}
}
Actors
Actor는 코루틴의 묶음으로, private 상태(변수)와 다른 코루틴과 의사소통할 수 있는 channel로 구성된 객체입니다. Actor는 메시지 기반의 비동기 처리 모델을 제공하며, 상태를 안전하게 관리할 수 있도록 설계되었습니다.
이 방식은 특히 공유 상태를 다룰 때 유용하며, 스레드 동기화를 걱정하지 않아도 됩니다.
actor 코루틴 빌더를 사용하면 메시지를 수신할 mailbox channel과 결과를 돌려줄 send channel을 간편하게 정의할 수 있습니다. 따라서 actor가 반환한 객체만을 가지고도 actor를 조작할 수 있습니다.
Actor를 사용하려면 먼저 actor가 처리할 메시지를 정의해야 하며, 이는 코틀린의 sealed class를 사용하면 좋습니다.
여러 개의 메시지 타입을 sealed class를 통해 하나로 묶고, when 문을 이용하여 메시지를 타입에 따라 처리할 수 있습니다.
말로 하면 어렵지만 코드를 보면 이해할 수 있을 것입니다.
sealed class CounterMsg // 모든 메시지 타입의 부모 클래스
object IncCounter : CounterMsg() // 변수를 1 증가시키라는 메시지
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 변수의 값을 돌려달라는 메시지
CompletableDeferred를 사용하면 값 하나를 언젠가 돌려주겠다는 약속을 표현할 수 있습니다.
Actor도 코루틴이므로 본질적으로 비동기적이며, 값을 즉시 반환하지 못할 경우가 있을 수 있기 때문입니다.
이제 actor를 정의해보겠습니다.
// 위에서 정의한 메시지를 처리하는 actor 정의
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // 변수 (state)
for (msg in channel) { // 들어오는 메시지를 처리한다.
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
이제 main 함수에서 actor를 실행해 보겠습니다.
fun main() = runBlocking<Unit> {
val counter = counterActor() // actor 생성
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// actor로부터 값을 받는다.
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // actor 종료
}
Completed 100000 actions in 708 ms
Counter = 100000
Actor는 메시지를 순차적으로 처리하기 때문에 자연스럽게 변수에는 한 번에 하나의 작업만 수행됩니다.
또, actor는 오직 메시지를 통해서만 서로 의사소통할 수 있습니다.
Actor 내부 상태는 항상 동일한 문맥(Context)에서 실행되므로, 스레드 문맥 전환에 대한 고민이 필요하지 않습니다. 문맥이 바뀌지 않기 때문에 실행 시간도 더 빨라지게 됩니다.
actor 코루틴 빌더는 produce 코루틴빌더의 형제 입니다. 액터는 메시지를 수신할 채널과 관련이 있는 반면 프로듀서는 데이터를 전송할 채널과 관련이 있습니다.
결론
성능 비교 결과, 동시성 처리 방법의 성능 순서는 다음과 같았습니다.
synchronized = singleThread(coarse-grained) > mutex > actors > singleThread(fine-grained)
동시성을 처리하는 방법은 여러가지가 있지만, 각각의 장단점이 있기 때문에, 단순히 성능만을 기준으로 선택하기보다는 상황과 요구 사항에 맞게 적절한 방법을 선택하는 것이 중요한 것 같습니다.
정리해보면, 아래와 같이 정리해볼 수 있을 것 같습니다.
- 락 경쟁이 적고 단순한 공유 자원 관리에는 synchronized (네이티브 JVM 락 최적화) 또는 singleThread 방식이 효과적입니다.
- 코루틴 환경에서 비동기 처리를 최적화하려면 Mutex가 적합합니다.
- 상태 공유를 완전히 배제하거나 메시지 기반으로 설계할 경우 actor가 유리합니다.
가장 많이 쓰이는 방식은 싱글스레드로 제한 된 coarse-grained 디스패쳐를 사용한 방식인 것 같습니다. (synchronized는 다른 스레드까지 Block할 수 있는 단점이 있기 때문에)
참고자료
https://thinking-face.tistory.com/141
[Kotlin] Coroutines - Shared Mutable State and concurrency
목차 문제 정의 Volatile? Thread-safe가 필요하다 Thread confinement: fine-grained Thread confinement: coarse-grained Mutual exclusion Actors 참고 문헌 Dispatchers.Default 등의 멀티 스레드 dispatcher를 사용하면 여러 개의 코루
thinking-face.tistory.com
[Kotlin Coroutines Deep Dive] Chapter 02. 코틀린 코루틴 라이브러리 (11장 - 14장)
코루틴 스코프와 디스패쳐 그리고 원자성 관리
velog.io
코루틴 공식 가이드 자세히 읽기 — Part 8
공식 가이드 읽기 (8 / 8)
myungpyo.medium.com
'Kotlin' 카테고리의 다른 글
[Kotlin] 불변성(Immutable)과 가변성(Mutable) (5) | 2024.11.07 |
---|---|
[Kotlin] 확장(Extension) 함수 (5) | 2024.11.06 |
[Kotlin] 코틀린에서의 Null 처리 방법 (4) | 2024.11.05 |
[Kotlin] 컬렉션(Collection) 함수 (9) | 2024.11.04 |
[Kotlin] by란? (0) | 2024.11.02 |