我正在尝试使用Amqp alpakka连接器作为源和接收器。

Source<CommittableReadResult, NotUsed> AMQP_SOURCE
      -> processAndGetResponse
      -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK


在接收器的操作成功之后,我想确认从Amqp队列获得的消息。像这样:

amqp_source(committableReadResult)
    -> processAndGetResponse
    -> amqp_sink
    -> IfSinkOperationSuccess.Then(committableReadResult.ack())


我该如何实现?我基本上希望仅在接收器操作成功之后才将消息标记为确认。

最佳答案

通常,如果您有一个实现为SinkFuture[Done](Scala API,我相信Java等效项是CompletionStage[Done]),则可以在Sink阶段使用该mapAsync运行子流。数据仅在成功时通过。

通过快速浏览Alpakka AMQP API,Sink都以这种方式实现,因此该方法将起作用。

假设您的doSomethingWithMessage函数同时导致CommittableReadResultWriteMessage,Scala中的类似以下内容将起作用:

def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???

amqpSource
  .map(doSomethingWithMessage)
  .mapAsync(parallelism) { tup =>
    val (crr, wm) = tup
    val fut = Source.single(crr, wm).runWith(amqpSink)
    fut.flatMap(_ => crr.ack().map(_ => crr.message))
  }.runWith(Sink.ignore)

08-07 16:17