Androidアプリのアーキテクチャをちょっと真面目に考え始めたときにぶつかる壁の一つが、Flowだと思います。Flowの概念がなんとなく分かってくると次に混乱するのが、SharedFlowやStateFlowなどのFlowの種類による違いではないかと思います。今回はSharedFlowの動作を図を使いながらなるべくわかりやすく説明してみます。
目次
Emitter/Subscriber と Producer/Consumer
Flowについて考えるときには必ず、登場人物として値を出力する側と受け取る側が出てきます。kotlinlang.orgでは出力側を「Emitter」、受け取る側を「Subscriber」と呼んでいます。一方でAndroid Developersでは出力側を「Producer」、受け取る側を「Consumer」と呼んでいます。これらは似たような意味ですが、感覚的には、Flowそのものについて考えるときはEmitter/Subscriber、Flowを利用してデータをやり取りするモジュール(UIとリポジトリなど)を考えるときはProducer/Consumerと呼ぶのがふさわしいのだろうと思います。今回はFlow自体の説明ですので、Emitter/Subscriberで統一して説明します。
SharedFlowオブジェクトはMutableSharedFlow()
で作成します。この関数の3つのパラメータが、SharedFlowの挙動を決めます。
fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
replay
replay
は、後から追加されたSubscriberに対して出力するために値を保持しておくバッファのサイズです。
デフォルトのreplay = 0
の場合、emitした値はその時点で存在しているSubscriberにだけ出力されます。Subscriberが存在しない場合は値はどこにも出ていきません。
replay = 1
の場合、Subscriberが存在してもしなくても、最後にemitされた値が1つSharedFlow内に保持されます。そして、Subscriberが新たに収集を開始すると、バッファに保持されている値がまず出力され、その後はEmitterからemitされたタイミングで出力されます。
この動作は次のようなソースコードで確認できます。SharedFlowに1秒ごとに値をemitします。collectする前に1.5秒のdelayを入れると、replay = 0
の場合は最初にemitした値は取得されませんが、replay = 1
の場合はcollectした時点で最初の値を取得できていることが分かります。(※たまたま開いていたアプリのViewModelのコードで実験したのでCoroutineScopeがviewModelScopeになっています。深い意味はないです。)
val sharedFlow = MutableSharedFlow<Int>(
replay = 0, // or 1
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
var emitValue = 1
init {
viewModelScope.launch {
delay(1500)
sharedFlow.onEach {
Log.d("FlowSample", "collect $it")
}.collect()
}
viewModelScope.launch {
repeat(3) {
delay(1000)
Log.d("FlowSample", "emit $emitValue")
sharedFlow.emit(emitValue)
emitValue++
}
}
}
extraBufferCapacity
extraBufferCapacity
はemitされた値を格納しておくバッファのサイズです。Subscriberの処理を待っている間、このバッファに値をためておくことができます。
このバッファに空きがある場合はemit()
がsuspendせずに完了します。
バッファに空きがない場合で、かつ次に説明するonBufferOverflow
にSUSPEND
を指定している場合は、Subscriberが値を取り出してバッファに空きができるまでemit()
がsuspendして完了しません。Emitterが頻繁に値を更新する場合や、Subscriberが値を取り出すのに時間がかかる場合などは要注意です。
この動作は次のコードで確認できます。先ほどと同様に1秒ごとにemitしますが、collect側に3秒のディレイがあります。最初のemitはextraBufferCapacityの値に関係なくすぐにreturnしますが、2回目のemitがreturnするまでの時間が変わります。extraBufferCapacity = 1
の場合はすぐにreturnしますが、extraBufferCapacity = 0
の場合は1つ目のcollectが完了するまで待たされます。
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 1, // or 0
onBufferOverflow = BufferOverflow.SUSPEND
)
var emitValue = 1
init {
viewModelScope.launch {
sharedFlow.onEach {
delay(3000)
Log.d("FlowSample", "collect $it")
}.collect()
}
viewModelScope.launch {
repeat(2) {
delay(1000)
Log.d("FlowSample", "emit $emitValue")
sharedFlow.emit(emitValue)
Log.d("FlowSample", "emit return $emitValue")
emitValue++
}
}
}
なおemitされた値を格納しておく場所という意味ではreplay
も同じ働きをします。extraBufferCapacity + replay
の数に到達するまでは、emit()
はsuspendしません。
onBufferOverflow
onBufferOverflow
はバッファが一杯(extraBufferCapacity + replay
に等しい数の値がすでにemitされている)の状態でさらにemit()
した場合の挙動を指定します。
デフォルトのSUSPEND
は、バッファが一杯の場合、subscriberが値を取り出すまでsuspendします。
DROP_LATEST
は、バッファが一杯の場合、一番新しい値を捨てます。つまりバッファが一杯の間にemitされた値は破棄されます。その代わりにemitはsuspendせずすぐに処理から戻ります。
DROP_OLDEST
は、バッファが一杯の場合、バッファのデータの中で一番古いデータを破棄し、空いたバッファに値を保持します。emitはsuspendせずすぐに処理から戻ります。
これも実際のコードで試してみます。先ほどとほぼ同じコードで、replay = 1, extraBufferCapacity = 0
として、onBufferOverflow
の値を変えながら動作を見てみます。SUSPEND
の場合は3回目のemitの戻りが遅くなる代わりに、すべての値がcollectできています。一方でDROP_LATEST
やDROP_OLDEST
の場合はemitはすぐ戻る代わりに、collectできていない値があることが分かります。
val sharedFlow = MutableSharedFlow<Int>(
replay = 1,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND // or DROP_LATEST or DROP_OLDEST
)
var emitValue = 1
init {
viewModelScope.launch {
sharedFlow.onEach {
delay(3000)
Log.d("FlowSample", "collect $it")
}.collect()
}
viewModelScope.launch {
repeat(3) {
delay(1000)
Log.d("FlowSample", "emit $emitValue")
sharedFlow.emit(emitValue)
Log.d("FlowSample", "emit return $emitValue")
emitValue++
}
}
}
なお、DROP_LATEST
とDROP_OLDEST
はその性質上、バッファサイズが1以上でなければなりません。replay = 0
かつextraBufferCapacity = 0
でDROP_LATEST
やDROP_OLDEST
を指定してMutableSharedFlow()
を呼び出すと、例外が発生します。
SharedFlowは「Hot」Flowです。Hot FlowはSubscriberが存在しなくてもアクティブになります。SubscriberとEmitterが独立して動けるので、UI層から独立して動作するデータ層で使うのに適しています。ただ、上で説明してきたように、Subscriberが存在しない間にFlowにemitしたデータがどうなるのかを考えておく必要があります。
emitはsuspend関数
MutableSharedFlowのemi
tはsuspend関数ですので、CoroutineScopeから呼び出す必要があります。上で説明した通り、状況によっては処理に時間がかかるためです。
abstract suspend override fun emit(value: T)
ただし、tryEmit
という関数もあります。こちらは通常の関数なのでCoroutineScope以外からも呼び出せます。
abstract fun tryEmit(value: T): Boolean
emit
がsuspendせずに完了する状況であれば、この関数は成功します(trueを返す)。emit
がsuspendする状況(onBufferOverflow
がSUSPEND
で、バッファが一杯)ではこの関数は失敗します(falseを返す)。バッファが一杯のときはemitしようとした最新の値が破棄されるので、実質的にはDROP_LATEST
と同じ動きになります。