Flow系列文章:
前言
StateFlow也是一个热流,可以看着是SharedFlow的一种特殊情况,他的表现形式和创建一个replay = 0,extBufferCapacity,onBufferOverflow = BufferOverflow.DROP_OLDEST一样。相比于SharedFlow来说其内部单面要简单很多,建议先看了SharedFlow再回来学习StateFlow,StateFlow使用用于跟踪状态的一个热流,在使用场景上和LiveData很像,LiveData和组件生命周期有关,StateFlow不受组件生命周期影响,StateFlow设计出来不是用来取代LiveData的,而是用来取代 ConflatedBroadcastChannel 的。StateFlow是线程安全的。
相对于SharedFlow来说,StateFlow创建的时候需要提供一个初始值,发送数据的时候不会存在挂起的情况,相对于LiveData来说,StateFlow除了不受组件生命周期影响,它还能过滤掉重复值,意思就是只有新值和旧值不一样时StateFlow才会把值交给订阅者。
其他冷流可以通过stateIn
操作符转换成一个StateFlow。
Slot的四种状态
基于StateFlow过于简单,就不做其他详细接受,本文直接进行源码分析,在进入源码分析之前,先提出一个关键问题,也是核心问题?
Q:StateFlow是如何实现当一个订阅者取到数据后,是怎么确定是把自己挂起还是继续取数据,订阅者挂起后,StateFlow中的值更新后是如何唤醒订阅者继续取新值的?
A: 为了解决这个问题,StateFlow中定义了一个StateFlowSlot
类(在ShardFlow中详细介绍过Slot),一个Slot代表了一个订阅者,每当新来一个订阅者的时候,就会为其分配(创建一个新的或者复用之前已经闲置的(free))一个Slot对象与其绑定该类有一个 _state:atomic<Any?>
属性,_state
记录了订阅者的详细状态,StateFlow全靠状态类决定订阅者的行为,一共有以下几种状态:
null
: 表明该Slot对象是一个闲置的(free),即还没有和订阅者绑定在一起。NONE
: 表明该Slot已经和某一个订阅者绑定了,并且这个订阅者没有被挂起,有可能还没开始取数据,也有可能取到数据正在处理数据,也有可能已经处理完数据了(但是还未挂起)。向SlotFlow发送一个数据后,如果发现订阅者的状态为NONE
就会把状态变为PENDING
,告知订阅者数据更新了,这时候订阅者如果还没开始取数据就会去到StateFlow中的最新数据,如果过订阅自已经取走更新之前的数据(正在处理阶段),那么处理完成数据后就不需要挂起,继续取最新的数据,如果订阅者已经处理完数据(还未挂起),那就同样不需要挂起,继续取新数据。PENDING
:这个单词的意思是待处理的意思,表明了订阅者取到数据,处理完数据后,不能被挂起,因为StateFlow的值更新了,有新值待处理。反之订阅者取完数据后如果_state
的值不是PENDING
说明StateFlow的数据没有更新,需要把订阅者挂起。每当向StateFlow中发送一个新数据时,如果发现订阅者处于PENDING
转态,就不用做任何操作。处于PENDING
说明订阅者时没有挂起的,订阅者消费完数据后会继续消费数据,继续消费就会把新发的数据消费掉,因此不用去干预订阅者。CancellableContinuationImpl
:一个协成对象,表明了订阅者被挂起了。每当向StateFlow中发送一个新值时,如果发现订阅者的状态为挂起时就会把订阅者唤醒取新值。
StateFlowSlot
private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
/**
* 官方原文解释,比较精简,意思也表达的比较清楚。
* Each slot can have one of the following states:
*
* * `null` -- it is not used right now. Can [allocateLocked] to new collector.
* * `NONE` -- used by a collector, but neither suspended nor has pending value.
* * `PENDING` -- pending to process new value.
* * `CancellableContinuationImpl<Unit>` -- suspended waiting for new value.
*
* It is important that default `null` value is used, because there can be a race
* between allocationo f a new slot and trying to do [makePending] on this slot.
*
*/
private val _state = atomic<Any?>(null)
//新来订阅者时调用,把Slot的状态设置为NONE,相当于把订阅自和这个Slot对象绑定在一块了,该Slot对象
//将被新来的订阅者所使用
override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
//如果_state.value不为null。说明这个Slot正在被别的订阅者使用,因此不能被当前订阅者绑定。
if (_state.value != null) return false // not free
//设置状态为NONE,
_state.value = NONE // allocated
return true //返回ture,订阅者和Slot绑定成功。
}
//当订阅者取消时调用该函数,把订阅者和Slot对象解绑,意味着这个Slot对象不在继续跟踪被
//取消的这个订阅者的状态,把Slot变成闲置(free),供后来的新订阅者继续复用。
override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
_state.value = null // free now
//这里之所以要返回一个empty array。是因为freeLocked函数是在AbstractSharedFlowSlot中定义的。
//SharedFlow中这个函数会返回emit函数挂起的协成。
return EMPTY_RESUMES // nothing more to do
}
//当向StateFlow中发送新值时会调用该函数,主要做两件事情:
//1.告知还未挂起的订阅者,有新值了,那么正在处理数据的订阅者处理数据后就不会挂起,继续取新值。
//2.唤醒已经挂起的订阅者,有新值来了,需要醒来取新值。
fun makePending() {
_state.loop { state ->
when {
//如果该Slot的状态为null。说明该Slot是一个闲置的(free),直接返回,不处理。
state == null -> return // this slot is free - skip it
//如果已经处处于PENDING,说明前一次新值来时候把状态设置了PENDING,订阅者还未消费前一次的新值,
//现在StateFlow又有新值了,那也可以什么都不做,订阅者处理数据后,不会挂起,然后直接取到
//StateFlow中最新的。
//比如:StateFlow中目前的值为1,订阅者取到1后开始处理1(处理得比较久),这时候emit(2),
//会把订阅者的状态由NONE设置PENDING,订阅者还未处理完数据1,又emit(3),发现订阅者目前处于PENDING
//那就什么都不用管,丁玉珍处理完数据1后,不会挂起,继续取数据,就会取到最新的数据3。
state === PENDING -> return // already pending, nothing to do
//状态处于NONE,要要么订阅者还未开始取数据,要么订阅者正在处理数据,要么订阅者处理完数据(还未挂起)
//不管订阅者目前属于哪一种,把状态设置成PENDIN后,都能确保订阅者者取完最新的值后才会挂起。
state === NONE -> { // mark as pending
if (_state.compareAndSet(state, PENDING)) return
}
else -> { // must be a suspend continuation state
// we must still use CAS here since continuation may get cancelled and free the //slot at any time
//如果订阅者处于被挂起状态,就调用resume唤醒订阅者
if (_state.compareAndSet(state, NONE)) {
(state as CancellableContinuationImpl<Unit>).resume(Unit)
return
}
}
}
}
}
//当订阅者取到数据,并处理完数据后,会调用该函数,根据该函数的返回值来决定是否把订阅者挂起
// 如果在处理数据期间,向StateFlow中发送了新值,在上面makePending函数中就会把订阅者状态设置为PENDING。
//说明有新值来了,那么就返回true。告知订阅者不能挂起。如果不是PENDING,那就说明订阅者在处理数据期间StateFlow
//中的值没有发生改变,那么返回false,告知订阅者需要挂起等待新值。
//该函数还会把订阅者的状态设置成NONE,如果之前是PENGING,设置成NONE,这样就相当于表明新值对该订阅者来
//说已经不是新的了因为订阅者从该函数返回出去后,不会挂起,会把新值消费掉。
//如果之前状态NONE,那就是再次重复设置NONE,也没有影响,返回false,说明没有新值,订阅者需要挂起。
fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
//确保之前的状态不是挂起状态。
assert { state !is CancellableContinuationImpl<*> }
return state === PENDING
}
//该函数在takePending返回false的时候调用,用来把订阅者挂起,只有订阅者之前的状态为NUL时才能把订阅者
//挂起成功,如果再次期间,有新值来了,就会把订阅者的状态设置为PENGDING ,PENGDING状态就不需要挂起订阅者
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
//状态的值一共就null,NONE,PENDING, CancellableContinuationImpl四种,null是不可能了,
//assert里面再把 CancellableContinuationImpl四种过滤掉了,那状态就只能是NONE or PENDING。
assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING
//如果之前状态为NONE,说明在调用该函数准备挂起前StateFlow中仍然没有新值,那就挂起。
//如果准备挂起前,有新值来了,即状态为PENGDING,那就不需要挂起。
//把协成对象保持在_state中,return@sc就会导致 awaitPending返回一个挂起标识让协成挂起。
//如果你不是理解 suspendCancellableCoroutine,可以看看这篇:
//https://piktao.blog.csdn.net/article/details/130952896?spm=1001.2014.3001.5502
if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending
//如果之前状态不是NONE,那就是PENDING ,是PENDING,那就说明有新值了,因此调用resume函数,这样awaitPending
//函数就不会返回一个挂起标识,协成也就不会挂起了。
assert { _state.value === PENDING }
cont.resume(Unit)
}
}
StateFlowImpl
private class StateFlowImpl<T>(
// StateFlow的初始值,虽然MutableStateFlow函数的参数是一个不可空的,但是如果是java段调用的的话仍然可以传入null。
//如果传入的是null,就会把初值设置为NULL。NULL 是一个Symbol类型。
//因此如果调用MutableStateFlow函数时传入了null。那么订阅者订阅时也会收到一个null。
initialState: Any
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
//用来存向StateFlow发送的数据,新的值如果和旧值不相等就会替换旧值。
private val _state = atomic(initialState) // T | NULL
//这个属性用来在StateFlow中更新数据时起一个标识作用,当sequence 为偶数时表明当前只有一个线程在往
//StateFlow中发送数据,为奇数时,说明之前有其他线程已经发送了数据,并且正在处理中(updateState)还未执行
//完成。sequence 会一直增长。
private var sequence = 0 // serializes updates, value update is in process when sequence is odd
//提供给外界访问StateFlow中数据的一个属性,数据是存储在_state中的
public override var value: T
get() = NULL.unbox(_state.value) //从_state中拿到数据
set(value) { updateState(null, value ?: NULL) } //通过updateState函数把数据设置到_state中。
//提供给外界往StateFlow中设置数据的一个函数,设置成功返回true,否则返回false。
override fun compareAndSet(expect: T, update: T): Boolean =
updateState(expect ?: NULL, update ?: NULL)
//把newState存入到_state中
private fun updateState(expectedState: Any?, newState: Any): Boolean {
var curSequence = 0
//所有的订阅者
var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
synchronized(this) {
//获取到旧值
val oldState = _state.value
//如果期望的旧值和实际的旧值不相等,那就是CAS失败,返回false。
if (expectedState != null && oldState != expectedState) return false // CAS support
//如果旧值和新值相等(equal),那就没必要再存一次了,直接返回true。也没有必要阻止将要挂起的订阅自或者
//唤醒已经挂起的订阅者
if (oldState == newState) return true
//把新的值存入到_state中
_state.value = newState
curSequence = sequence
//和1做与运算,结果为0 说明curSequence是偶数,是偶数说明没有其他线程也在调用该函数(并且还未执行完成)
if (curSequence and 1 == 0) {
//加一变成奇数,表明正update中,在本次update还为执行完成之前如果有别的线程发送了数据进来就会走else
curSequence++
sequence = curSequence
} else {
//位奇数,说明在此之前已有其他线程正在update,加2仍然为奇数,加2的目的是告诉
//之前已经在update的线程,数据又被更新了。
sequence = curSequence + 2
return true //返回true,意思就是唤醒订阅者或者阻止订阅者被挂起的操作让前一个正在update的线程去做。
}
// 当前已有的订阅者,之所以要在synchronized里面再次赋值,是因为,为订阅者分配Slot对象包括
//为slots数组扩容的操作也是在synchronized里面,所以在这里拿到的slots数组就是此时此刻的slots。
//什么意思呢?意思就是说出了这里的synchronized后,如果有新订阅者来了,导致slots扩容了,这次update中
//拿到的slots也是扩容之前的。
curSlots = slots
}
/**
* while循环之所以要写在synchronized外面,是为了避免在协成的启动模式为unconfined时造成死锁。
* 如果协成启动模式为unconfined,这时候如果有订阅者唤醒后往StateFlow中发送了一个数据就会造成死锁。
* 如果不是很明白unconfined可以去学习一下启程的启动模式。
*/
while (true) {
//变量在前面已经拿到的slots,对于已经挂起的订阅者就唤醒,还没挂起的旧阻止其挂起。
curSlots?.forEach {
it?.makePending()
}
// 再次枷锁,
synchronized(this) {
//如果sequence == curSequence说明在当前线程update过程中没有别的线程发送数据。
if (sequence == curSequence) {
//在上面的synchronized里面,如果没有别的线程发送数据,sequence为奇数
//现在加一变成偶数,表明本次update处理完成。那么下一次发送数据时在上面的
// synchronized里面就走if的逻辑。
sequence = curSequence + 1
return true //返回true, 本次update结束。
}
//说明在此期间又有别的线程发送了数据,更新了StateFlow中的数据
curSequence = sequence
//从新拿到slots,因为有可能在上一次遍历curSlots的时候,有新的订阅者来了。
curSlots = slots
//继续while循环,让订阅者不要挂起,或者唤醒挂起的订阅者取StateFlow中最新的数据
}
}
}
//这也算是一种外界拿到StateFlow中数据的一种手段,返回一个只有一个数据List
override val replayCache: List<T>
get() = listOf(value)
//对于StateFlow来说,tryEmit和emit没啥区别,虽然emit是一个挂起函数,但是永远不会挂起。
//tryEmit只是多了一个返回值,永远返回true。
override fun tryEmit(value: T): Boolean {
this.value = value
return true
}
//emit永远不会被挂起,
override suspend fun emit(value: T) {
this.value = value
}
//SharedFlow支持,StateFlow不支持调用该函数,调用就会抛异常。
@Suppress("UNCHECKED_CAST")
override fun resetReplayCache() {
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}
override suspend fun collect(collector: FlowCollector<T>): Nothing {
//为订阅者分配一个Slotd对象,关于allocateSlot函数,在SharedFlow中有详细讲解
val slot = allocateSlot()
try {
//如果订阅者的类型是SubscribedFlowCollector就回调其onSubscription。
if (collector is SubscribedFlowCollector) collector.onSubscription()
//拿到协成的Job对象,主要是为了响应协成被取消
val collectorJob = currentCoroutineContext()[Job]
//旧值,
var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
// The loop is arranged so that it starts delivering current value without waiting first
//开启一个死循环,因此订阅者只有在协成被取消时才会结束订阅。
while (true) {
// Here the coroutine could have waited for a while to be dispatched,
// so we use the most recent state here to ensure the best possible conflation of stale values
//拿到当前的值
val newState = _state.value
// 如果协成被取消了,这里就会抛出一次,执行finlly的代码
collectorJob?.ensureActive()
// 旧值为null,或者新值和旧值相等,那么订阅者就要消费这个新值
if (oldState == null || oldState != newState) {
//把新值交给订阅者,NULL.unbox里面代码比较简单,如果newState == NULL就返回null
//否则返回newState
collector.emit(NULL.unbox(newState))
oldState = newState
}
//订阅者已经处理完数据,如果订阅者的状态是PENDING,说明在订阅者处理数据期间,StateFlow的
//值被更新了,因此返回true,不需要挂起,否则返回false,需要把订阅者挂起,
if (!slot.takePending()) {
slot.awaitPending() //把订阅者挂起。
}
}
} finally {
//当订阅者被取消后,重置Slot,让其变为闲置(free)
freeSlot(slot)
}
}
//new 一个Slot对象
override fun createSlot() = StateFlowSlot()
//new 一个长度为size的数组
override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)
//关于FuseFlow放在以一篇文章讲解。
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
fuseStateFlow(context, capacity, onBufferOverflow)
}