KotlinのSharedFlowを図で理解する

この記事をシェア

Androidアプリのアーキテクチャをちょっと真面目に考え始めたときにぶつかる壁の一つが、Flowだと思います。Flowの概念がなんとなく分かってくると次に混乱するのが、SharedFlowやStateFlowなどのFlowの種類による違いではないかと思います。今回はSharedFlowの動作を図を使いながらなるべくわかりやすく説明してみます。

Emitter/Subscriber と Producer/Consumer

Flowについて考えるときには必ず、登場人物として値を出力する側と受け取る側が出てきます。kotlinlang.orgでは出力側を「Emitter」、受け取る側を「Subscriber」と呼んでいます。一方でAndroid Developersでは出力側を「Producer」、受け取る側を「Consumer」と呼んでいます。これらは似たような意味ですが、感覚的には、Flowそのものについて考えるときはEmitter/Subscriber、Flowを利用してデータをやり取りするモジュール(UIとリポジトリなど)を考えるときはProducer/Consumerと呼ぶのがふさわしいのだろうと思います。今回はFlow自体の説明ですので、Emitter/Subscriberで統一して説明します。

EmitterからSubscriberへデータが流れる

SharedFlowの重要なパラメータ3つ

SharedFlowオブジェクトはMutableSharedFlow()で作成します。この関数の3つのパラメータが、SharedFlowの挙動を決めます。

fun <T> MutableSharedFlow(
    replay: Int = 0, 
    extraBufferCapacity: Int = 0, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

replay

replayは、後から追加されたSubscriberに対して出力するために値を保持しておくバッファのサイズです。

デフォルトのreplay = 0の場合、emitした値はその時点で存在しているSubscriberにだけ出力されます。Subscriberが存在しない場合は値はどこにも出ていきません。

replay = 0

replay = 1の場合、Subscriberが存在してもしなくても、最後にemitされた値が1つSharedFlow内に保持されます。そして、Subscriberが新たに収集を開始すると、バッファに保持されている値がまず出力され、その後はEmitterからemitされたタイミングで出力されます。

replay = 1

この動作は次のようなソースコードで確認できます。SharedFlowに1秒ごとに値をemitします。collectする前に1.5秒のdelayを入れると、replay = 0の場合は最初にemitした値は取得されませんが、replay = 1の場合はcollectした時点で最初の値を取得できていることが分かります。(※たまたま開いていたアプリのViewModelのコードで実験したのでCoroutineScopeがviewModelScopeになっています。深い意味はないです。)

val sharedFlow = MutableSharedFlow<Int>(
    replay = 0, // or 1
    extraBufferCapacity = 0,
    onBufferOverflow = BufferOverflow.SUSPEND
)
var emitValue = 1

init {
    viewModelScope.launch {
        delay(1500)
        sharedFlow.onEach {
            Log.d("FlowSample", "collect $it")
        }.collect()
    }
    viewModelScope.launch {
        repeat(3) {
            delay(1000)
            Log.d("FlowSample", "emit $emitValue")
            sharedFlow.emit(emitValue)
            emitValue++
        }
    }
}
replay = 0の場合
replay = 1の場合

extraBufferCapacity

extraBufferCapacityはemitされた値を格納しておくバッファのサイズです。Subscriberの処理を待っている間、このバッファに値をためておくことができます。

このバッファに空きがある場合はemit()がsuspendせずに完了します。

バッファに空きがある場合はsuspendしない

バッファに空きがない場合で、かつ次に説明するonBufferOverflowSUSPENDを指定している場合は、Subscriberが値を取り出してバッファに空きができるまでemit()がsuspendして完了しません。Emitterが頻繁に値を更新する場合や、Subscriberが値を取り出すのに時間がかかる場合などは要注意です。

バッファに空きがない場合はsuspendする

この動作は次のコードで確認できます。先ほどと同様に1秒ごとにemitしますが、collect側に3秒のディレイがあります。最初のemitはextraBufferCapacityの値に関係なくすぐにreturnしますが、2回目のemitがreturnするまでの時間が変わります。extraBufferCapacity = 1の場合はすぐにreturnしますが、extraBufferCapacity = 0の場合は1つ目のcollectが完了するまで待たされます。

val sharedFlow = MutableSharedFlow<Int>(
    replay = 0,
    extraBufferCapacity = 1, // or 0
    onBufferOverflow = BufferOverflow.SUSPEND
)
var emitValue = 1

init {
    viewModelScope.launch {
        sharedFlow.onEach {
            delay(3000)
            Log.d("FlowSample", "collect $it")
        }.collect()
    }
    viewModelScope.launch {
        repeat(2) {
            delay(1000)
            Log.d("FlowSample", "emit $emitValue")
            sharedFlow.emit(emitValue)
            Log.d("FlowSample", "emit return $emitValue")
            emitValue++
        }
    }
}
extraBufferCapacity = 1の場合
extraBufferCapacity = 0の場合

なおemitされた値を格納しておく場所という意味ではreplayも同じ働きをします。extraBufferCapacity + replayの数に到達するまでは、emit()はsuspendしません。

onBufferOverflow

onBufferOverflowはバッファが一杯(extraBufferCapacity + replayに等しい数の値がすでにemitされている)の状態でさらにemit()した場合の挙動を指定します。

デフォルトのSUSPENDは、バッファが一杯の場合、subscriberが値を取り出すまでsuspendします。

SUSPEND

DROP_LATESTは、バッファが一杯の場合、一番新しい値を捨てます。つまりバッファが一杯の間にemitされた値は破棄されます。その代わりにemitはsuspendせずすぐに処理から戻ります。

DROP_LATEST

DROP_OLDESTは、バッファが一杯の場合、バッファのデータの中で一番古いデータを破棄し、空いたバッファに値を保持します。emitはsuspendせずすぐに処理から戻ります。

DROP_OLDEST

これも実際のコードで試してみます。先ほどとほぼ同じコードで、replay = 1, extraBufferCapacity = 0として、onBufferOverflowの値を変えながら動作を見てみます。SUSPENDの場合は3回目のemitの戻りが遅くなる代わりに、すべての値がcollectできています。一方でDROP_LATESTDROP_OLDESTの場合はemitはすぐ戻る代わりに、collectできていない値があることが分かります。

val sharedFlow = MutableSharedFlow<Int>(
    replay = 1,
    extraBufferCapacity = 0,
    onBufferOverflow = BufferOverflow.SUSPEND // or DROP_LATEST or DROP_OLDEST
)
var emitValue = 1

init {
    viewModelScope.launch {
        sharedFlow.onEach {
            delay(3000)
            Log.d("FlowSample", "collect $it")
        }.collect()
    }
    viewModelScope.launch {
        repeat(3) {
            delay(1000)
            Log.d("FlowSample", "emit $emitValue")
            sharedFlow.emit(emitValue)
            Log.d("FlowSample", "emit return $emitValue")
            emitValue++
        }
    }
}
SUSPENDの場合
DROP_LATESTの場合
DROP_OLDESTの場合

なお、DROP_LATESTDROP_OLDESTはその性質上、バッファサイズが1以上でなければなりません。replay = 0かつextraBufferCapacity = 0DROP_LATESTDROP_OLDESTを指定してMutableSharedFlow()を呼び出すと、例外が発生します。

SharedFlowはHot Flow

SharedFlowは「Hot」Flowです。Hot FlowはSubscriberが存在しなくてもアクティブになります。SubscriberとEmitterが独立して動けるので、UI層から独立して動作するデータ層で使うのに適しています。ただ、上で説明してきたように、Subscriberが存在しない間にFlowにemitしたデータがどうなるのかを考えておく必要があります。

emitはsuspend関数

MutableSharedFlowのemitはsuspend関数ですので、CoroutineScopeから呼び出す必要があります。上で説明した通り、状況によっては処理に時間がかかるためです。

abstract suspend override fun emit(value: T)

ただし、tryEmitという関数もあります。こちらは通常の関数なのでCoroutineScope以外からも呼び出せます。

abstract fun tryEmit(value: T): Boolean

emitがsuspendせずに完了する状況であれば、この関数は成功します(trueを返す)。emitがsuspendする状況(onBufferOverflowSUSPENDで、バッファが一杯)ではこの関数は失敗します(falseを返す)。バッファが一杯のときはemitしようとした最新の値が破棄されるので、実質的にはDROP_LATESTと同じ動きになります。

この記事をシェア