I'm new to Akka Streams, and to streams in general, so I may have misunderstood something at a conceptual level. Is there any way to create backpressure until a Future resolves? Essentially, what I want to do is this:

import java.nio.ByteBuffer
import scala.concurrent.Future
import akka.stream.scaladsl.FileIO

object Parser {
    def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .map(values => doAsyncOp(values))

def doAsyncOp(values: Seq[ExampleObject]): Future[Any] = ???

Bytes are read from a file and streamed to the parser, which emits Seq[ExampleObject]s. Those are streamed to an async operation that returns a Future. I want the rest of the stream to be backpressured until that Future resolves. Once it resolves, the stream should resume and pass the next Seq[ExampleObject] to doAsyncOp, which backpressures the stream again, and so on.


Right now I've got this working with:

Await.result(doAsyncOp(values), 10.seconds)


But my understanding is that this blocks the thread it runs on, which is bad. Is there a better way?

If it helps, the big picture is that I'm trying to parse an extremely large JSON file (too big to fit in memory) chunk by chunk with Jawn, then pass the objects to ElasticSearch to be indexed as they're parsed. ElasticSearch has a queue of 50 pending operations. If that queue overflows, it starts rejecting new objects.


It's quite easy. You need to use mapAsync :)

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .mapAsync(4)(values => doAsyncOp(values))

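where 4 is the parallelism. At most 4 futures from doAsyncOp are pending at once. While that limit is reached, mapAsync stops pulling from upstream, so the rest of the stream is backpressured until one of the futures completes. Results are still emitted downstream in input order. Use mapAsync(1) if you want exactly one batch in flight at a time.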
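The following is a minimal sketch of that bound, assuming Akka 2.6 with akka-stream on the classpath. The names `MapAsyncBackpressureDemo`, `run` and `fakeIndex` are hypothetical. `fakeIndex` stands in for `doAsyncOp` and completes after 50 ms via `akka.pattern.after`, without blocking a thread. The two counters exist only to observe how many futures are pending at the same time. Like the snippets above, the Source does nothing until it is materialized, here with `runWith(Sink.seq)`.

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}

object MapAsyncBackpressureDemo {

  /** Runs a small stream; returns (batch sizes, max number of pending futures observed). */
  def run(parallelism: Int): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("mapAsync-demo")
    implicit val ec: ExecutionContext = system.dispatcher

    val inFlight    = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    // Hypothetical stand-in for doAsyncOp: finishes after 50 ms without blocking a thread.
    def fakeIndex(batch: Seq[Int]): Future[Int] = {
      val now = inFlight.incrementAndGet()
      maxInFlight.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
      after(50.millis, system.scheduler)(Future.successful(batch.size))
        // andThen runs the decrement before the returned Future completes
        .andThen { case _ => inFlight.decrementAndGet() }
    }

    try {
      val done = Source(1 to 20)
        .grouped(5)                       // batches of 5, like the question's Seq[ExampleObject]
        .mapAsync(parallelism)(fakeIndex) // stops pulling while `parallelism` futures are pending
        .runWith(Sink.seq)
      // Blocking here only to end the demo; nothing blocks inside the stream itself.
      val sizes = Await.result(done, 10.seconds)
      (sizes, maxInFlight.get)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (sizes, maxSeen) = run(parallelism = 2)
    println(s"batch sizes: $sizes, max futures in flight: $maxSeen")
  }
}
```

With `run(parallelism = 1)` the stream should behave as the question asks: the next batch is not pulled until the previous Future has completed. For the ElasticSearch case, keeping the parallelism comfortably below the 50-operation queue limit should avoid rejections. This assumes each Future corresponds to one queued operation on the ElasticSearch side.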



09-06 19:05