Flow异步流

  • 认识

    • 特性
    • 构建器和上下文
    • 启动
    • 取消与取消检测
    • 缓冲
  • 操作符

    • 过渡操作符
    • 末端操作符
    • 组合
    • 展平
  • 异常

    • 异常处理
    • 完成

如何表示多个值?
挂起函数可以异步的返回单个值,但是如何异步返回多个计算好的值呢?

方案

  • 集合
  • 序列
  • 挂起函数
  • Flow
    用集合,返回多个值,但不是异步的。

    private fun createList() = listOf<Int>(1, 2, 3)
    
    @Test
    fun test_list() {
      createList().forEach { println(it) }
    }

    用序列,返回一个整数序列

    private fun createSequence(): Sequence<Int> {
      return sequence {
          for (i in 1..3) {
              Thread.sleep(1000) // 假装在计算,此处是阻塞,不能做其他事情了
              // delay(1000) 这里不能用挂起函数
              yield(i)
          }
      }
    }
    
    @Test
    fun test_sequence() {
      createSequence().forEach { println(it) }
    }

    看下源码

    public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T> = Sequence { iterator(block) }

    传入的是一个SequenceScope的扩展函数。

    @RestrictsSuspension
    @SinceKotlin("1.3")
    public abstract class SequenceScope<in T> internal constructor()

    而RestrictsSuspension限制只能使用里面提供的已有的挂起函数,如yield,yieldAll等。
    createSequence返回了多个值,但是也是同步的。

    // 返回多个值,异步
    private suspend fun createList2(): List<Int> {
      delay(5000)
      return listOf<Int>(1, 2, 3)
    }
    
    @Test
    fun test_list2() = runBlocking<Unit> {
      createList().forEach { println(it) }
    }

    可以使用suspend函数返回多个值,是异步,但是是是一次性返回了多个值,能否像流一样返回多个值并保持异步呢?Flow可以解决这个问题。

    private suspend fun createFlow(): Flow<Int> = flow {
      for (i in 1..3) {
          delay(1000)
          emit(i) // 发射,产生一个元素
      }
    }
    
    @Test
    fun test_flow() = runBlocking<Unit> {
      createFlow().collect { println(it) } // collect是一个末端操作符,后面讲
    }

    每隔1秒钟产生一个元素,这里是挂起的。用例子来证明一下:

      private suspend fun createFlow(): Flow<Int> = flow {
          for (i in 1..3) {
              delay(1000)
              emit(i)
          }
      }
      @Test
      fun test_flow2() = runBlocking<Unit> {
          launch {
              for (i in 1..3) {
                  println("I am running and not blocked $i")
                  delay(1500)
              }
          }
          createFlow().collect { println(it) }
      }

    输出

    I am running and not blocked 1
    1
    I am running and not blocked 2
    2
    I am running and not blocked 3
    3
    
    Process finished with exit code 0

    collect收集结果的过程并没有阻塞另外的协程,打印完1,然后在delay挂起时,去执行其他,并没有阻塞,两个任务来回切换执行。
    Flow真正地做到了返回多个值,并且是异步的。

Flow与其他方式的区别

  • 名为flow的Flow类型的构建器函数
  • flow{...}构建块中的代码可以挂起
  • 函数createFlow()不再标有suspend修饰符,上面代码中的suspend修饰符可以去掉
  • 流使用emit函数发射值
  • 流使用collect函数收集值

    Flow应用

    在android中,文件下载是Flow的一个非常典型的应用。

冷流

Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。

private fun createFlow2() = flow<Int> {
    println("Flow started.")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}
@Test
fun test_flow_cold() = runBlocking<Unit> {
    val flow = createFlow2()
    println("calling collect...")
    flow.collect { value -> println(value) }
    println("calling collect again...")
    flow.collect { value -> println(value) }
}
calling collect...
Flow started.
1
2
3
calling collect again...
Flow started.
1
2
3

Process finished with exit code 0

可以看到,当调用collect方法的时候,流才开始运行,并且可以多次调用。

流的连续性

  • 流的每次单独收集都是按顺序执行的,除非使用特殊操作符。
  • 从上游到下游,每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。

    @Test
    fun test_flow_continuation() = runBlocking<Unit> {
      (1..5).asFlow()
          .filter { it % 2 == 0 }
          .map { "string $it" }
          .collect { println("collect $it") }
    }
    collect string 2
    collect string 4
    
    Process finished with exit code 0

    改例子经过了如下步骤:生成一个流,过滤出偶数,转成字符串,开始收集

    流的构建器

  • flowOf构建器定义了一个发射固定值集的流。
  • 使用.asFlow()扩展函数,可以将各种集合与序列转换为流。

    @Test
    fun test_flow_builder() = runBlocking<Unit> {
      // flowOf构建器
      flowOf("one", "two", "three")
          .onEach { delay(1000) }
          .collect { value -> println(value) }
      // asFlow扩展函数
      (1..3).asFlow().collect { value -> println(value) }
    }
    one
    two
    three
    1
    2
    3
    
    Process finished with exit code 0

    流的上下文

  • 流的收集总是在调用协程的上下文中发生,流的该属性成为上下文保存。
  • flow{...}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射。
  • flowOn操作符,该函数用于更改流发射的上下文。

    private fun createFlow3() = flow<Int> {
      println("Flow started ${Thread.currentThread()}")
      for (i in 1..3) {
          delay(1000)
          emit(i)
      }
    }
    
    @Test
    fun test_flow_context() = runBlocking<Unit> {
      createFlow3()
          .collect {
              println("$it, thread: ${Thread.currentThread()}")
          }
    }
    Flow started Thread[main @coroutine#1,5,main]
    1, thread: Thread[main @coroutine#1,5,main]
    2, thread: Thread[main @coroutine#1,5,main]
    3, thread: Thread[main @coroutine#1,5,main]
    
    Process finished with exit code 0

    不做线程切换,收集和构建都在同一上下文,运行的线程是一样的。
    试着更改一下线程,如下

    private fun createFlow4() = flow {
      withContext(Dispatchers.IO) { // 用io线程
          println("Flow started ${Thread.currentThread().name}")
          for (i in 1..3) {
              delay(1000)
              emit(i)
          }
      }
    }
    
    @Test
    fun test_flow_on() = runBlocking {
      createFlow4().collect {
          println("collect $it, ${Thread.currentThread()}")
      }
    }
    Flow started DefaultDispatcher-worker-1 @coroutine#1
    
    java.lang.IllegalStateException: Flow invariant is violated:
          Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@4600ac86, BlockingEventLoop@1e1d1f06],
          but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@79c0c2ed, Dispatchers.IO].
          Please refer to 'flow' documentation or use 'flowOn' instead

    可以看到,构建流在IO线程中执行,但在收集流的时候报错了,不允许这样做,建议使用flowOn.
    正确的做法如下:

    private fun createFlow5() = flow {
      println("Flow started ${Thread.currentThread().name}")
      for (i in 1..3) {
          delay(1000)
          emit(i)
      }
    }.flowOn(Dispatchers.IO)
    
    @Test
    fun test_flow_on2() = runBlocking {
      createFlow5().collect {
          println("collect $it, ${Thread.currentThread().name}")
      }
    }
    Flow started DefaultDispatcher-worker-1 @coroutine#2
    collect 1, main @coroutine#1
    collect 2, main @coroutine#1
    collect 3, main @coroutine#1
    
    Process finished with exit code 0

    流在IO线程中构建和发射,在main线程中收集。

    启动流

  • 使用launchIn替换collect,可以在单独的协程中启动流的收集。

      private fun events() = (1..3).asFlow()
          .onEach {
              delay(1000)
              println("$it, ${Thread.currentThread().name}")
          }.flowOn(Dispatchers.Default)
    
    
      @Test
      fun testFlowLaunch() = runBlocking<Unit> {
          events()
              .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }
              //.collect()
              .launchIn(CoroutineScope(Dispatchers.IO))
              .join()
      }
    1, DefaultDispatcher-worker-3 @coroutine#3
    Event: 1 DefaultDispatcher-worker-1 @coroutine#2
    2, DefaultDispatcher-worker-1 @coroutine#3
    Event: 2 DefaultDispatcher-worker-1 @coroutine#2
    3, DefaultDispatcher-worker-1 @coroutine#3
    Event: 3 DefaultDispatcher-worker-2 @coroutine#2
    
    Process finished with exit code 0

    onEach是过渡操作符,并不会触发收集数据,collect是末端操作符,才能触发收集数据。过渡操作符就像是过滤器,末端操作符就像是水龙头的阀门,不打开阀门水就流不出来,无论中间加了多少个过滤装置。
    如果想指定在哪个协程里面收集数据,就可以用末端操作符launchIn(),可以传递一个作用域进去,而作用域又可以指定调度器,launchIn(CoroutineScope(Dispatchers.IO)).

launchIn返回的是一个job对象,可以进行cancel等操作。例如

@Test
fun testFlowLaunch2() = runBlocking<Unit> {
    val job = events()
        .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }
        .launchIn(CoroutineScope(Dispatchers.IO))
    delay(2000)
    job.cancel()
}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 DefaultDispatcher-worker-3 @coroutine#2

Process finished with exit code 0

如上,只收集了一个数字,job就取消了。
其实runBlockint本身就是一个主线程作用域,可以放到launchIn中,如下

@Test
fun testFlowLaunch3() = runBlocking<Unit> {
    val job = events()
        .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }
        .launchIn(this)
}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 main @coroutine#2
2, DefaultDispatcher-worker-1 @coroutine#3
Event: 2 main @coroutine#2
3, DefaultDispatcher-worker-1 @coroutine#3
Event: 3 main @coroutine#2

Process finished with exit code 0

流的取消

  • 流采用与协程同样的协作取消。流的收集可以是当流在一个可取消的挂起函数(如delay)中挂起的时候取消。

      private fun createFlow6() = flow<Int> {
          for (i in 1..3) {
              delay(1000)
              println("emitting $i")
              emit(i)
          }
      }
    
      @Test
      fun testCancelFlow() = runBlocking<Unit> {
          withTimeoutOrNull(2500) {
              createFlow6().collect {
                  println("collect: $it")
              }
          }
          println("Done.")
      }
    emitting 1
    collect: 1
    emitting 2
    collect: 2
    Done.
    
    Process finished with exit code 0

    设置2.5秒超时,流还没发射3,就超时了,流被取消。

    流的取消检测

  • 方便起见,流构建器对每个发射值执行附加的ensureActive检测以进行取消,这意味着从flow{...}发出的繁忙循环是可以取消的。
  • 处于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消。
  • 通过cancellable操作符来执行操作。

      private fun createFlow7() = flow<Int> {
          for (i in 1..5) {
              delay(1000)
              println("emitting $i")
              emit(i)
          }
      }
    
      @Test
      fun testCancelFlowCheck() = runBlocking<Unit> {
          createFlow7().collect {
              if (it == 3) cancel()
              println("collect: $it")
          }
          println("Done.")
      }
    emitting 1
    collect: 1
    emitting 2
    collect: 2
    emitting 3
    collect: 3
    
    kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
    ...
    
    Process finished with exit code 255

    在收集流的时候,遇到3就执行取消操作,抛出JobCancellationException,3还是会被收集到。

      @Test
      fun testCancelFlowCheck2() = runBlocking<Unit> {
          (1..5).asFlow().collect {
              if (it == 3) cancel()
              println("collect: $it")
          }
          println("Done.")
      }
    collect: 1
    collect: 2
    collect: 3
    collect: 4
    collect: 5
    Done.
    
    kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

    使用asFlow()创建流,在收集的时候,虽然遇到3进行了取消,但是还是把所有的元素都打印了以后才抛出异常。如果要在执行的过程中真正的阻断流,需要加上cancellable()操作,如下:

      @Test
      fun testCancelFlowCheck3() = runBlocking<Unit> {
          (1..5).asFlow().cancellable().collect {
              if (it == 3) cancel()
              println("collect: $it")
          }
          println("Done.")
      }
    collect: 1
    collect: 2
    collect: 3
    
    kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

    背压

  • buffer(),并发运行流中发射元素的代码。
  • conflate(),合并发射项,不对每个值进行处理。
  • collectLatest(),取消并重新发射最后一个值。
  • 当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显示地请求缓冲而不改变执行上下文。
    private fun createFlow8() = flow<Int> {
        for (i in 1..5) {
            delay(100) // 生产一个元素需要0.1秒
            println("emitting $i")
            emit(i)
        }
    }

    @Test
    fun testBackPressure() = runBlocking<Unit> {
        val time = measureTimeMillis {
            createFlow8()
                .buffer(50) // 并发运行流中发射元素
                .collect {
                    delay(200) // 消费一个元素需要0.2秒
                    println("collect: $it")
                }
        }

        println("Done, total $time")
    }
emitting 1
emitting 2
collect: 1
emitting 3
emitting 4
collect: 2
emitting 5
collect: 3
collect: 4
collect: 5
Done, total 1188

使用buffer可以让发射元素并发执行,提高效率。
使用flowOn()切换线程,也可以提高效率。

    private fun createFlow8() = flow<Int> {
        for (i in 1..5) {
            delay(100) // 生产一个元素需要0.1秒
            println("emitting $i, ${Thread.currentThread().name}")
            emit(i)
        }
    }

    @Test
    fun testBackPressure2() = runBlocking<Unit> {
        val time = measureTimeMillis {
            createFlow8()
                .flowOn(Dispatchers.Default)
                .collect {
                    delay(200) // 消费一个元素需要0.2秒
                    println("collect: $it, ${Thread.currentThread().name}")
                }
        }

        println("Done, total $time")
    }
emitting 1, DefaultDispatcher-worker-1 @coroutine#2
emitting 2, DefaultDispatcher-worker-1 @coroutine#2
emitting 3, DefaultDispatcher-worker-1 @coroutine#2
collect: 1, main @coroutine#1
emitting 4, DefaultDispatcher-worker-1 @coroutine#2
collect: 2, main @coroutine#1
emitting 5, DefaultDispatcher-worker-1 @coroutine#2
collect: 3, main @coroutine#1
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1186

conflate()可以合并发射项,但是不会对每个值进行处理。

    @Test
    fun testBackPressure3() = runBlocking<Unit> {
        val time = measureTimeMillis {
            createFlow8()
                .conflate()
                .collect {
                    delay(200) // 消费一个元素需要0.2秒
                    println("collect: $it, ${Thread.currentThread().name}")
                }
        }

        println("Done, total $time")
    }
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
collect: 1, main @coroutine#1
emitting 4, main @coroutine#2
collect: 3, main @coroutine#1
emitting 5, main @coroutine#2
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1016

上面的例子中,使用conflate(),collect时跳过了2.
使用collectLatest()只会收集最后一个值,如下:

    @Test
    fun testBackPressure4() = runBlocking<Unit> {
        val time = measureTimeMillis {
            createFlow8()
                .collectLatest {
                    delay(200) // 消费一个元素需要0.2秒
                    println("collect: $it, ${Thread.currentThread().name}")
                }
        }

        println("Done, total $time")
    }
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
emitting 4, main @coroutine#2
emitting 5, main @coroutine#2
collect: 5, main @coroutine#7
Done, total 913

Process finished with exit code 0

操作符

过渡流操作符

  • 可以使用操作符转换流,就像使用集合与序列一样。
  • 过渡操作符应用于上游流,并返回下游流。
  • 这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。
  • 运行速度很快,返回新的转换流的定义。

      private fun createFlow9() = flow<Int> {
          for (i in 1..3) {
              delay(100) // 生产一个元素需要0.1秒
              println("emitting $i")
              emit(i)
          }
      }
    
      @Test
      fun testMap() = runBlocking<Unit> {
          createFlow9()
              .map { data -> performRequest(data) }
              .collect {
                  println("collect: $it")
              }
      }
    emitting 1
    collect: --response 1--
    emitting 2
    collect: --response 2--
    emitting 3
    collect: --response 3--
    
    Process finished with exit code 0

    上面的例子,map操作符,把Int流转成了String流。

      @Test
      fun testTransform() = runBlocking<Unit> {
          createFlow9()
              .transform { data ->
                  emit("making request $data")
                  emit(performRequest(data))
              }
              .collect {
                  println("collect: $it")
              }
      }
    emitting 1
    collect: making request 1
    collect: --response 1--
    emitting 2
    collect: making request 2
    collect: --response 2--
    emitting 3
    collect: making request 3
    collect: --response 3--
    
    Process finished with exit code 0

    上面的例子,transform操作符可以把流经过多次转换,多次发射。

    限长操作符

    take操作符

      private fun numbers() = flow<Int> {
          try {
              emit(1)
              emit(2)
              println("This line will not execute")
              emit(3)
          } finally {
              println("Finally.")
          }
      }
    
      @Test
      fun testLimitOperator() = runBlocking {
          numbers().take(2).collect {
              println("collect $it")
          }
      }
    collect 1
    collect 2
    Finally.

    take传入参数2,则只取2个数据。

    末端流操作符

    末端操作符是在流上用于启动流收集的挂起函数。collect是最基础的末端操作符,但是还有一些更方便的末端操作符:

  • 转化为各种集合,例如toList和toSet.
  • 获取第一个(first)值与确保流发射单个(single)值的操作符。
  • 使用reduce与fold将流规约到单个值。
    例如reduce操作符

      @Test
      fun testTerminateOperator() = runBlocking {
          val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }
          println(sum)
      }
    55

    计算数字1-5的平方,然后求和,得到55.

    组合多个流

    就像Kotlin标准库中的Sequence.zip扩展函数一样,流拥有一个zip操作符用于组合两个流中的相关值。

      @Test
      fun testZip() = runBlocking {
          val numbers = (1..3).asFlow()
          val strings = flowOf("One", "Two", "Three")
          numbers.zip(strings) { a, b -> "$a -> $b" }.collect { println(it) }
      }

    这个例子把数字流和字符串流用zip操作符组合起来,成为一个字符流。

1 -> One
2 -> Two
3 -> Three

Process finished with exit code 0
    @Test
    fun testZip2() = runBlocking {
        val numbers = (1..3).asFlow().onEach { delay(300) }
        val strings = flowOf("One", "Two", "Three").onEach { delay(500) }
        val start = System.currentTimeMillis()
        numbers.zip(strings) { a, b -> "$a -> $b" }.collect { println("$it, ${System.currentTimeMillis() - start}") }
    }

如果两个流各自有delay,合并操作会等待那个delay时间较长的数据。

1 -> One, 563
2 -> Two, 1065
3 -> Three, 1569

Process finished with exit code 0

展平流

流表示异步接收的值序列,所以很容易遇到这样情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,为此,存在一系列的流展平操作符:

  • flatMapConcat连接模式
  • flatMapMerge合并模式
  • flatMapLatest最新展平模式

使用flatMapConcat连接模式

    private fun requestFlow(i: Int) = flow<String> {
        emit("$i: First")
        delay(500)
        emit("$i: Second")
    }

    @Test
    fun testFlatMapConcat() = runBlocking {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            //.map { requestFlow(it) } // 如果用map,则产生一个Flow<Flow<String>>
            .flatMapConcat { requestFlow(it) } // 使用flatMapConcat,把flow展平成一维,达到效果
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }
    }
1: First at 144 ms
1: Second at 649 ms
2: First at 754 ms
2: Second at 1256 ms
3: First at 1361 ms
3: Second at 1861 ms

Process finished with exit code 0

使用flatMapMerge

    @Test
    fun testFlatMapMergeConcat() = runBlocking {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            .flatMapMerge { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }
    }
1: First at 202 ms
2: First at 301 ms
3: First at 407 ms
1: Second at 708 ms
2: Second at 805 ms
3: Second at 927 ms

Process finished with exit code 0

发射1:First后,delay 500ms,这期间发射2:First,发射3:First;分别把这些数据收集到,然后其余的数据累计发射完毕并收集。
在来看flatMapLatest操作符

    private fun requestFlow(i: Int) = flow<String> {
        emit("$i: First")
        delay(500)
        emit("$i: Second")
    }

    @Test
    fun testFlatMapLatestConcat() = runBlocking {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(200) }
            .flatMapLatest { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }
    }
1: First at 313 ms
2: First at 581 ms
3: First at 786 ms
3: Second at 1291 ms

Process finished with exit code 0

跳过某些中间值,只收集最新的值。

流的异常处理

当运算符中的发射器或代码抛出异常时,有几种处理异常的方法:

  • try catch块
  • catch函数
    使用代码块捕获下游异常

      private fun createFlow10() = flow<Int> {
          for (i in 1..3) {
              println("emitting  $i")
              emit(i)
          }
      }
    
      @Test
      fun testException() = runBlocking {
          try {
              createFlow10().collect {
                  println("collect: $it")
                  check(it <= 1) { "wrong value " }
              }
          } catch (e: Exception) {
              println("handle the exception: $e")
          }
      }
    emitting  1
    collect: 1
    emitting  2
    collect: 2
    handle the exception: java.lang.IllegalStateException: wrong value
    
    Process finished with exit code 0

    使用catch操作符捕获上游异常

      @Test
      fun testException2() = runBlocking {
          flow {
              emit(1)
              1/0
              emit(2)
          }
              .catch { println("$it") }
              .flowOn(Dispatchers.IO)
              .collect { println("collect: $it") }
      }
    java.lang.ArithmeticException: / by zero
    collect: 1
    
    Process finished with exit code 0

    可以在捕获异常后补充发射一个数据

      @Test
      fun testException2() = runBlocking {
          flow {
              emit(1)
              1/0
              emit(2)
          }
              .catch {
                  println("$it")
                  emit(10)
              }
              .flowOn(Dispatchers.IO)
              .collect { println("collect: $it") }
      }
    java.lang.ArithmeticException: / by zero
    collect: 1
    collect: 10
    
    Process finished with exit code 0

    后面的2当然是收不到的。

    流的完成

    当流收集完成时(普通情况或者异常情况),它可能需要执行一个动作。

  • 命令式finally块
  • onCompletion声明式处理
    使用命令式finally块的例子

      private fun simpleFlow() = (1..3).asFlow()
    
      @Test
      fun testFinally() = runBlocking {
          try {
              simpleFlow().collect { println("collect $it") }
          } finally {
              println("Done.")
          }
      }
    collect 1
    collect 2
    collect 3
    Done.
    
    Process finished with exit code 0

    使用声明式onCompletion的例子:

      @Test
      fun testOnComplete() = runBlocking {
          simpleFlow()
              .onCompletion { println("Done.") }
              .collect { println("collect $it") }
    
      }

onCompletion优势在于,非正常程序结束,能够拿到异常信息。
看下面的例子,如果上游发射流抛出了异常,如果用finally命令式,若不用catch则无法捕获异常:

    private fun createFlow11() = flow<Int> {
        emit(1)
        1 / 0
        emit(2)
    }

    @Test
    fun testFinally2() = runBlocking {
        try {
            createFlow11().collect { println("collect $it") }
        } finally {
            println("Finally.")
        }
    }
collect 1
Finally.

java.lang.ArithmeticException: / by zero

但是如果使用声明式onCompletion,则能够得到异常信息:

    @Test
    fun testComplete2() = runBlocking {
        createFlow11()
            .onCompletion { println("exception info: $it") }
            .collect { println("collect $it") }
    }
collect 1
exception info: java.lang.ArithmeticException: / by zero

java.lang.ArithmeticException: / by zero

onCompletion里面能得到异常,但是并不能捕获异常,程序还是抛出了异常。如果想捕获并处理还需要catch操作符。

    @Test
    fun testComplete2() = runBlocking {
        createFlow11()
            .onCompletion { println("exception info: $it") }
            .catch { println("handle exception: $it") }
            .collect { println("collect $it") }
    }
collect 1
exception info: java.lang.ArithmeticException: / by zero
handle exception: java.lang.ArithmeticException: / by zero

Process finished with exit code 0

onCompletion还可以得到下游的异常信息,例如

    private fun simpleFlow() = (1..3).asFlow()

    @Test
    fun testComplete3() = runBlocking {
        simpleFlow()
            .onCompletion { println("exception info: $it") }
            .catch { println("handle exception: $it") }
            .collect {
                println("collect $it")
                check(it <= 1) { "invalid $it" }
            }
    }

下游check会抛出一个异常,onCompletion也会得到这个异常。

collect 1
collect 2
exception info: java.lang.IllegalStateException: invalid 2

java.lang.IllegalStateException: invalid 2

当然,程序还是会抛出异常。虽然这个例子有catch,但是那是捕获上游异常用的。onCompletion只是得到了下游异常信息,如果要捕获下游异常,还是要用try catch命令式方式。

    @Test
    fun testComplete4() = runBlocking {
        try {
            simpleFlow()
                .onCompletion { println("exception info: $it") }
                .catch { println("handle exception from catch: $it") }
                .collect {
                    println("collect $it")
                    check(it <= 1) { "invalid $it" }
                }
        } catch (e: Exception) {
            println("handle exception from try catch: $e")
        }
    }
collect 1
collect 2
exception info: java.lang.IllegalStateException: invalid 2
handle exception from try catch: java.lang.IllegalStateException: invalid 2

Process finished with exit code 0
03-05 15:56