[Android] SharedFlow와 StateFlow
Flow의 한계
Flow는 Kotlin에서 비동기적인 데이터 스트림을 처리하기 위해 사용되는 라이브러리입니다. 데이터를 비동기적으로 처리하고 연속된 값을 스트림 형태로 표현하는 데 유용합니다.
하지만 Flow는 데이터를 저장하는 기능을 제공하지 않기 때문에, 상태나 이벤트를 관리하려면 별도의 데이터 홀더를 만들어야 합니다. 즉, Flow는 데이터를 저장하는 것이 아니라 데이터의 흐름(Stream)을 관리하는 역할만 합니다.
이러한 한계를 해결하기 위해 Kotlin에서는 StateFlow와 SharedFlow라는 특별한 Flow가 등장합니다.
StateFlow는 상태(State)를 관리하는데 사용되며, SharedFlow는 이벤트(event)를 처리하는데 적합합니다.
우리는 애플리케이션을 개발할 때 필연적으로 “상태”와 “이벤트” 를 다루게 됩니다.
상태와 이벤트에 대해서 설명하며, StateFlow와 SharedFlow를 다뤄보도록 하겠습니다.
상태(State)
UI 애플리케이션에서 다루는 상태에 대해 생각 해봅시다.
우선 사용자의 인증 상태를 생각해 볼 수 있습니다. 사용자의 인증 상태는 크게 “로그아웃”, “로그인”, “계정 잠김” 등으로 정의 될 수 있으며 반드시 한가지 상태를 갖고 있습니다.
또 한가지는 화면에 표현하기 위한 데이터의 상태를 생각해 볼 수 있습니다.
이는 “로딩중”, “데이터 이용가능”, “오류” 등으로 정의 될 수 있으며 역시 반드시 한가지 상태를 갖고 있습니다.
애플리케이션은 필요한 상태 머신을 정의하고, UI 변경이나 비지니스 로직의 수행 등에 따라 상태를 전이해 가며 실행됩니다.
MVVM 아키텍쳐를 이용할 경우 보통 UI 와 관련된 “상태” 들은 ViewModel (VM) 에 위치하여 ViewModel 에 바인드 된 View 들이 상태를 표현할 수 있도록 합니다.
Kotlin 을 사용하여 이런 상태들을 정의 할 때 상태가 상태 자체로 의미가 있고 부가적인 데이터를 갖고 있지 않다면 Enum 을 이용하여 정의하기도 하고, 부가적인 데이터가 필요하다면 Sealed class 를 이용하여 정의하기도 합니다.
Android 에서는 ViewModel 에서 이런 상태 데이터를 다루기 위한 장치로 LiveData 를 제공하고 있습니다. LiveData 는 Android lifecycle 과 자연스럽게 연동되므로 편리하게 사용할 수 있습니다. 다만, LiveData 는 Lifecycle 이 있는 UI 와 인터렉션 하도록 디자인 되었으며 비지니스 레이어에서의 사용에는 무리가 있습니다. 특히, 비지니스 레이어를 플랫폼 독립적으로 가져가고자 한다면 더욱이 그렇습니다.
이 경우 과거에는 Reactive streams, 특히 Android 에서는 RxJava/Kotlin 구현을 사용하였습니다. RxJava 에서도 Subject 를 이용하여 이를 구현하였는데 다양한 타입의 Subject 가 있지만 기본 상태를 갖고 있으며 모든 구독자에게 최신 상태를 전달해주는 BehaviorSubject 가 “상태” 관리에 주로 사용 되었습니다.
이제 많은 Android Application 들이 Kotlin 으로 작성되고 있으므로 RxJava/Kotlin 의존성을 가져 오지 않더라도 경량화된 Reactive streams 구현체 라고 볼 수 있는 Flow 를 이용하여 이를 구현할 수 있습니다. Flow 는여타 Observable (RX) 과 같이 기본적으로 콜드 스트림 입니다.
즉 구독 시점에 스트림이 생성되어 데이터를 처음부터 수신합니다.
이를 RX 의 Subject 처럼 Hot Observable 로 제공해 주는 Flow 구현으로 StateFlow, SharedFlow 가 있습니다.
그 중에서 기본값을 가지고 모든 구독자에게 최신 상태를 전달해 주는 것은 StateFlow 입니다.
Android ViewModel 에서 Flow 의 사용 역시 AndroidX 를 통해 제공되는 Coroutine viewModelScope 를 이용하여 ViewModel lifecycle 에 바인딩 되어 편리하게 사용할 수 있습니다.
더욱이 많은 AndroidX 및 서드파티 라이브러리들이 중단 함수를 제공하고 있기 때문에 Flow 는 더욱 빛을 발할 수 있습니다.
Flow 가 Rx Observable 과 비교하여 갖는 강력한 장점 중 하나는 Flow chain 에서 중단함수를 사용할 수 있다는 것 입니다. 이것은 코드의 많은 부분에서 가독성을 높이고 유지보수를 용이하게 해줄 수 있습니다.
예를 들어, API 호출 후 DB에 저장하는 로직이 있다고 할 때 RxJava와 Flow는 가독성 측면에서, 확연한 차이를 보여줍니다.
RxJava의 경우
fun getUser(): Observable<User> {
return api.getUser() // Retrofit Observable
.flatMap { user ->
localDatabase.saveUser(user) // 중단함수 아님. 콜백 or 다른 Observable로 래핑 필요
.andThen(Observable.just(user)) // side effect 처리
}
}
문제점
- saveUser()가 suspend가 아니기 때문에, 콜백 기반으로 감싸거나 Completable로 래핑해야 함
- 비동기 호출임에도 명시적인 흐름을 깨지 않게 유지하려면 복잡한 조합이 필요함
Kotlin Flow의 경우
fun getUser(): Flow<User> = flow {
val user = api.getUser() // suspend 함수
localDatabase.saveUser(user) // suspend 함수
emit(user)
}
장점
- suspend 함수 (getUser(), saveUser())를 자연스럽게 Flow 체인 내에서 사용 가능
- 흐름이 간단하고 가독성 좋음
이처럼 Flow는 suspend 함수가 체인 내에서 자연스럽게 호출 가능하기 때문에, 코드가 간결하고 명확해집니다.
이제, StateFlow 를 사용하는 코드를 살펴볼까요?
@HiltViewModel
class MainViewModel(
private val refreshDataUseCase: RefreshDataUseCase,
@Assisted private val savedStateHandle: SavedStateHandle,
) : ViewModel() {
private val _dataState: MutableStateFlow<UiState<List<MembershipUiModel>>>
= MutableStateFlow(UiState.InProgress())
val dataState = _dataState.asStateFlow()
fun refreshData() = viewModelScope.launch {
runCatching {
refreshDataUseCase.execute()
}.onSuccess { data ->
_dataState.emit(UiState.Success(data))
}.onFailure { throwable ->
_dataState.emit(UiState.Fail(throwable))
}
}
}
refreshDataUseCase 는 비지니스 레이어에서 데이터를 동기화하고 최신 데이터를 리스트 형태로 가져온다고 생각합시다.
먼저 6~8 번 라인에서 StateFlow 를 정의하고 있습니다.
_dataState 는 ViewModel 내부에서 사용하기 위한 변경 가능한 (Mutable) Flow 이고 ViewModel 외부로 노출하기 위한 dataState 는 위에서 생성한 Flow 를 asStateFlow() 함수를 이용하여 Immutable Flow 로 변환한 Flow 입니다.
11 번 라인의 refreshData() 가 호출되면 ViewModel 스코프에서 코루틴이 생성되어 UseCase 의 중단함수를 호출하게 되고, 결과가 도착하면 성공/실패 에 따라 적당한 데이터로 변환하여 Flow 로 전달하게 되고, 이것은 dataState 를 구독하고 있는 UI 를 갱신하게 만듭니다.
지금까지 StateFlow 에 대해서 간략하게 살펴보았습니다.
SharedFlow 는 무엇일까요?
이벤트(Event)
앞서 살펴본 것처럼 우리는 애플리케이션 개발을 할 때 필요한 상태 머신을 정의하고, 애플리케이션은 정의 된 상태들로 전환해 가면서 실행됩니다. 이러한 애플리케이션 “상태” 와 함께 우리가 애플리케이션을 개발할 때 고려해야 하는 다른 한가지는 바로 “이벤트” 입니다.
상태 머신은 늘 기본 상태를 갖고 동작하지만 이벤트는 기본값 없이 특정 상황이 발생했을 때 구독자들에게 발생한 상황을 이벤트라는 형태로 전달합니다. 그러면 구독자의 입장에서 상태와 이벤트는 어떻게 다를까요?
- 상태는 기본값이 있지만, 이벤트는 기본값이 없습니다.
- 상태는 신규 구독 시 가장 최근 값을 받지만, 이벤트는 구독 이후 발생한 값을 받습니다.
(위 내용은 기본적으로 UI 애플리케이션에서 상태와 이벤트에 대한 일반적인 내용이며 각 프레임워크에서 제공하는 상태나 이벤트 매커니즘의 옵션 설정을 통해 동작은 변경될 수 있습니다.)
이러한 이벤트는 어떤 경우에 사용될까요?
일반적으로 UI Layer 에서 사용자와 뷰의 인터렉션으로 인해 발생한 이벤트를 정의하여 전달하거나, 시스템에 발생 한 메모리 부족, 인증 오류 발생 등의 이벤트에 관련 컴포넌트들이 대응할 수 있도록 하기 위해 사용합니다.
SharedFlow 를 사용하는 코드를 잠시 살펴봅시다.
@HiltViewModel
class MainViewModel(
@Assisted private val savedStateHandle: SavedStateHandle,
) : ViewModel() {
private val _systemEvent: MutableSharedFlow<Unit> =
MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val systemEvent = _systemEvent.asSharedFlow()
init {
viewModelScope.launch {
systemEvent.collect { systemEvent ->
when(systemEvent) {
is SystemEvent.MemoryWarning -> { TODO() }
is SystemEvent.StorageWarning -> { TODO() }
else -> { }
}
}
}
}
fun reportSystemEvent(systemEvent: SystemEvent) {
_systemEvent.emit(systemEvent)
}
}
전체적인 구조는 StateFlow 와 유사합니다. 다만 MutableSharedFlow 생성 시 몇 가지 옵션을 전달하여 SharedFlow 의 동작을 재정의 하고 있습니다.
replay
collect시 전달받을 이전 데이터의 개수를 지정합니다.
replay가 0이라면 collect 시점에 담겨있던 데이터부터 전달받을 수 있습니다. 만약 1이라면 collect 시점 직전의 데이터부터 전달받으며 시작합니다. 만약 2라면 현재 데이터 이전 두개의 데이터 부터 전달받으면서 시작하게 됩니다.
예를 들어 0~9까지 emit 되는 state flow에서 3이 emit 되어 시점에 collect가 시작된다면
ex) 0->1->2->3-> collect 시작->4->5->6->7->8->9
- replay = 0 일 때: 4부터 수신 시작
- replay = 1 일때: 3부터 수신 시작
- replay = 4 이상 일때: 0부터 수신 시작
extraBufferCapacity
buffer 개수 설정을 설정합니다.
flow의 emit이 빠르고 collect가 느릴 때 지정된 개수만큼 buffer에 저장되며 저장된 개수가 넘어가면 onBufferOverflow에 설정된 정책에 따라 동작하게 됩니다.
onBufferOverflow
Buffer가 다 찼을 때의 동작을 정의합니다. 이는 channel에서 사용하는 buffer의 정의와 동일합니다.
- BufferOverflow.SUSPEND : buffer가 꽉 찼을 때 emit을 수행하면 emit 코드가 blocking 됩니다. 즉, buffer의 빈자리가 생겨야 emit 코드 이후의 코드가 수행될 수 있습니다.
- BufferOverflow.DROP_OLDEST: buffer가 꽉 찼을 때 emit을 수행하면 오래된 데이터 부터 삭제하면서 새로운 데이터를 넣습니다.
- BufferOverflow.DROP_LATEST: buffer가 꽉찼을때 emit을 수행하면 최근 데이터를 삭제하고 새로운 데이터를 넣습니다.
위 세가지 옵션을 통해 우리가 원하는 SharedFlow 를 생성할 수 있으며, 이는 우리가 RxJava/Kotlin 에서 사용하던 PublishSubject 와 유사하다고 할 수 있습니다.
실제 내부 정의를 보면 아래와 같이 api가 설정되어 있습니다.
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
require(replay >= 0) { "replay cannot be negative, but was $replay" }
require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
}
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
replay의 기본값은 0이므로 collect 시점 이전에 전달된 데이터는 받지 않습니다.
또한 buffer size도 0이므로 buffer를 사용하지 않으며, buffer 사용 정책이 BufferOverflow.SUSPEND 이므로 collect가 emit보다 늦게 처리된다면 emit 부분이 block 되어 지연이 발생될 수 있습니다.
ColdFlow를 HotFlow(SharedFlow, StateFlow)로 변환하는 방법
coldFlow를 hotFlow로 변환하기 위해서 Flow의 extension function으로 stateIn과 sharedIn이 존재합니다.
이 두 개의 api는 하나의 flow에서 (single instance upstream) 방출되는 결과를 여러 곳에서 동일하게 수신하는 경우 용이하게 사용될 수 있습니다.
특히 cold flow의 생성 자체에 무거운 작업이 포함되어 있거나 유지하는 작업 자체의 비용이 많이 드는 경우 여러 곳에서 collect를 할때 매번 생성하여 사용하지 않고 하나의 flow만 생성하고, collect는 여러곳에서 수신받도록 구현합니다.
각각의 함수 정의는 아래와 같습니다.
1. shareIn
shareIn 함수는 콜드 플로우를 공유 가능한 핫 플로우로 변환하기 위한 확장 함수입니다.
이를 통해 플로우를 여러 소비자가 동시에 사용할 수 있도록 캐싱하거나 데이터를 재활용할 수 있습니다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach { delay(1000) }
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
// replay = 0 (default)
)
delay(500)
launch {
sharedFlow.collect { println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#3 $it") }
}
}
//(1초 후)
//#1 A
//(1초 후)
//#1 B
//#2 B
//(1초 후)
//#1 C
//#2 C
//#3 C
shareIn은 다음과 같은 인자를 받습니다.
- scope: shareIn으로 변환된 핫 플로우가 실행될 코루틴 범위를 지정하며, 이 범위가 종료되면 핫 플로우도 종료된다.
- started: 공유 시작 시점과 종료 조건을 결정하는 전략을 설정한다.
- replay: 가장 최근의 값들을 캐싱할 수 있는 갯수를 설정한다.
fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
두 번째 인자인 started는 다음과 같은 옵션을 지원하며, SharingStarted 인터페이스를 구현하여 커스텀화된 전략을 정의하는 것도 가능합니다.
- SharingStarted.Eagerly: 즉시 값을 감지하기 시작하며, 시작하기 전에 값이 나오면 유실될 수 있습니다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
)
delay(100)
launch {
sharedFlow.collect { println("#1 $it") }
}
print("Done")
}
//(0.1초 후)
//Done
- SharingStarted.Lazily: 첫 구독자가 생길 때 감지하기 시작하며, 모든 값을 수신하는 것이 보장됩니다.
suspend fun main(): Unit = coroutineScope {
val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D")
.onEach { delay(1000) }
val sharedFlow = merge(flow1, flow2).shareIn(
scope = this,
started = SharingStarted.Lazily,
)
delay(100)
launch {
sharedFlow.collect { println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it") }
}
}
//(0.1초 후)
//#1 A
//#1 B
//#1 C
//(1초 후)
//#1 D
//#2 D
- SharingStarted.WhileSubscribed(timeoutMillis: Long): 첫 번재 구독자가 나올때 감지하기 시작, 마지막 구독자가 사라지면 플로우도 멈추게 됩니다. 멈추고 난 후 timeoutMillis 이전에 새로운 구독자가 나오면 플로우가 다시 시작됩니다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart { println("Started") }
.onCompletion { println("Finished") }
.onEach { delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(),
)
delay(3000)
launch {
println("#1 ${sharedFlow.first()}")
}
launch {
println("#2 ${sharedFlow.take(2).toList()}")
}
delay(3000)
launch {
println("#3 ${sharedFlow.first()}")
}
}
//(3초 후)
//Started
//(1초 후)
//#1 A
//(1초 후)
//#2 [A, B]
//Finished
//(3초 후)
//Started
//(1초 후)
//#3 A
//Finished
2. stateIn
stateIn은 Flow<T>를 StateFlow<T>로 변환하는 함수이며 두 가지 형태가 있습니다.
suspend fun <T> Flow<T>.stateIn(
scope: CoroutineScope
): StateFlow<T>
fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T>
StateFlow는 항상 값을 가져야 합니다.
중단 함수 형태의 stateIn을 사용할 때는, 값을 명시하지 않았다면 첫 번째 값이 계산될 때까지 기다려야 합니다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach { delay(1000) }
.onEach { println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(this)
println("Listening")
println(stateFlow.value)
stateFlow.collect {
println("Received $it")
}
}
//(1초 후)
//Produced A
//Listening
//A
//Received A
//(1초 후)
//Produced B
//Received B
//(1초 후)
//Produced C
//Received C
stateIn의 두 번째 형태는 초기 값과 started 모드를 지정해야 한다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B")
.onEach { delay(1000) }
.onEach { println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(
scope = this,
started = SharingStarted.Lazily,
initialValue = "Empty"
)
println(stateFlow.value)
delay(2000)
stateFlow.collect {
println("Received $it")
}
}
//Empty
//(2초 후)
//Received Empty
//(1초 후)
//Produced A
//Received A
//(1초 후)
//Produced B
//Received B
StateFlow, SharedFlow와 CallbackFlow 비교
Flow에는 여러 가지 종류가 있으며, 그중 StateFlow, SharedFlow, CallbackFlow는 각각 다른 방식으로 데이터를 전달하는 역할을 합니다. Flow의 계층 구조를 살펴보면, Flow를 기반으로 SharedFlow가 만들어졌고, StateFlow는 SharedFlow를 확장하여 구현된 구조를 가집니다.
SharedFlow는 여러 구독자에게 동일한 데이터를 전달하는 Hot Flow입니다.
기본적으로 버퍼 크기가 0으로 설정되어 있기 때문에, 새로운 구독자는 이전에 방출된 데이터를 받을 수 없습니다.
하지만 replay 값을 설정하면 새로운 구독자도 일정 개수만큼의 이전 데이터를 받을 수 있도록 조정할 수 있습니다.
또한, SharedFlow는 활성화된 모든 구독자에게 데이터를 전달하므로, 이벤트 스트림을 관리할 때 유용합니다.
StateFlow는 SharedFlow를 기반으로 만들어진 Hot Flow이지만, 상태(state)를 유지하는 특성을 갖습니다. 생성 시 반드시 초기값을 설정해야 하며, 새로운 구독자는 항상 최신 상태 값을 받을 수 있습니다.
또한, StateFlow는 이전 값과 비교하여 동일한 값이 들어오면 emit을 호출하지 않는 특징이 있습니다. 이러한 특성 덕분에 UI 상태 관리 등에 적합하며, 상태 변경이 발생할 때만 구독자들에게 알릴 수 있습니다.
반면, CallbackFlow는 Cold Flow의 성격을 가지며, 구독자가 없으면 비활성화된 상태가 됩니다. 일반적으로 비동기 콜백을 Flow로 변환하는 용도로 사용됩니다.
지금까지 간략하게 살펴본 Flow 들을 애플리케이션 개발 시 상황에 따라 적절하게 사용한다면 더 간결하고 직관적인 코드를 작성할 수 있을 것이라 생각합니다.
참고자료
https://myungpyo.medium.com/stateflow-%EC%99%80-sharedflow-32fdb49f9a32
StateFlow 와 SharedFlow
코루틴 공식 가이드 읽기 Part 9 — Dive1
myungpyo.medium.com
https://augustin26.tistory.com/104
[코틀린 코루틴] 공유플로우와 상태플로우
공유플로우 공유플로우를 통해 메세지를 보내면 대기하고 있는 모든 코루틴이 수신하게 된다.suspend fun main(): Unit = coroutineScope { val mutableSharedFlow = MutableSharedFlow(replay = 0) // or MutableSharedFlow() launch
augustin26.tistory.com
https://tourspace.tistory.com/434#google_vignette
[Coroutine] State flow vs Shared flow with case study
Flow를 사용하면서 유용하게 사용할 수 있는 state flow와 shared flow가 다른 점과 각각 어떤 상황에서 적합한지를 알기 위하여 두 개의 특성을 비교하려고 합니다. Flow builder로 생성한 flow들은 기본적
tourspace.tistory.com
https://dodobest.tistory.com/116
Flow, Channel, ChannelFlow, CallbackFlow
Flow와 Channel의 목적 Flow : 데이터를 생성하고 처리하는데 사용할 수 있습니다.Channel : 코루틴 간에 데이터를 주고 받는데 사용할 수 있습니다. Flow & Channel 비교 Flow는 비동기적으로 생성된 데이
dodobest.tistory.com