Question
I'm new to Akka streams and streams in general so I might have completely misunderstood something at a conceptual level, but is there any way I can create backpressure until a future resolves? Essentially what I want to do is like this:
import java.nio.ByteBuffer
import akka.stream.scaladsl.{FileIO, Sink}
import scala.concurrent.Future

object Parser {
  def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}
val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .map(values => doAsyncOp(values))
  .runWith(Sink.seq)
def doAsyncOp(values: Seq[ExampleObject]): Future[Any] = ???
Bytes are read from a file and streamed to the parser, which emits Seqs of ExampleObjects, and those are streamed to an async operation that returns a Future. I want to make it so that until the Future resolves, the rest of the stream gets backpressured, then resumes once the Future is resolved, passing another Seq[ExampleObject] to doAsyncOp, which resumes the backpressure and so on.
Right now I've got this working with:
Await.result(doAsyncOp(values), 10.seconds) // needs import scala.concurrent.duration._
But my understanding is that this locks up the entire thread and is bad. Is there any better way around it?
If it helps, the big picture is that I'm trying to parse an extremely large JSON file (too big to fit in memory) chunk-by-chunk with Jawn, then pass objects to ElasticSearch to be indexed as they're parsed. ElasticSearch has a queue of 50 pending operations; if that overflows, it starts rejecting new objects.
Recommended answer
It's quite easy. You need to use mapAsync :)
val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .mapAsync(4)(values => doAsyncOp(values))
  .runWith(Sink.seq)
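Here 4 is the parallelism: the maximum number of Futures that may be in flight at once. While that many are pending, mapAsync stops pulling from upstream, which is what backpressures the rest of the stream. Use mapAsync(1) if each Future must complete before the next doAsyncOp call starts.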
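To see the effect of the parallelism argument in isolation, here is a minimal sketch that swaps the file source and doAsyncOp for stand-ins. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream. MapAsyncBackpressureDemo, run and fakeAsyncOp are hypothetical names made up for this illustration, and the Thread.sleep only simulates a slow external call.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncBackpressureDemo {

  // Runs a small stream through mapAsync and returns the results together
  // with the highest number of async operations observed running at once.
  def run(parallelism: Int): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("demo")
    import system.dispatcher

    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    // Hypothetical stand-in for doAsyncOp: sums a batch after a short delay.
    def fakeAsyncOp(batch: Seq[Int]): Future[Int] = Future {
      val now = inFlight.incrementAndGet()
      maxInFlight.updateAndGet(m => math.max(m, now))
      Thread.sleep(20) // blocking here is for demonstration only
      inFlight.decrementAndGet()
      batch.sum
    }

    val done: Future[Seq[Int]] = Source(1 to 100)
      .grouped(10)                         // stand-in for the parsed batches
      .mapAsync(parallelism)(fakeAsyncOp)  // at most `parallelism` Futures pending
      .runWith(Sink.seq)

    try {
      (Await.result(done, 10.seconds), maxInFlight.get())
    } finally {
      system.terminate()
    }
  }
}
```

Calling MapAsyncBackpressureDemo.run(1) should never report more than one operation in flight, and run(4) no more than four. mapAsync also emits results in upstream order; mapAsyncUnordered is the variant to reach for when ordering does not matter. The Await.result here sits outside the stream, at the very end of the program, rather than inside a stage.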