我试图了解 asyncBoundarymapAsync 之间的区别。乍一看,我猜他们应该是一样的。但是,当我运行代码时,看起来 asyncBoundary 的性能比 mapAsync

这是代码

implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()


Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()

输出 :
异步边界总是比 mayAsync 完成得更快。

从描述 asyncBoundary ( https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html ) 的文档中,我可以看到它在不同的 CPU 上运行,但 mapAsync 是使用 Future 的多线程。 Future 也是异步的。

我可以问更多关于这两个 API 的说明吗?

最佳答案

异步
正如您正确指出的那样,这会强制在两个阶段之间插入异步边界。在你的例子中

Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
这实际上意味着 + 1 操作和 * 2 操作将由单独的 actor 运行。这使流水线成为可能,因为当一个元素移动到 * 2 阶段时,同时可以为 + 1 阶段引入另一个元素。如果你 没有 强制异步边界,在从上游请求一个新元素之前,同一个actor将顺序化操作并对一个元素执行操作。
顺便说一下,您的示例可以使用 async 组合器以更短的格式重写:
Source(1 to 100).map(_ + 1).async.map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
map 异步
这是并行执行异步操作的阶段。并行因子允许您指定要旋转以服务传入元素的最大并行 actor 数量。并行计算的结果由 mapAsync 阶段按顺序 跟踪和发出
在你的例子中
Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
可能多达 100 个 + 1 操作(即所有操作)可以并行运行,并按顺序收集结果。随后,最多可以并行运行 100 个 * 2 操作,并再次按顺序收集结果并向下游发送。
在您的示例中,您正在运行受 CPU 限制的快速操作,这些操作不能证明使用 mapAsync 是合理的,因为此阶段所需的基础架构很可能比并行运行 100 个这些操作的优势要昂贵得多。 mapAsync 在处理受 IO 限制的慢速操作时特别有用,其中并行化非常方便。
有关此主题的全面阅读,请查看 this blogpost

关于scala - akka 流 asyncBoundary 与 mapAsync,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47301363/

10-13 02:33