我正在学习协程,并且遇到以下令人惊讶的行为(对我而言)。我想要一张平行的 map 。我考虑4个解决方案:

  • 只是map,没有并行性
  • here中的
  • pmap
  • 项目2的修改:我删除了coroutineScope并使用了GlobalScope
  • Java的parallelStream

  • 代码:
    import kotlinx.coroutines.*
    import kotlin.streams.toList
    import kotlin.system.measureNanoTime
    
    inline fun printTime(msg: String, f: () -> Unit) =
        println("${msg.padEnd(15)} time: ${measureNanoTime(f) / 1e9}")
    
    suspend fun <T, U> List<T>.pmap(f: (T) -> U) = coroutineScope {
        map { async { f(it) } }.map { it.await() }
    }
    
    suspend fun <T, U> List<T>.pmapGlob(f: (T) -> U) =
        map { GlobalScope.async { f(it) } }.map { it.await() }
    
    
    fun eval(i: Int) = (0 .. i).sumBy { it * it }
    
    fun main() = runBlocking {
        val list = (0..200).map { it * it * it }
        printTime("No parallelism") { println(list.map(::eval).sum()) }
        printTime("CoroutineScope") { println(list.pmap(::eval).sum()) }
        printTime("GlobalScope") { println(list.pmapGlob(::eval).sum()) }
        printTime("ParallelStream") { println(list.parallelStream().map(::eval).toList().sum()) }
    }
    

    输出(无总和):
    No parallelism  time: 0.85726849
    CoroutineScope  time: 0.827426385
    GlobalScope     time: 0.145788785
    ParallelStream  time: 0.161423263
    

    如您所见,coroutineScope几乎没有 yield ,而GlobalScope则和parallelStream一样快。是什么原因?我是否可以找到一个具有coroutineScope的所有优点且速度增益相同的解决方案?

    最佳答案

    范围仅间接涉及您观察到的差异。
    GlobalScope是一个单例,它定义了自己的调度程序,即Dispatchers.Default。它由线程池支持。
    coroutineScope没有定义自己的调度程序,因此您可以从调用方继承它,在本例中为runBlocking创建的调度程序。它使用被调用的单线程。

    如果将coroutineScope替换为withContext(Dispatchers.Default),您将获得相同的计时。实际上,这是您应该如何编写此代码(而不是GlobalScope),以便在某些并发任务可能失败的情况下获得理智的行为。

    09-12 21:18