Flow系列文章:

kotlin Flow系列之-SharedFlow

前言

StateFlow也是一个热流,可以看着是SharedFlow的一种特殊情况,他的表现形式和创建一个replay = 0,extBufferCapacity,onBufferOverflow = BufferOverflow.DROP_OLDEST一样。相比于SharedFlow来说其内部单面要简单很多,建议先看了SharedFlow再回来学习StateFlow,StateFlow使用用于跟踪状态的一个热流,在使用场景上和LiveData很像,LiveData和组件生命周期有关,StateFlow不受组件生命周期影响,StateFlow设计出来不是用来取代LiveData的,而是用来取代 ConflatedBroadcastChannel 的。StateFlow是线程安全的。

相对于SharedFlow来说,StateFlow创建的时候需要提供一个初始值,发送数据的时候不会存在挂起的情况,相对于LiveData来说,StateFlow除了不受组件生命周期影响,它还能过滤掉重复值,意思就是只有新值和旧值不一样时StateFlow才会把值交给订阅者。

其他冷流可以通过stateIn操作符转换成一个StateFlow。

Slot的四种状态

基于StateFlow过于简单,就不做其他详细接受,本文直接进行源码分析,在进入源码分析之前,先提出一个关键问题,也是核心问题?

Q:StateFlow是如何实现当一个订阅者取到数据后,是怎么确定是把自己挂起还是继续取数据,订阅者挂起后,StateFlow中的值更新后是如何唤醒订阅者继续取新值的?

A: 为了解决这个问题,StateFlow中定义了一个StateFlowSlot类(在ShardFlow中详细介绍过Slot),一个Slot代表了一个订阅者,每当新来一个订阅者的时候,就会为其分配(创建一个新的或者复用之前已经闲置的(free))一个Slot对象与其绑定该类有一个 _state:atomic<Any?>属性,_state记录了订阅者的详细状态,StateFlow全靠状态类决定订阅者的行为,一共有以下几种状态:

  • null: 表明该Slot对象是一个闲置的(free),即还没有和订阅者绑定在一起。
  • NONE: 表明该Slot已经和某一个订阅者绑定了,并且这个订阅者没有被挂起,有可能还没开始取数据,也有可能取到数据正在处理数据,也有可能已经处理完数据了(但是还未挂起)。向SlotFlow发送一个数据后,如果发现订阅者的状态为NONE就会把状态变为PENDING,告知订阅者数据更新了,这时候订阅者如果还没开始取数据就会去到StateFlow中的最新数据,如果过订阅自已经取走更新之前的数据(正在处理阶段),那么处理完成数据后就不需要挂起,继续取最新的数据,如果订阅者已经处理完数据(还未挂起),那就同样不需要挂起,继续取新数据。
  • PENDING:这个单词的意思是待处理的意思,表明了订阅者取到数据,处理完数据后,不能被挂起,因为StateFlow的值更新了,有新值待处理。反之订阅者取完数据后如果_state的值不是PENDING 说明StateFlow的数据没有更新,需要把订阅者挂起。每当向StateFlow中发送一个新数据时,如果发现订阅者处于PENDING转态,就不用做任何操作。处于PENDING说明订阅者时没有挂起的,订阅者消费完数据后会继续消费数据,继续消费就会把新发的数据消费掉,因此不用去干预订阅者。
  • CancellableContinuationImpl:一个协成对象,表明了订阅者被挂起了。每当向StateFlow中发送一个新值时,如果发现订阅者的状态为挂起时就会把订阅者唤醒取新值。

StateFlowSlot

private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
    /**
     *  官方原文解释,比较精简,意思也表达的比较清楚。
     * Each slot can have one of the following states:
     *
     * * `null` -- it is not used right now. Can [allocateLocked] to new collector.
     * * `NONE` -- used by a collector, but neither suspended nor has pending value.
     * * `PENDING` -- pending to process new value.
     * * `CancellableContinuationImpl<Unit>` -- suspended waiting for new value.
     *
     * It is important that default `null` value is used, because there can be a race
     *  between allocationo f a new slot and trying to do [makePending] on this slot.
     * 
     */
    private val _state = atomic<Any?>(null)
		
  	//新来订阅者时调用,把Slot的状态设置为NONE,相当于把订阅自和这个Slot对象绑定在一块了,该Slot对象
  	//将被新来的订阅者所使用
    override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
        //如果_state.value不为null。说明这个Slot正在被别的订阅者使用,因此不能被当前订阅者绑定。
        if (_state.value != null) return false // not free 
      	//设置状态为NONE,
        _state.value = NONE // allocated
        return true //返回ture,订阅者和Slot绑定成功。
    }

  	//当订阅者取消时调用该函数,把订阅者和Slot对象解绑,意味着这个Slot对象不在继续跟踪被
  	//取消的这个订阅者的状态,把Slot变成闲置(free),供后来的新订阅者继续复用。
    override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
        _state.value = null // free now
      	//这里之所以要返回一个empty array。是因为freeLocked函数是在AbstractSharedFlowSlot中定义的。
      	//SharedFlow中这个函数会返回emit函数挂起的协成。
        return EMPTY_RESUMES // nothing more to do
    }

  	//当向StateFlow中发送新值时会调用该函数,主要做两件事情:
    //1.告知还未挂起的订阅者,有新值了,那么正在处理数据的订阅者处理数据后就不会挂起,继续取新值。
  	//2.唤醒已经挂起的订阅者,有新值来了,需要醒来取新值。
    fun makePending() {
        _state.loop { state ->
            when {
              	//如果该Slot的状态为null。说明该Slot是一个闲置的(free),直接返回,不处理。
                state == null -> return // this slot is free - skip it
              	//如果已经处处于PENDING,说明前一次新值来时候把状态设置了PENDING,订阅者还未消费前一次的新值,
              	//现在StateFlow又有新值了,那也可以什么都不做,订阅者处理数据后,不会挂起,然后直接取到
              	//StateFlow中最新的。
              	//比如:StateFlow中目前的值为1,订阅者取到1后开始处理1(处理得比较久),这时候emit(2),
              	//会把订阅者的状态由NONE设置PENDING,订阅者还未处理完数据1,又emit(3),发现订阅者目前处于PENDING
              	//那就什么都不用管,丁玉珍处理完数据1后,不会挂起,继续取数据,就会取到最新的数据3。
                state === PENDING -> return // already pending, nothing to do
              	//状态处于NONE,要要么订阅者还未开始取数据,要么订阅者正在处理数据,要么订阅者处理完数据(还未挂起)
              	//不管订阅者目前属于哪一种,把状态设置成PENDIN后,都能确保订阅者者取完最新的值后才会挂起。
                state === NONE -> { // mark as pending
                    if (_state.compareAndSet(state, PENDING)) return
                }
              
                else -> { // must be a suspend continuation state
                    // we must still use CAS here since continuation may get cancelled and free the 											//slot at any time		
                  	//如果订阅者处于被挂起状态,就调用resume唤醒订阅者
                    if (_state.compareAndSet(state, NONE)) {
                        (state as CancellableContinuationImpl<Unit>).resume(Unit)
                        return
                    }
                }
            }
        }
    }
	
  	//当订阅者取到数据,并处理完数据后,会调用该函数,根据该函数的返回值来决定是否把订阅者挂起
  	// 如果在处理数据期间,向StateFlow中发送了新值,在上面makePending函数中就会把订阅者状态设置为PENDING。
  	//说明有新值来了,那么就返回true。告知订阅者不能挂起。如果不是PENDING,那就说明订阅者在处理数据期间StateFlow
  	//中的值没有发生改变,那么返回false,告知订阅者需要挂起等待新值。
  	//该函数还会把订阅者的状态设置成NONE,如果之前是PENGING,设置成NONE,这样就相当于表明新值对该订阅者来
  	//说已经不是新的了因为订阅者从该函数返回出去后,不会挂起,会把新值消费掉。
  	//如果之前状态NONE,那就是再次重复设置NONE,也没有影响,返回false,说明没有新值,订阅者需要挂起。
    fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
        //确保之前的状态不是挂起状态。
        assert { state !is CancellableContinuationImpl<*> }
        return state === PENDING
    }

    //该函数在takePending返回false的时候调用,用来把订阅者挂起,只有订阅者之前的状态为NUL时才能把订阅者
  	//挂起成功,如果再次期间,有新值来了,就会把订阅者的状态设置为PENGDING ,PENGDING状态就不需要挂起订阅者
    suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
        //状态的值一共就null,NONE,PENDING,  CancellableContinuationImpl四种,null是不可能了,
        //assert里面再把    CancellableContinuationImpl四种过滤掉了,那状态就只能是NONE or PENDING。                                                            
        assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING
                                                                        
         //如果之前状态为NONE,说明在调用该函数准备挂起前StateFlow中仍然没有新值,那就挂起。
         //如果准备挂起前,有新值来了,即状态为PENGDING,那就不需要挂起。                                                              
        //把协成对象保持在_state中,return@sc就会导致 awaitPending返回一个挂起标识让协成挂起。
        //如果你不是理解  suspendCancellableCoroutine,可以看看这篇:
         //https://piktao.blog.csdn.net/article/details/130952896?spm=1001.2014.3001.5502                                                               
        if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending
                                                                        
       	//如果之前状态不是NONE,那就是PENDING ,是PENDING,那就说明有新值了,因此调用resume函数,这样awaitPending
        //函数就不会返回一个挂起标识,协成也就不会挂起了。                                                                
        assert { _state.value === PENDING }
        cont.resume(Unit)
    }
}

StateFlowImpl

private class StateFlowImpl<T>(
  // StateFlow的初始值,虽然MutableStateFlow函数的参数是一个不可空的,但是如果是java段调用的的话仍然可以传入null。
  //如果传入的是null,就会把初值设置为NULL。NULL 是一个Symbol类型。
  //因此如果调用MutableStateFlow函数时传入了null。那么订阅者订阅时也会收到一个null。
    initialState: Any 
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
  	
  	//用来存向StateFlow发送的数据,新的值如果和旧值不相等就会替换旧值。
    private val _state = atomic(initialState) // T | NULL
  	//这个属性用来在StateFlow中更新数据时起一个标识作用,当sequence 为偶数时表明当前只有一个线程在往
  	//StateFlow中发送数据,为奇数时,说明之前有其他线程已经发送了数据,并且正在处理中(updateState)还未执行
  	//完成。sequence 会一直增长。
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd

    //提供给外界访问StateFlow中数据的一个属性,数据是存储在_state中的
    public override var value: T
        get() = NULL.unbox(_state.value) //从_state中拿到数据
        set(value) { updateState(null, value ?: NULL) } //通过updateState函数把数据设置到_state中。
		
  //提供给外界往StateFlow中设置数据的一个函数,设置成功返回true,否则返回false。
    override fun compareAndSet(expect: T, update: T): Boolean =
        updateState(expect ?: NULL, update ?: NULL)

  	//把newState存入到_state中
    private fun updateState(expectedState: Any?, newState: Any): Boolean {
        var curSequence = 0
      	//所有的订阅者
        var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
      
        synchronized(this) {
          	//获取到旧值
            val oldState = _state.value
          	//如果期望的旧值和实际的旧值不相等,那就是CAS失败,返回false。
            if (expectedState != null && oldState != expectedState) return false // CAS support
          	
          	//如果旧值和新值相等(equal),那就没必要再存一次了,直接返回true。也没有必要阻止将要挂起的订阅自或者
          	//唤醒已经挂起的订阅者
            if (oldState == newState) return true 
          	//把新的值存入到_state中
            _state.value = newState
          
            curSequence = sequence
          	//和1做与运算,结果为0 说明curSequence是偶数,是偶数说明没有其他线程也在调用该函数(并且还未执行完成)
            if (curSequence and 1 == 0) { 
              //加一变成奇数,表明正update中,在本次update还为执行完成之前如果有别的线程发送了数据进来就会走else
                curSequence++ 
                sequence = curSequence
            } else {
                //位奇数,说明在此之前已有其他线程正在update,加2仍然为奇数,加2的目的是告诉
              	//之前已经在update的线程,数据又被更新了。
                sequence = curSequence + 2 
                return true //返回true,意思就是唤醒订阅者或者阻止订阅者被挂起的操作让前一个正在update的线程去做。
            }
          // 当前已有的订阅者,之所以要在synchronized里面再次赋值,是因为,为订阅者分配Slot对象包括
          //为slots数组扩容的操作也是在synchronized里面,所以在这里拿到的slots数组就是此时此刻的slots。
          //什么意思呢?意思就是说出了这里的synchronized后,如果有新订阅者来了,导致slots扩容了,这次update中
          //拿到的slots也是扩容之前的。
            curSlots = slots 
        }
      	
      	/**
      	 * while循环之所以要写在synchronized外面,是为了避免在协成的启动模式为unconfined时造成死锁。
      	 * 如果协成启动模式为unconfined,这时候如果有订阅者唤醒后往StateFlow中发送了一个数据就会造成死锁。
      	 * 如果不是很明白unconfined可以去学习一下启程的启动模式。
      	 */
        while (true) {
            //变量在前面已经拿到的slots,对于已经挂起的订阅者就唤醒,还没挂起的旧阻止其挂起。
            curSlots?.forEach {
                it?.makePending()
            }
            // 再次枷锁,
            synchronized(this) {
              	//如果sequence == curSequence说明在当前线程update过程中没有别的线程发送数据。
                if (sequence == curSequence) {
                  	//在上面的synchronized里面,如果没有别的线程发送数据,sequence为奇数
                  	//现在加一变成偶数,表明本次update处理完成。那么下一次发送数据时在上面的
                    // synchronized里面就走if的逻辑。
                    sequence = curSequence + 1
                    return true //返回true, 本次update结束。
                }
               	//说明在此期间又有别的线程发送了数据,更新了StateFlow中的数据
                curSequence = sequence
              	//从新拿到slots,因为有可能在上一次遍历curSlots的时候,有新的订阅者来了。
                curSlots = slots
              	//继续while循环,让订阅者不要挂起,或者唤醒挂起的订阅者取StateFlow中最新的数据
            }
        }
    }
		
  	//这也算是一种外界拿到StateFlow中数据的一种手段,返回一个只有一个数据List
    override val replayCache: List<T>
        get() = listOf(value)
	
  	//对于StateFlow来说,tryEmit和emit没啥区别,虽然emit是一个挂起函数,但是永远不会挂起。
  	//tryEmit只是多了一个返回值,永远返回true。
    override fun tryEmit(value: T): Boolean {
        this.value = value
        return true
    }
		//emit永远不会被挂起,
    override suspend fun emit(value: T) {
        this.value = value
    }
		
  	//SharedFlow支持,StateFlow不支持调用该函数,调用就会抛异常。
    @Suppress("UNCHECKED_CAST")
    override fun resetReplayCache() {
        throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
    }

  
    override suspend fun collect(collector: FlowCollector<T>): Nothing {
      	//为订阅者分配一个Slotd对象,关于allocateSlot函数,在SharedFlow中有详细讲解
        val slot = allocateSlot()
        try {
          	//如果订阅者的类型是SubscribedFlowCollector就回调其onSubscription。
            if (collector is SubscribedFlowCollector) collector.onSubscription()
          	//拿到协成的Job对象,主要是为了响应协成被取消
            val collectorJob = currentCoroutineContext()[Job]
          	//旧值,
            var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
            // The loop is arranged so that it starts delivering current value without waiting first
          	//开启一个死循环,因此订阅者只有在协成被取消时才会结束订阅。
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
              	//拿到当前的值
                val newState = _state.value
                // 如果协成被取消了,这里就会抛出一次,执行finlly的代码
                collectorJob?.ensureActive()
                // 旧值为null,或者新值和旧值相等,那么订阅者就要消费这个新值
                if (oldState == null || oldState != newState) {
                  	//把新值交给订阅者,NULL.unbox里面代码比较简单,如果newState == NULL就返回null
                  	//否则返回newState
                    collector.emit(NULL.unbox(newState))
                  
                    oldState = newState
                }
								//订阅者已经处理完数据,如果订阅者的状态是PENDING,说明在订阅者处理数据期间,StateFlow的
              	//值被更新了,因此返回true,不需要挂起,否则返回false,需要把订阅者挂起,
                if (!slot.takePending()) { 
                    slot.awaitPending() //把订阅者挂起。
                }
            }
        } finally {
          	//当订阅者被取消后,重置Slot,让其变为闲置(free)
            freeSlot(slot)
        }
    }
		//new 一个Slot对象
    override fun createSlot() = StateFlowSlot()
  	//new 一个长度为size的数组
    override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)
	
  	//关于FuseFlow放在以一篇文章讲解。
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseStateFlow(context, capacity, onBufferOverflow)
}
06-28 15:01