アプリ開発備忘録

PlayStationMobile、Android、UWPの開発備忘録

【好きなFlowを作成する】LiveDataのような挙動の上位互換のFlowを作成する

基本的にMutableStateFlow使っておけば良いと思いますが、LiveDataのように変更が無くても常に値を流したいという要件がありました。

以下のようなFlowが欲しい

最新の値を取得できる

これは StateFlow の特徴ですね。

public interface StateFlow<out T> : SharedFlow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

値が変更できる

これは MutableStateFlowMutableSharedFlow の特徴ですね。

値がセットされたら前との比較の結果に関わらず値が送られる

これはAndroidのLiveDataや MutableSharedFlow の特徴ですね。

MutableStateFlowcompareAndSet を実装しなければならず、これが要りませんね。

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    public override var value: T
    public fun compareAndSet(expect: T, update: T): Boolean
}

その他

上記の機能はAndroidのLiveDataのようなものですね。
上記に加え、値を取得する側の処理が重かった場合に古い値を捨てて、最新の値だけ流れてくるようにしたいです。

作るFlow

値が更新できる Mutable 、常に値が送られる SharedFlow 。それに加え、最新の値が取得できる StateFlow の特徴が欲しいわけです。
上記より、作るFlowは MutableSharedFlowStateFlow を継承したものになります。

Flowを作る

MutableSharedFlow の実装である SharedFlowImpl を見て頂ければわかりますが、とても複雑です。これを再実装するのは面倒なのでラップしました。
名前は適当に LiveDataFlow としました。

class LiveDataFlow<T>(initValue: T) : MutableSharedFlow<T>, StateFlow<T> {
    private val internalFlow = MutableSharedFlow<T>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    init {
        internalFlow.tryEmit(initValue)
    }

    override var value
        get() = replayCache.last()
        set(value) {
            internalFlow.tryEmit(value)
        }

    override val subscriptionCount: StateFlow<Int> get() = internalFlow.subscriptionCount
    override val replayCache: List<T> get() = internalFlow.replayCache

    @InternalCoroutinesApi
    override suspend fun collect(collector: FlowCollector<T>) {
        internalFlow.collect(collector)
    }

    override suspend fun emit(value: T) {
        internalFlow.emit(value)
    }

    @ExperimentalCoroutinesApi
    override fun resetReplayCache() {
        internalFlow.resetReplayCache()
    }

    override fun tryEmit(value: T): Boolean {
        return internalFlow.tryEmit(value)
    }
}

解説

MutableSharedFlow<T>(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

replay = 1 を指定することで replayCache にその分だけ値が入ります。最新1個だけを保持するという意味になります。
なので以下で最新の値を取得する事ができるようになります。

override val value: T get() = replayCache.last()

初期値が無いとnullになることがあるので初期値だけ設定します。

init {
    internalFlow.tryEmit(initValue)
}

それから onBufferOverflow = BufferOverflow.DROP_OLDEST を指定することで処理されていない古い値を捨てる事ができます。
BufferOverflow.SUSPEND 以外であれば tryEmit は常に成功します。ついでに setter も実装しました。

override var value
    set(value) {
        internalFlow.tryEmit(value)
    }

おわりに

後はAndroidのFragmentで購読したりして使用しています。

inline fun <T> Flow<T>.collectInLifecycle(
    lifecycleOwner: LifecycleOwner,
    crossinline action: suspend (value: T) -> Unit
) = lifecycleOwner.lifecycleScope.launchWhenStarted {
    collect {
        action(it)
    }
}
val flow = LiveDataFlow(0)
flow.collectInLifecycle(viewLifecycleOwner) { value ->
}