基本的にMutableStateFlow使っておけば良いと思いますが、LiveDataのように変更が無くても常に値を流したいという要件がありました。
以下のようなFlowが欲しい
最新の値を取得できる
これは StateFlow
の特徴ですね。
public interface StateFlow<out T> : SharedFlow<T> { /** * The current value of this state flow. */ public val value: T }
値が変更できる
これは MutableStateFlow
や MutableSharedFlow
の特徴ですね。
値がセットされたら前との比較の結果に関わらず値が送られる
これはAndroidのLiveDataや MutableSharedFlow
の特徴ですね。
MutableStateFlow
は compareAndSet
を実装しなければならず、これが要りませんね。
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> { public override var value: T public fun compareAndSet(expect: T, update: T): Boolean }
その他
上記の機能はAndroidのLiveDataのようなものですね。
上記に加え、値を取得する側の処理が重かった場合に古い値を捨てて、最新の値だけ流れてくるようにしたいです。
作るFlow
値が更新できる Mutable
、常に値が送られる SharedFlow
。それに加え、最新の値が取得できる StateFlow
の特徴が欲しいわけです。
上記より、作るFlowは MutableSharedFlow
と StateFlow
を継承したものになります。
Flowを作る
MutableSharedFlow
の実装である SharedFlowImpl
を見て頂ければわかりますが、とても複雑です。これを再実装するのは面倒なのでラップしました。
名前は適当に LiveDataFlow
としました。
class LiveDataFlow<T>(initValue: T) : MutableSharedFlow<T>, StateFlow<T> { private val internalFlow = MutableSharedFlow<T>( replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST ) init { internalFlow.tryEmit(initValue) } override var value get() = replayCache.last() set(value) { internalFlow.tryEmit(value) } override val subscriptionCount: StateFlow<Int> get() = internalFlow.subscriptionCount override val replayCache: List<T> get() = internalFlow.replayCache @InternalCoroutinesApi override suspend fun collect(collector: FlowCollector<T>) { internalFlow.collect(collector) } override suspend fun emit(value: T) { internalFlow.emit(value) } @ExperimentalCoroutinesApi override fun resetReplayCache() { internalFlow.resetReplayCache() } override fun tryEmit(value: T): Boolean { return internalFlow.tryEmit(value) } }
解説
MutableSharedFlow<T>(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
replay = 1
を指定することで replayCache
にその分だけ値が入ります。最新1個だけを保持するという意味になります。
なので以下で最新の値を取得する事ができるようになります。
override val value: T get() = replayCache.last()
初期値が無いとnullになることがあるので初期値だけ設定します。
init {
internalFlow.tryEmit(initValue)
}
それから onBufferOverflow = BufferOverflow.DROP_OLDEST
を指定することで処理されていない古い値を捨てる事ができます。
BufferOverflow.SUSPEND
以外であれば tryEmit
は常に成功します。ついでに setter
も実装しました。
override var value set(value) { internalFlow.tryEmit(value) }
おわりに
後はAndroidのFragmentで購読したりして使用しています。
inline fun <T> Flow<T>.collectInLifecycle( lifecycleOwner: LifecycleOwner, crossinline action: suspend (value: T) -> Unit ) = lifecycleOwner.lifecycleScope.launchWhenStarted { collect { action(it) } }
val flow = LiveDataFlow(0) flow.collectInLifecycle(viewLifecycleOwner) { value -> }