我目前正在构建一种解决方案,以将数据从mongoDb传输到elasticsearch。
我的目标是跟踪所有成功传输给Elasticsearch的项目。
我正在使用akka流和elastic4s。当前流到es看起来像这样

val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
    batchSize = batchSize,
    completionFn = { () => elasticFinishPromise.success(()); ()},
    errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
    concurrentRequests = concurrentRequests
    )
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)

从我的来源来看,像这样:
val a: [NotUsed] = mongoSrc
  .via(some operations..)
  .to(esSink)
  .run()

现在一切正常,现在我正在记录第二个接收器的项目计数。但是我宁愿记录真正传输给elasticsearch的项目。
elastic4s订户提供了一个带有listener: ResponseListeneronAck(): UnitonFailure(): Unit,我很想像这样将信息返回到流中
val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item

mongoSrc ~> doStuff ~> esSink ~> logSink

我将如何实现?我是否需要一个定制阶段来缓冲onAckonFailure的元素?还是有更简单的方法?

谢谢你的帮助。

最佳答案

您可以通过利用Subscriber[T]“流化”您的Flow.fromSinkAndSource接收器。查看the docs中的“(来自接收器和源)的复合流”插图。

在这种情况下,您将把自定义actorPublisher附加为源,并通过onAck()向其发送消息。

由于您要求一种更简单的方法:

val doStuff = Flow[DocToIndex]
                .grouped(batchSize)
                .mapAsync(concurrentRequests)(bulkopFuture)

简而言之,除了所有有用的抽象之外,elastic4s订阅者只是a bulk update request

08-28 14:40