我有一个简单的程序:

import scalaz._
import stream._

object Play extends App {
  val in1 = io.linesR("C:/tmp/as.txt")
  val in2 = io.linesR("C:/tmp/bs.txt")

  val p = (in1 merge in2) to io.stdOutLines
  p.run.run
}

文件as.txt包含五个a,文件bs.txt包含3个b。我看到这样的输出:
a
b
b
a
a
b
a
a
a

但是,当我如下更改in2的声明时:
val in2 = io.stdInLines

然后我得到了我认为是意外行为的信息。根据文档 1 ,程序应根据提供数据流的速度更快来确定地从每个数据流中提取数据。这应该意味着我立即看到一堆a印在控制台上,但这根本没有发生。

确实,直到我按ENTER,什么都没有发生。很明显,如果我随机选择一个流以从中获取下一个元素,然后,如果该流被阻塞,则合并的流程也将阻塞(即使另一个流包含数据),该行为也非常类似于我的预期。 )。

到底是怎么回事?

1 -好的,文档很少,但是Dan Spiewak在his talk中非常清楚地表示,它将抢占谁是第一个提供数据的人

最佳答案

问题在于stdInLines的实现。它正在阻塞,它从来没有Task.fork一个线程。

尝试将stdInLines的设置更改为此:

def stdInLines: Process[Task,String] =
    Process.repeatEval(Task.apply {
    Option(scala.Console.readLine())
    .getOrElse(throw Cause.Terminated(Cause.End))
})

原始的io.stdInLines在同一线程中运行readLine(),因此它总是在那里等待直到您键入内容。

09-25 23:37