由于某种原因,我的Akka流始终在“发出”(?)第一条消息之前等待第二条消息。

这是一些示例代码,演示了我的问题。

val rx = Source((1 to 100).toStream.map { t =>
  Thread.sleep(1000)
  println(s"doing $t")
  t
})
rx.runForeach(println)

产生输出:
doing 1
doing 2
1
doing 3
2
doing 4
3
doing 5
4
doing 6
5
...

我想要的是:
doing 1
1
doing 2
2
doing 3
3
doing 4
4
doing 5
5
doing 6
6
...

最佳答案

现在设置代码的方式是,在允许开始向下游发射元素之前,您正在完全转换Source。通过删除表示源的数字范围内的toStream,您可以清楚地看到该行为(如@slouc所述)。如果这样做,您将看到Source在开始响应下游需求之前首先被完全转换。如果您实际上想将Source转换为Sink并在中间进行转换,则可以尝试构建如下结构:

val transform =
  Flow[Int].map{ t =>
    Thread.sleep(1000)
    println(s"doing $t")
    t
  }

Source((1 to 100).toStream).
  via(transform ).
  to(Sink.foreach(println)).
  run

如果进行了更改,那么您将获得所需的效果,即在开始处理下一个元素之前,一直向下处理流向下游的元素。

09-05 20:08