Android协程解决什么问题?
- 处理耗时任务,这种任务常常会阻塞主线程
- 保证主线程安全,确保安全地从主线程调用任何suspend函数
举例子(异步任务)
实现一个请求网络数据的例子:页面上有一个button,一个loading,一个textview用来显示结果。点击button,显示loading,向server发送请求,拿到数据后把result显示在textview上,并隐藏loading。
界面如下
Service和Retrofit
getUsers()供AsyncTask方式调用,getUserList()是一个suspend方法,供协程方式调用。
使用AsyncTask的做法
private fun doWithAsyncTask() { val task = object : AsyncTask<Void, Void, List<User>>() { override fun onPreExecute() { super.onPreExecute() showLoading() } override fun doInBackground(vararg p0: Void?): List<User>? { return userDataPoint.getUsers().execute().body() } override fun onPostExecute(result: List<User>?) { super.onPostExecute(result) Log.d(TAG, "onPostExecute: done.") showData(result) } } task.execute() }
使用协程,调用挂起函数suspend
private fun doWithCoroutine() {
GlobalScope.launch(Dispatchers.Main) {
showLoading()
val users = withContext(Dispatchers.IO) {
userDataPoint.getUserList()
}
showData(users)
}
}
下面来详细了解一下协程及其好处。
协程是什么?
协程让异步逻辑同步化,杜绝回调地狱。
协程最核心的点是,函数或者一段程序能够被挂起,稍后再在挂起的位置恢复。
协程的挂起和恢复
常规函数基础操作包括invoke(或者call)和return,协程新增了suspend和resume
- suspend也称为挂起或暂停,用于暂停执行当前协程,并保存所有局部变量。
- resume用于让已暂停的协程从其暂停处继续执行。
挂起函数
- 使用suspend关键字修饰的函数叫作挂起函数
挂起函数只能在协程体内或其他挂起函数内调用
挂起和阻塞的区别
Case对比:协程中的delay(5000)是挂起函数,Java中的Thread.sleep(5000)是一个阻塞函数。
例如在点击事件中,用协程中调用delay函数,不会阻塞主线程,主线程该干啥干啥;但是使用sleep会阻塞主线程,主线程一直卡在那里等待5秒钟才继续。协程的两部分
- 基础设施层,标准库的协程API,主要对协程提供了概念和语义上最基本的支持。
业务框架层,协程的上层框架支持。GlobalScope和delay函数等都属于这一层。
使用基础设施层创建协程会麻烦很多,所以开发中大都使用业务框架层即可。
比如NIO和Netty的关系,NIO是基础API,Netty是建立在NIO上的业务框架。来看一段使用基础设施层创建的协程以及其执行。// 创建协程体 val continuation = suspend { userDataPoint.getUserList() }.createCoroutine(object: Continuation<List<User>>{ override val context: CoroutineContext = EmptyCoroutineContext override fun resumeWith(result: Result<List<User>>) { Log.d(TAG, "resumeWith: ${result.getOrNull()}") } }) // 启动协程 continuation.resume(Unit)
其实里面还是回调。。。
上面例子中,基础框架用的是kotlin.coroutines包下的API,而业务框架层用的是kotlinx.coroutines包下的API调度器
所有协程必须在调度器中运行,即使他们在主线程上运行也是如此。
Dispatchers.Main, Android上的主线程,处理UI交互和一些轻量级任务
- 调用suspend函数
- 调用UI函数
- 更新LiveData
Dispatchers.IO,磁盘和网络IO
- 数据库
- 文件读写
- 网络处理
Dispatchers.Default,非主线程,CPU密集型任务
- 数组排序
- JSON解析
- 处理差异判断
任务泄露
当某个协程任务丢失,无法追踪,会导致内存、CPU、磁盘等资源浪费,甚至发送一个无用的网络请求,这种情况称为任务泄露。
- 例如点击按钮向server发送request,server还没处理完就按下返回键退出应用,Activity已经销毁,这时网络请求还在进行,网络请求对象还占用着内存,就会发生任务泄露。
为了避免协程任务泄露,Kotlin引入了结构化并发机制。
结构化并发
使用结构化并发可以做到
- 取消任务,当某任务不再需要时取消它。
- 追踪任务,当任务正在执行时,追踪它。
发出错误信号,当协程失败时,发出错误信号表明有错误发生。
协程作用域(CoroutineScope)
定义协程必须指定其CoroutineScope,它会跟踪所有协程,还可以取消由它启动的所有协程。
常用的API有:- GlobalScope,生命周期是process级别的,即使Activity/Fragment销毁,协程仍在执行。
- MainScope,在Activity中使用,可以在onDestroy中取消协程。
- viewModelScope,只能在ViewModel中使用,绑定ViewModel的生命周期。
- lifecycleScope,只能在Activity/Fragment中使用,绑定Activity/Fragment的生命周期。
MainScope:
private val mainScope = MainScope()
private fun doWithMainScope() {
mainScope.launch {
try {
showLoading()
val users = userDataPoint.getUserList()
showData(users)
} catch (e: CancellationException) {
Log.d(TAG, "CancellationException: $e")
showData(emptyList())
}
}
}
override fun onDestroy() {
super.onDestroy()
mainScope.cancel()
}
在Activity销毁的回调中,取消mainScope会取消掉里面的任务,并且会抛出CancellationException,可以在catch中处理取消后的操作。
另外,MainScope是CoroutineScope,CoroutineScope是一个接口,可以让Activity实现接口并默认用MainScope作为委托对象,这样就可以在activity中直接使用launch和cancel了。
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope()
//private val mainScope = MainScope()
private fun doWithMainScope() {
launch {
showLoading()
try {
val users = userDataPoint.getUserList()
showData(users)
} catch (e: CancellationException) {
Log.d(TAG, "CancellationException: $e")
showData(emptyList())
}
}
}
override fun onDestroy() {
super.onDestroy()
cancel()
}
协程的启动与取消
启动协程
- 启动构建器
- 启动模式
- 作用域构建器
- Job的生命周期
取消协程
- 协程的取消
- CPU密集型任务取消
- 协程取消的副作用
- 超时任务
协程构建器
launch和async构建器都用来启动新协程
- launch,返回一个Job并且不附带任何结果
- async,返回一个Deferred,Deferred也是一个Job,可以使用.await()在一个延期的值上得到它的最终结果。
等待一个作业 - join和await。两者都是挂起函数,不会阻塞主线程。
组合开发
例1:launch和async启动协程
GlobalScope是顶级协程,不建议使用。runBlocking可以把测试方法包装成一个协程,测试方法是运行在主线程上的。在协程中可以用launch和async启动协程。
@Test fun test_coroutine_builder() = runBlocking { val job1 = launch { delay(2000) println("job1 finished.") } val job2 = async { delay(2000) println("job2 finished") "job2 result" } println("job1: $job1") println("job2: $job2") println("job2: ${job2.await()}") }
输出结果
job1: "coroutine#2":StandaloneCoroutine{Active}@71b1176b job2: "coroutine#3":DeferredCoroutine{Active}@6193932a job1 finished. job2 finished job2: job2 result
两种构建器都可以启动协程,如上,await可以得到async构建器返回的结果。job1和job2是runBlocking包装的主协程里面的子协程。两个子协程执行完毕,主协程才会退出。
协程执行顺序
@Test fun test_join() = runBlocking { val start = System.currentTimeMillis() println("start: ${System.currentTimeMillis() - start}") val job1 = launch { delay(2000) println("job1, ${System.currentTimeMillis() - start}") } val job2 = launch { delay(1000) println("job2, ${System.currentTimeMillis() - start}") } val job3 = launch { delay(5000) println("job3, ${System.currentTimeMillis() - start}") } println("end: ${System.currentTimeMillis() - start}") }
用launch启动三个协程,job1中delay2秒,job2中delay1秒,job3中delay5秒。执行结果输出
start: 0 end: 8 job2, 1018 job1, 2017 job3, 5018
start和end嗖的一下在主协程中打印完毕,中间瞬间使用launch启动器启动3个协程,用了8毫秒,1秒钟后job2打印,2秒钟后job1打印,5秒钟后job3打印。
例2:join
如果想控制它们的顺序,使用join函数:
@Test fun test_join() = runBlocking { val start = System.currentTimeMillis() println("start: ${System.currentTimeMillis() - start}") val job1 = launch { delay(2000) println("job1, ${System.currentTimeMillis() - start}") } job1.join() val job2 = launch { delay(1000) println("job2, ${System.currentTimeMillis() - start}") } job2.join() val job3 = launch { delay(5000) println("job3, ${System.currentTimeMillis() - start}") } println("end: ${System.currentTimeMillis() - start}") }
结果如下:
start: 0 job1, 2016 job2, 3022 end: 3024 job3, 8025
start开始,然后后执行job1,打印job1,这时过了2016ms,因为job1中任务delay了2s。
然后执行job2,打印job2,因为job2中任务delay了1s,所以这时的时间流逝了大约3s。
对job3没有使用join函数,所以直接打印end,又过了5秒钟job3delay完毕,打印job3。
如果在end之前对job3调用join()函数,那么结果如下:start: 0 job1, 2014 job2, 3018 job3, 8019 end: 8019
end 在job3执行完毕后打印。
例3:await
测试一下await:
fun test_await() = runBlocking { val start = System.currentTimeMillis() println("start: ${System.currentTimeMillis() - start}") val job1 = async { delay(2000) println("job1, ${System.currentTimeMillis() - start}") "result 1" } println(job1.await()) val job2 = async { delay(1000) println("job2, ${System.currentTimeMillis() - start}") "result 2" } println(job2.await()) val job3 = async { delay(5000) println("job3, ${System.currentTimeMillis() - start}") "result 3" } println(job3.await()) println("end: ${System.currentTimeMillis() - start}") }
输出结果
start: 0 job1, 2018 result 1 job2, 3027 result 2 job3, 8032 result 3 end: 8033
await也让子协程按顺序执行,并返回协程执行后的结果。
组合
@Test fun test_sync() = runBlocking { val time = measureTimeMillis { val one = firstTask() val two = secondTask() println("result: ${one + two}") } println("total time: $time ms") } private suspend fun firstTask(): Int { delay(2000) return 3 } private suspend fun secondTask(): Int { delay(2000) return 6 }
两个task,各自delay 2秒钟然后返回一个数值;在测试函数中,在主协程中用measureTimeMillis计算代码块的耗时然后打印,拿到两个任务的返回值,然后相加输出结果
result: 9 total time: 4010 ms
总共耗时4秒多,第一个任务执行完2秒,才执行第二个任务2秒。
例4:使用async
使用async让两个任务同时执行,用await来获得返回结果,看例子:
@Test fun test_combine_async() = runBlocking { val time = measureTimeMillis { val one = async { firstTask() } val two = async { secondTask() } println("result: ${one.await() + two.await()}") } println("total time: $time ms") }
结果如下
result: 9 total time: 2025 ms
总共耗时约2秒钟,两个任务同时执行。
用上面的例子再来梳理一遍流程,runBlocking保证是在主线程中启动的主协程,然后第4行在主协程中启动了协程one来执行任务firstTask,第5行在主协程中启动了协程two来执行任务secondTask,one和two这两个子协程总的任务并发执行,第6行等待one和two都返回结果后,把两者相加,输出结果。
来看下面的写法:@Test fun test_combine_async_case2() = runBlocking { val time = measureTimeMillis { val one = async { firstTask() }.await() val two = async { secondTask() }.await() println("result: ${one + two}") } println("total time: $time ms") }
在第4行,启动了协程one然后等待结果,用了约2秒;然后在第5行启动协程two然后等待结果,用了约2秒,第6行计算,输出结果。总共用了4秒多。如果是要并行的效果,这样写是不对的。
result: 9 total time: 4018 ms
协程的启动模式
public fun CoroutineScope.launch( //上下文调取器 context: CoroutineContext = EmptyCoroutineContext, //启动模式 start: CoroutineStart = CoroutineStart.DEFAULT, //协程方法 block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
在构造方法中,第二个参数就是启动模式,默认为DEFAULT。有四种模式。
- DEFAULT:协程创建后,立即开始调度,在调度前如果协程被取消,直接进入取消响应的状态。
- ATOMIC:协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消。
- LAZY:只有协程被需要时,包括主动调用协程的start、join或者await等函数时才会开始调度,如果调度前就被取消,那么该协程将直接进入异常结束状态。
UNDISPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点。
注意:调度并不代表执行。
可以理解为有一个队列,调度就是把协程的任务添加到这个队列,并不代表执行。
从调度到执行是有时间间隔的;协程是可以被取消的。public enum class CoroutineStart { //协程创建后。立即开始调度。在调度前如果协程被取消。直接进入取消响应的状态 DEFAULT, //当我们需要他执行的时候才会执行,不然就不会执行。 LAZY, //立即开始调度,协程之前到第一个挂起点之前是不响应取消的 ATOMIC, //创建协程后立即在当前函数的调用栈中执行。直到遇到第一个挂起点为止。 UNDISPATCHED }
DEFAULT
协程创建后,立即开始调度:意思是协程创建后,被添加入调度器,也就是上面说的那个队列,不需要start()或join(),任务就会自行触发。下面的测试代码可以说明,start()对测试结果没有任何影响。
在调度前如果被取消,则直接进入取消响应状态:这个更有意思,前面已经说了,创建一个协程就会被添加到队列中,任务会自行触发。那cancel的时间就很难把握,是在执行任务前还是后呢,即使是创建协程后,立马cancel,也有可能调度器已经在执行协程的代码块了。
@Test
fun test_fib_time() {
val time = measureTimeMillis {
println("result: ${fib(45)}")
}
println("time used: $time")
}
private fun fib(n: Int): Long {
if (n == 1 || n == 2) return 1L
return fib(n - 1) + fib(n - 2)
}
这是一段计算斐波那契数列的代码,计算45的时候耗时大约3秒钟。
result: 1134903170
time used: 3132
下面来看测试代码,使用DEFAULT启动协程,协程中计算fib(46),这是一个耗时操作>3s:
@Test
fun test_default() {
val start = System.currentTimeMillis()
val job = GlobalScope.launch(start = CoroutineStart.DEFAULT) {
println("Job started. ${System.currentTimeMillis() - start}")
val res = fib(46)
println("result: $res")
println("Job finished. ${System.currentTimeMillis() - start}")
}
job.cancel()
println("end.")
}
多运行几次可以得到两种结果。
结果1如下:
end.
Job started. 124
结果2如下:
end.
结果1:job创建后,立即开始调度,被加入队列,没有调用start()或join()方法,任务自行触发了,执行耗时操作时被cancel了。
结果2:job创建后,立即开始调度,被加入队列,没来得及自行触发,就被cancel了。
所以说cancel的时间很难把握,创建后立马cancel,调度器可能已经在执行任务,也可能没有来得及执行任务。并且调用start(),join()没什么影响,可以在cancel前加调用一下start(),效果也一样。
LAZY
需要执行的时候才执行,也就是执行了start()或join()才会启动,是否执行协程,取决于是否start()。
如果调度前就被取消,那么直接进入异常结束状态,也就是说,将协程任务添加到调度器中,等待执行,此时取消协程,那么是不会执行的。
@Test
fun test_lazy() {
val start = System.currentTimeMillis()
val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
println("Job started. ${System.currentTimeMillis() - start}")
val res = fib(46)
println("result: $res")
println("Job finished. ${System.currentTimeMillis() - start}")
}
job.cancel()
println("end.")
}
使用LAZY启动协程,测试结果永远只会输出
end.
因为没有start,永远不会执行。
但是加上start()后,也不能保证就一定能执行。例如
@Test
fun test_lazy() {
val start = System.currentTimeMillis()
val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
println("Job started. ${System.currentTimeMillis() - start}")
val res = fib(46)
println("result: $res")
println("Job finished. ${System.currentTimeMillis() - start}")
}
job.start()
job.cancel()
println("end.")
}
第10行加上了start(),但是结果可能是进入执行,也可能是没来得及执行就结束了。
把第10行和11行换位置,先cancel,在start,也永远无法进入执行。
ATOMIC
协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消。
@Test
fun test_atomic() = runBlocking {
val start = System.currentTimeMillis()
val job = launch(start = CoroutineStart.ATOMIC) {
println("Job started. ${System.currentTimeMillis() - start}")
val result = fib(46)
println("result $result")
delay(1000)
println("Job finished. ${System.currentTimeMillis() - start}")
}
job.cancel()
println("end.")
}
结果:
end.
Job started. 9
result 1836311903
Process finished with exit code 0
第一个挂起点是在第8行delay挂起函数,所以Job执行后打印Job started,虽然cancel了,但是得执行到第一个挂起点才响应取消,等了大概5秒钟打印出fib函数结果才响应cancel。
应用中通常把不能取消的操作放到挂起函数之前,确保其执行完毕才去响应取消。
UNDISPATCHED
协程创建后,立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点。
@Test
fun test_un_dispatched() = runBlocking {
val start = System.currentTimeMillis()
val job = launch(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
println("Job started. ${Thread.currentThread().name}, ${System.currentTimeMillis() - start}")
val result = fib(46)
println("result $result ${System.currentTimeMillis() - start}")
delay(1000)
println("Job finished. ${System.currentTimeMillis() - start}")
}
job.cancel()
println("end.")
}
结果
Job started. main @coroutine#2, 3
result 1836311903 5073
end.
Process finished with exit code 0
启动协程的时候指定用context=Dispachers.IO, 使用UNDISPACHED模式启动,在执行协程的过程中,打印当前线程名称,是主线程,并不是IO线程。因为runBlocking是运行在主线程,即当前函数调用栈。
协程的作用域构建器
coroutineScope与runBlocking
runBlocking是常规函数,主要用于main函数和测试,而coroutineScope是挂起函数
它们都会等待其协程体以及所有子协程结束,主要区别在于runBlocking方法会阻塞当前线程来等待,而coroutineScope只是挂起,会释放底层线程用于其他用途。
runBlocking的例子
@Test
fun test_coroutine_scope_runBlocking() {
val start = System.currentTimeMillis()
runBlocking {
val job1 = async {
println("job 1 开始. ${System.currentTimeMillis() - start}")
delay(400)
println("job 1 结束. ${System.currentTimeMillis() - start}")
}
val job2 = launch {
println("job 2 开始. ${System.currentTimeMillis() - start}")
delay(200)
println("job 2 结束. ${System.currentTimeMillis() - start}")
}
println("runBlocking结束.")
}
println("程序结束.")
}
输出结果
runBlocking结束.
job 1 开始. 121
job 2 开始. 127
job 2 结束. 333
job 1 结束. 525
程序结束.
Process finished with exit code 0
在程序结束前,会等待runBlocking中的子协程结束。
coroutineScope的例子
@Test
fun test_coroutine_scope_coroutineScope() = runBlocking {
val start = System.currentTimeMillis()
coroutineScope {
val job1 = async {
println("job 1 开始. ${System.currentTimeMillis() - start}")
delay(400)
println("job 1 结束. ${System.currentTimeMillis() - start}")
}
val job2 = launch {
println("job 2 开始. ${System.currentTimeMillis() - start}")
delay(200)
println("job 2 结束. ${System.currentTimeMillis() - start}")
}
println("coroutineScope结束.")
}
println("程序结束.")
}
输出结果
coroutineScope结束.
job 1 开始. 16
job 2 开始. 28
job 2 结束. 233
job 1 结束. 424
程序结束.
Process finished with exit code 0
同样会在程序结束前,等待coroutineScope中的子协程都结束。如上所述,coroutineScope是挂起函数,需要在协程中执行,所以用runBlocking产生一个协程环境,供coroutineScope运行。
coroutineScope与supervisorScope
- coroutineScope:一个协程失败了,其他兄弟协程也会被取消。
supervisorScope:一个协程失败了,不会影响其他兄弟协程。
例子,用coroutineScope测试失败@Test fun test_coroutine_scope_coroutineScopeFail() = runBlocking { val start = System.currentTimeMillis() coroutineScope { val job1 = async { println("job 1 开始. ${System.currentTimeMillis() - start}") delay(400) println("job 1 结束. ${System.currentTimeMillis() - start}") } val job2 = launch { println("job 2 开始. ${System.currentTimeMillis() - start}") delay(200) throw IllegalArgumentException("exception happened in job 2") println("job 2 结束. ${System.currentTimeMillis() - start}") } println("coroutineScope结束.") } println("程序结束.") }
结果
coroutineScope结束. job 1 开始. 15 job 2 开始. 24 java.lang.IllegalArgumentException: exception happened in job 2 ... Process finished with exit code 255
job1和job2是在coroutineScope中启动的两个兄弟协程,job2失败了,job1也没有继续执行。
例子,用supervisorScope测试失败,把上面的coroutineScope换掉即可supervisorScope结束. job 1 开始. 10 job 2 开始. 17 Exception in thread "main @coroutine#3" java.lang.IllegalArgumentException: exception happened in job 2 ... job 1 结束. 414 程序结束. Process finished with exit code 0
job1和job2是在supervisorScope中启动的两个兄弟协程,job2失败了抛出异常,job1则继续执行到任务完成。
Job对象
对于每一个创建的协程(通过launch或async),会返回一个Job实例,该实例是协程的唯一标示,并负责管理协程的生命周期。
一个Job可以包含一系列状态,虽然无法直接访问这些状态,但是可以访问Job的属性(isActive, isCancelled, isCompleted)- 新创建(New)
- 活跃(Active)
- 完成中(Completing)
- 已完成(Completed)
- 取消中(Cancelling)
已取消(Cancelled)
@Test fun test_job_status() = runBlocking { val start = System.currentTimeMillis() var job1: Job? = null var job2: Job? = null job1 = async { println("job 1 开始. ${System.currentTimeMillis() - start}") delay(400) job1?.let { println("Job1- isActive:${it.isActive}, isCancelled:${it.isCancelled}, isCompleted:${it.isCompleted}") } job2?.let { println("Job2- isActive:${it.isActive}, isCancelled:${it.isCancelled}, isCompleted:${it.isCompleted}") } println("job 1 结束. ${System.currentTimeMillis() - start}") } job2 = launch { println("job 2 开始. ${System.currentTimeMillis() - start}") delay(200) job1?.let { println("Job1- isActive:${it.isActive}, isCancelled:${it.isCancelled}, isCompleted:${it.isCompleted}") } job2?.let { println("Job2- isActive:${it.isActive}, isCancelled:${it.isCancelled}, isCompleted:${it.isCompleted}") } println("job 2 结束. ${System.currentTimeMillis() - start}") } println("程序结束.") }
Output:
程序结束. job 1 开始. 8 job 2 开始. 15 Job1- isActive:true, isCancelled:false, isCompleted:false Job2- isActive:true, isCancelled:false, isCompleted:false job 2 结束. 216 Job1- isActive:true, isCancelled:false, isCompleted:false Job2- isActive:false, isCancelled:false, isCompleted:true job 1 结束. 416 Process finished with exit code 0
在job2执行完毕后,job1中打印job2的信息,由于job2已经完成,所以 isActive=false, isCompleted=true.
看下图协程的取消
- 取消作用域会取消它的子协程
- 被取消的子协程不会影响其余兄弟协程
- 协程通过抛出一个特殊的异常CancellationException来处理取消操作
所有kotlinx.coroutines中的挂起函数(withContex/delay等)都是可取消的
取消作用域会取消它的子协程
@Test fun test_cancel_2() = runBlocking<Unit> { val scope = CoroutineScope(Dispatchers.Default) val job1 = scope.launch { println("job1 started.") delay(10) println("job1 finished.") } val job2 = scope.launch { println("job2 started.") delay(100) println("job2 finished.") } delay(500) }
output:
job1 started.
job2 started.
job1 finished.
job2 finished.
Process finished with exit code 0
注意:runBlocking是主协程,用自定义scope启动两个子协程,job1和job2是scope作用域中的子协程。第14行delay 500毫秒是在主协程中的,归主协程作用域管。运行测试代码,程序会在delay 500这个挂起函数结束后退出。这时,scope启动的子协程job1和job2任务一个需要10毫秒,一个需要100毫秒,也能执行完毕。delay如果50毫秒,那么这期间能够让job1完成,但是job2完不成。
下面来看取消作用域scope会怎样:
@Test
fun test_cancel_3() = runBlocking<Unit> {
val scope = CoroutineScope(Dispatchers.Default)
val job1 = scope.launch {
println("job1 started.")
delay(10)
println("job1 finished.")
}
val job2 = scope.launch {
println("job2 started.")
delay(100)
println("job2 finished.")
}
scope.cancel()
delay(500)
}
结果是
job1 started.
job2 started.
Process finished with exit code 0
第14行把scope作用域取消掉,可以看到job1和job2属于scope作用域中的兄弟协程,scope取消后,皮之不存毛将焉附,俩都cancel了。也就是说,取消作用域会取消它的子协程。
被取消的子协程不影响其余兄弟协程
@Test
fun test_cancel_4() = runBlocking<Unit> {
val scope = CoroutineScope(Dispatchers.Default)
val job1 = scope.launch {
println("job1 started.")
delay(10)
println("job1 finished.")
}
val job2 = scope.launch {
println("job2 started.")
delay(100)
println("job2 finished.")
}
job1.cancel()
delay(500)
}
结果
job1 started.
job2 started.
job2 finished.
Process finished with exit code 0
把job1取消掉,并没有影响兄弟job2的执行。
CancellationException
@Test
fun test_cancel_exception() = runBlocking<Unit> {
val scope = CoroutineScope(Dispatchers.Default)
val job1 = scope.launch {
try {
println("job1 started.")
delay(10)
println("job1 finished.")
} catch (e: Exception) {
println(e.toString())
}
}
job1.cancel("handle the cancellation exception!")
job1.join()
}
Output
job1 started.
java.util.concurrent.CancellationException: handle the cancellation exception!
Process finished with exit code 0
还有个cancelAndJoin():
@Test
fun test_cancel_exception() = runBlocking<Unit> {
val scope = CoroutineScope(Dispatchers.Default)
val job1 = scope.launch {
try {
println("job1 started.")
delay(10)
println("job1 finished.")
} catch (e: Exception) {
println(e.toString())
}
}
job1.cancelAndJoin()
}
效果一样。
所有kotlinx.coroutines中的挂起函数(withContex/delay等)都是可取消的
例如上面的例子中,delay()挂起函数,在挂起等待的时候都可以被cancel。
CPU密集型任务的取消
- isActive是一个可以被使用在CoroutineScope中的扩展属性,检查Job是否处于活跃状态
- ensureActive(),如果job处于非活跃状态,这个方法会立即抛出异常。
yield函数会检查所在协程的状态,如果已经取消,则抛出CancellationException予以响应。此外还会尝试出让线程的执行权,给其他协程提供执行机会。
@Test fun test_cpu_task() = runBlocking { val start = System.currentTimeMillis(); val job = launch(Dispatchers.Default) { var i = 0 var nextPrintTime = start while (i < 5) { if (System.currentTimeMillis() >= nextPrintTime) { println("job is waiting ${i++}") nextPrintTime += 1000 } } } delay(1000) job.cancel() println("main ended.") }
output:
job is waiting 0 job is waiting 1 main ended. job is waiting 2 job is waiting 3 job is waiting 4 Process finished with exit code 0
job中是一个CPU密集型任务,每个1秒打印一下。虽然job被取消了,但是job还是把while执行完毕。想要终止任务,可以使用isActive,在while条件中增加判断:
@Test fun test_cpu_task_active() = runBlocking { val start = System.currentTimeMillis(); val job = launch(Dispatchers.Default) { var i = 0 var nextPrintTime = start while (i < 5 && isActive) { if (System.currentTimeMillis() >= nextPrintTime) { println("job is waiting ${i++}") nextPrintTime += 1000 } } } delay(1000) job.cancel() println("main ended.") }
output:
job is waiting 0 job is waiting 1 main ended. Process finished with exit code 0
这个isActive可以用来检查协程的状态,但是无法抛出异常。也就是说加上try catch并不能捕获任何异常。如果想达到这个效果,可以用ensureActive()函数,其内部还是用的检查isActive然后抛出异常:
@Test fun test_cpu_task_active_cancel() = runBlocking { val start = System.currentTimeMillis(); val job = launch(Dispatchers.Default) { try { var i = 0 var nextPrintTime = start while (i < 5) { ensureActive() if (System.currentTimeMillis() >= nextPrintTime) { println("job is waiting ${i++}") nextPrintTime += 1000 } } } catch (e: Exception) { println(e.toString()) } } delay(1000) job.cancel() println("main ended.") }
output:
job is waiting 0 job is waiting 1 main ended. kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@720b7121 Process finished with exit code 0
可以看到抛出并捕获异常JobCancellationException。
yeild也可以抛出异常并予以响应,还可以让出线程执行权给其他协程提供执行机会。@Test fun test_cpu_task_yield() = runBlocking { val start = System.currentTimeMillis(); val job = launch(Dispatchers.Default) { try { var i = 0 var nextPrintTime = start while (i < 5) { yield() if (System.currentTimeMillis() >= nextPrintTime) { println("job is waiting ${i++}") nextPrintTime += 1000 } } } catch (e: Exception) { println(e.toString()) } } delay(1000) job.cancel() println("main ended.") }
output:
job is waiting 0 job is waiting 1 main ended. kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@478cdf05 Process finished with exit code 0
协程取消的副作用
- 在finally中释放资源
use函数:该函数只能被实现了Coloseable的对象使用,程序结束时会自动调用close方法,适合文件对象。
@Test fun test_release() = runBlocking { val job = launch { try { repeat(1000) { i -> println("job is waiting $i ...") delay(1000) } } catch (e: Exception) { println(e.toString()) } finally { println("job finally.") } } delay(1500) println("job cancel.") job.cancelAndJoin() println("main ended.") }
output:
job is waiting 0 ... job is waiting 1 ... job cancel. kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@4f6ee6e4 job finally. main ended. Process finished with exit code 0
可以在finally中进行释放资源的操作。
使用use函数,可以省略掉手动释放资源,不使用use时,自己要处理:@Test fun test_not_use() = runBlocking { val filePath = "" val br = BufferedReader(FileReader(filePath)) with(br) { var line: String? try { while (true) { line = readLine() ?: break println(line) } } catch (e: Exception) { println(e.toString()) } finally { close() } } }
使用use,不需要额外处理:
@Test fun test_use() = runBlocking { val path = "" BufferedReader(FileReader(path)).use { var line: String? while (true) { line = readLine()?:break println(line) } } }
NonCancellable
看下这个例子
@Test
fun test_cancel_normal() = runBlocking {
val job = launch {
try {
repeat(100) { i ->
println("job is waiting $i...")
delay(1000)
}
} finally {
println("job finally...")
delay(1000)
println("job delayed 1 second as non cancellable...")
}
}
delay(1000)
job.cancelAndJoin()
println("main end...")
}
output
job is waiting 0...
job finally...
main end...
Process finished with exit code 0
如果在finally中11行12行有不能取消的操作,这样子就不行。这时可以使用NonCancellable:
@Test
fun test_non_cancellable() = runBlocking {
val job = launch {
try {
repeat(100) { i ->
println("job is waiting $i...")
delay(1000)
}
} finally {
withContext(NonCancellable) {
println("job finally...")
delay(1000)
println("job delayed 1 second as non cancellable...")
}
}
}
delay(1000)
job.cancelAndJoin()
println("main end...")
}
output
job is waiting 0...
job finally...
job delayed 1 second as non cancellable...
main end...
Process finished with exit code 0
超时任务
- 很多情况下取消一个协程的理由是它有可能超时
withTimeoutOrNull通过返回null来进行超时操作,从而替代抛出一个异常
@Test fun test_timeout() = runBlocking { val job = withTimeout(1300) { repeat(5) { println("start job $it ...") delay(1000) println("end job $it ...") } } }
output:
start job 0 ... end job 0 ... start job 1 ... kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
withTimeout,当超过1300毫秒,任务就会终止,然后抛出TimeoutCancellationException.
@Test fun test_timeout_return_null() = runBlocking { val res = withTimeoutOrNull(1300) { repeat(5) { println("start job $it ...") delay(1000) println("end job $it ...") } "Done" } println("Result: $res") }
output:
start job 0 ... end job 0 ... start job 1 ... Result: null Process finished with exit code 0
withTimeoutOrNull会在代码块最后一行返回一个结果,正常结束后返回改结果,超时则返回null,例子:
@Test fun test_timeout_return_null() = runBlocking { val res = withTimeoutOrNull(1300) { try { repeat(5) { println("start job $it ...") delay(1000) println("end job $it ...") } "Done" } catch (e: Exception) { println(e.toString()) } } println("Result: $res") }
output:
start job 0 ... end job 0 ... start job 1 ... kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms Result: null Process finished with exit code 0
协程的异常处理
协程的上下文
- 组合上下文中的元素
- 协程上下文的继承
协程的异常处理
- 异常的传播特性
- 异常的捕获
- 全局异常处理
- 取消与异常
- 异常聚合
协程上下文是什么
CoroutineContext是一组用于定义协程行为的元素。它由如下几项构成:
- Job: 控制协程的生命周期
- CoroutineDispatcher: 向合适的线程分发任务
- CoroutineName: 协程的名称,调试的时候很有用
CoroutineExceptionHandler: 处理未被捕捉的异常
组合上下文中的元素
有时需要在协程上下文中定义多个元素,可以使用+操作符来实现。比如可以显示指定一个调度器来启动协程,并且同时显式指定一个命名:
@Test fun test_coroutine_context() = runBlocking<Unit> { launch(Dispatchers.Default + CoroutineName("test")) { println("running on thread: ${Thread.currentThread().name}") } }
output:
running on thread: DefaultDispatcher-worker-1 @test#2 Process finished with exit code 0
协程上下文的继承
对于新创建的协程,它的CoroutineContext会包含一个全新的Job实例,它会帮我们控制协程的生命周期。而剩下的元素会从CoroutineContext的父类继承,该父类可能是另外一个协程或者创建该协程的CoroutineScope。
@Test fun test_coroutine_context_extend() = runBlocking<Unit> { // 创建一个协程作用域 val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("test")) println("${coroutineContext[Job]} , ${Thread.currentThread().name}") // 通过协程作用域启动一个协程 val job = scope.launch { println("${coroutineContext[Job]} , ${Thread.currentThread().name}") val result = async { println("${coroutineContext[Job]} , ${Thread.currentThread().name}") "Ok" }.await() println("result: $result") } job.join() }
output:
"coroutine#1":BlockingCoroutine{Active}@2ddc8ecb , main @coroutine#1 "test#2":StandaloneCoroutine{Active}@5dd0b98f , DefaultDispatcher-worker-1 @test#2 "test#3":DeferredCoroutine{Active}@4398fdb7 , DefaultDispatcher-worker-3 @test#3 result: Ok Process finished with exit code 0
runBlocking是运行在主线程中的协程,job是scope中的协程,result是job中的协程;每一层的CoroutineContext都会有一个全新的Job实例,BlockingCoroutine{Active}@2ddc8ecb,StandaloneCoroutine{Active}@5dd0b98f,DeferredCoroutine{Active}@4398fdb7;剩下的元素从CoroutineContext的父类继承,比如scope下的job,从父亲scope中继承Dispachers.IO和CoroutineName("test").
协程的上下文 = 默认值 + 继承的CoroutineContext + 参数
- 一些元素包含默认值:Dispatchers.Default是默认的CoroutineDispatcher,以及"coroutine"作为默认的CoroutineName;
- 继承的CoroutineContext是CoroutineScope或者其父协程的CoroutineContext;
传入协程构建器的参数的优先级高于继承的上下文参数,因此会覆盖对应的参数值。
@Test fun test_coroutine_context_extend2() = runBlocking<Unit> { val coroutineExceptionHandler = CoroutineExceptionHandler { _, exception -> println("handle exception: $exception") } val scope = CoroutineScope(Job() + Dispatchers.Main + CoroutineName("test") + coroutineExceptionHandler) val job = scope.launch(Dispatchers.IO) { println("${coroutineContext[Job]} , ${Thread.currentThread().name}") 3/0 println("end of job...") } job.join() }
output:
"test#2":StandaloneCoroutine{Active}@39e6aad4 , DefaultDispatcher-worker-2 @test#2 handle exception: java.lang.ArithmeticException: / by zero Process finished with exit code 0
由上面的例子可见,构建器传入的参数Dispatchers.IO覆盖了原来的Main;名字来自scope中的CoroutineName("test");协程任务中的除0操作引起的异常,被自定义的exceptionHandler捕获。
异常处理的必要性
当应用出现一些意外情况时,给用户提供合适的体验非常重要,一方面,应用崩溃是很糟糕的体验,另外,在用户操作失败时,也必须要能给出正确的提示信息。
异常的传播
构建器有两种形式:
- 自动传播异常(launch与actor)
向用户暴露异常(async与produce)
当这些构建器用于创建一个根协程时(该协程不是另一个协程的子协程),前者这类构建器,异常会在它发生的第一时间被抛出;而后者则依赖用户来最终消费异常,例如通过await或receive. 来看例子:@Test fun test_exception() = runBlocking<Unit> { val job = GlobalScope.launch { try { throw IllegalArgumentException("exception from job1") } catch (e: Exception) { println("Caught exception: $e") } } job.join() val job2 = GlobalScope.async { 1 / 0 } try { job2.await() } catch (e: Exception) { println("Caught exception: $e") } }
launch是自动传播异常,发生的第一时间被抛出;async是向用户暴露异常,依赖用户消费异常,通过await函数调用的时候。输出如下
Caught exception: java.lang.IllegalArgumentException: exception from job1 Caught exception: java.lang.ArithmeticException: / by zero Process finished with exit code 0
对于async启动的协程,如果用户不处理,则并不会暴露异常。如下
@Test fun test_exception() = runBlocking<Unit> { val job = GlobalScope.launch { try { throw IllegalArgumentException("exception from job1") } catch (e: Exception) { println("Caught exception: $e") } } job.join() val job2 = GlobalScope.async { println("job2 begin") 1 / 0 } delay(1000) // try { // job2.await() // } catch (e: Exception) { // println("Caught exception: $e") // } }
output
Caught exception: java.lang.IllegalArgumentException: exception from job1 job2 begin Process finished with exit code 0
job2执行了,但是并没有抛出异常。需要用户消费异常。
非根协程的异常
其他协程所创建的协程中,产生的异常总是会被传播。
@Test fun text_exception_2() = runBlocking<Unit> { val scope = CoroutineScope(Job()) val job = scope.launch { async { println("async started a coroutine...") 1/0 } } delay(100) }
output:
async started a coroutine... Exception in thread "DefaultDispatcher-worker-2 @coroutine#3" java.lang.ArithmeticException: / by zero
异常传播的特性
当一个协程由于一个异常而运行失败时,它会传播这个异常并传递给它的父级。接下来父级会进行下面几步操作:
- 取消它自己的子级
- 取消它自己
将异常传播并传递给它的父级
SupervisorJob
- 使用SupervisorJob时,一个子协程的运行失败不会影响到其他子协程。SupervisorJob不会传播异常给它的父级,它会让子协程自己处理异常。
这种需求常见于在作用域内定义作业的UI组件,如果任何一个UI的子作业执行失败,并不总是有必要取消整个UI组件,但是如果UI组件被销毁了,由于它的结果不再被需要,就有必要使所有的子作业执行失败。
@Test fun text_supervisor_job1() = runBlocking<Unit> { val scope = CoroutineScope(SupervisorJob()) val job1 = scope.launch { println("child 1 start.") delay(1000) 1 / 0 } val job2 = scope.launch { println("child 2 start.") delay(5000) println("child 2 end.") } joinAll(job1, job2) }
output
child 1 start. child 2 start. Exception in thread "DefaultDispatcher-worker-1 @coroutine#2" java.lang.ArithmeticException: / by zero ... child 2 end. Process finished with exit code 0
上面的例子,使用SupervisorJob启动两个子协程job1,job2。两个job都执行了,在job1中有一个除零操作,抛出了异常,但是job2并不受影响。但是如果换成val scope = CoroutineScope(Job()),两个子协程都会终止
@Test fun text_supervisor_job2() = runBlocking<Unit> { val scope = CoroutineScope(Job()) val job1 = scope.launch { println("child 1 start.") delay(1000) 1 / 0 } val job2 = scope.launch { println("child 2 start.") delay(5000) println("child 2 end.") } joinAll(job1, job2) }
结果
child 1 start. child 2 start. Exception in thread "DefaultDispatcher-worker-1 @coroutine#2" java.lang.ArithmeticException: / by zero ... Process finished with exit code 0
supervisorScope
当作业自身执行失败的时候,所有子作业将会被全部取消。
@Test @Test fun text_supervisor_scope() = runBlocking<Unit> { try { supervisorScope { val job1 = launch { println("child 1 start.") delay(50) println("child 1 end.") } val job2 = launch { println("child 2 start.") delay(5000) println("child 2 end.") } println("in supervisor scope...") delay(1000) 1 / 0 } } catch (e: Exception) { println("caught exception: ${e.toString()}") } }
输出
in supervisor scope... child 1 start. child 2 start. child 1 end. caught exception: java.lang.ArithmeticException: / by zero Process finished with exit code 0
直接使用supervisorScope启动协程,里面两个子协程job1,job2,然后后面执行打印,delay和除零操作,可以看到除零操作抛出异常,在此期间,job1 delay时间较短,执行完毕,但是job2没有执行完毕被取消了。
异常的捕获
- 使用CoroutineExceptionHandler对协程的异常进行捕获
以下条件被满足时,异常就会被捕获:
- 时机:异常是被自动抛出异常的协程所抛出的(使用launch而不是async时)
- 位置:在CoroutineScope的CoroutineContext中或在一个根协程(CoroutineScope或supervisorScope的直接子协程)中。
@Test fun test_exception_handler() = runBlocking<Unit> { val handler = CoroutineExceptionHandler { _, exception -> println("caught exception: $exception") } val job = GlobalScope.launch(handler) { throw IllegalArgumentException("from job") } val deferred = GlobalScope.async(handler) { throw ArithmeticException("from deferred") } job.join() deferred.await() }
output
caught exception: java.lang.IllegalArgumentException: from job java.lang.ArithmeticException: from deferred ... Process finished with exit code 255
可见,handler捕获了launch启动的协程(自动抛出),而没有捕获async启动的协程(非自动抛出);并且GlobalScope是一个根协程。
@Test fun test_exception_handler2() = runBlocking<Unit> { val handler = CoroutineExceptionHandler { _, exception -> println("caught exception: $exception") } val scope = CoroutineScope(Job()) val job1 = scope.launch(handler) { launch { throw IllegalArgumentException("from job1") } } job1.join() }
output:
caught exception: java.lang.IllegalArgumentException: from job1 Process finished with exit code 0
上面的例子中handler安装在外部协程上,能被捕获到。但是如果安装在内部协程上,就无法被捕获,如下
@Test fun test_exception_handler4() = runBlocking<Unit> { val handler = CoroutineExceptionHandler { _, exception -> println("caught exception: $exception") } val scope = CoroutineScope(Job()) val job1 = scope.launch { launch(handler) { throw IllegalArgumentException("from job1") } } job1.join() }
output:
Exception in thread "DefaultDispatcher-worker-2 @coroutine#3" java.lang.IllegalArgumentException: from job1 ... Process finished with exit code 0
Android中全局异常处理
- 全局异常处理可以获取到所有协程未处理的未捕获异常,不过它并不能对异常进行捕获,虽然不能阻止程序崩溃,全局异常处理器在程序调试和异常上报等场景中仍然有非常大的用处。
需要在classpath里面创建META-INF/services目录,并在其中创建一个名为kotlinx.coroutines.CoroutineExceptionHandler的文件,文件内容就是我们全局异常处理器的全类名。
看一个android中捕获异常的例子:override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) binding = DataBindingUtil.setContentView(this, R.layout.activity_main) val handler = CoroutineExceptionHandler { _, e -> Log.d(TAG, "onCreate: caught exception: $e") } binding.btnFetch.setOnClickListener { lifecycleScope.launch(handler) { Log.d(TAG, "onCreate: on click") "abc".substring(10) } } }
如果不用handler,应用就崩溃了;加上handler,就可以捕获并处理异常,避免应用崩溃。
但是如果没有被捕获,怎么办呢?还有一种方法可以拿到全局的异常信息,这就是上面所说的Android全局异常处理。
如上面所述,在project目录下src/main创建resource/META-INF/services/目录,并创建kotlinx.coroutines.CoroutineExceptionHandler文件。
文件内容是全局异常处理器的全类名。如下
全局异常处理类如下,在实现handleException方法时根据自己的业务需要去实现诸如打印和日志收集等逻辑。class GlobalCoroutineExceptionHandler: CoroutineExceptionHandler { override val key = CoroutineExceptionHandler override fun handleException(context: CoroutineContext, exception: Throwable) { Log.d("GlobalException", "Unhandled exception: $exception") } }
如此以来,没有被捕获的异常就都会在改方法中获取到。
取消与异常
- 取消与异常紧密相关,协程内部使用CancellationException进行取消,该异常会被忽略。
- 当子协程被取消时,不会取消它的父协程。
如果一个协程遇到了CancellationException以外的异常,它将使用该异常取消它的父协程。当父协程的所有子协程都结束后,异常才会被父协程处理。
@Test fun test_cancel_and_exception1() = runBlocking<Unit> { val job = launch { println("parent started.") val child = launch { println("child started.") delay(10000) println("child ended.") } yield() child.cancelAndJoin() yield() println("parent is not cancelled.") } }
output
parent started. child started. parent is not cancelled. Process finished with exit code 0
程序运行正常,child ended 没有打印,child被取消了。
会有一个JobCancellationException。可以用try catch捕获。如下:@Test fun test_cancel_and_exception2() = runBlocking<Unit> { val job = launch { println("parent started.") val child = launch { try { println("child started.") delay(10000) println("child ended.") } catch (e: Exception) { println("child exception: $e") } finally { println("child finally.") } } yield() child.cancelAndJoin() yield() println("parent is not cancelled.") } }
output
parent started. child started. child exception: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#3":StandaloneCoroutine{Cancelling}@294425a7 child finally. parent is not cancelled. Process finished with exit code 0
@Test fun test_cancel_and_exception3() = runBlocking<Unit> { val handler = CoroutineExceptionHandler { _, e -> println("handle the exception: $e") } val parent = GlobalScope.launch(handler) { val child1 = launch { try { println("child1 started.") delay(Long.MAX_VALUE) println("child1 ended.") } finally { withContext(NonCancellable) { println("child1 cancelled, but exception is not handled until all children are terminated") delay(100) println("child1 finished in the non cancellable block") } } } val child2 = launch { println("child2 started.") "abc".substring(10) delay(100) println("child2 ended.") } } parent.join() }
output
child1 started. child2 started. child1 cancelled, but exception is not handled until all children are terminated child1 finished in the non cancellable block handle the exception: java.lang.StringIndexOutOfBoundsException: String index out of range: -7 Process finished with exit code 0
当父协程的所有子协程都结束后,异常才会被父协程处理。
异常聚合
当协程的多个子协程因异常而失败时,一般情况下取第一个异常进行处理。在第一个异常之后发生的所有其他异常,都将被绑定到第一个异常之上。
@Test fun exception_aggregation() = runBlocking<Unit> { val handler = CoroutineExceptionHandler { _, e -> println("handle the exception: $e") println("the other exceptions: ${e.suppressedExceptions}") } val job = GlobalScope.launch(handler) { launch { try { delay(Long.MAX_VALUE) } finally { throw ArithmeticException("first exception") } } launch { try { delay(Long.MAX_VALUE) } finally { throw ArithmeticException("second exception") } } launch { "abc".substring(10) } } job.join() }
output:
handle the exception: java.lang.StringIndexOutOfBoundsException: String index out of range: -7 the other exceptions: [java.lang.ArithmeticException: second exception, java.lang.ArithmeticException: first exception] Process finished with exit code 0
在异常处理器中可以用e.suppressedExceptions来得到其他异常。