我目前正在构建一种解决方案,以将数据从mongoDb传输到elasticsearch。
我的目标是跟踪所有成功传输给Elasticsearch的项目。
我正在使用akka流和elastic4s。当前流到es看起来像这样
val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
batchSize = batchSize,
completionFn = { () => elasticFinishPromise.success(()); ()},
errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
concurrentRequests = concurrentRequests
)
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)
从我的来源来看,像这样:
val a: [NotUsed] = mongoSrc
.via(some operations..)
.to(esSink)
.run()
现在一切正常,现在我正在记录第二个接收器的项目计数。但是我宁愿记录真正传输给elasticsearch的项目。
elastic4s订户提供了一个带有
listener: ResponseListener
和onAck(): Unit
的onFailure(): Unit
,我很想像这样将信息返回到流中val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item
mongoSrc ~> doStuff ~> esSink ~> logSink
我将如何实现?我是否需要一个定制阶段来缓冲
onAck
和onFailure
的元素?还是有更简单的方法?谢谢你的帮助。
最佳答案
您可以通过利用Subscriber[T]
“流化”您的Flow.fromSinkAndSource
接收器。查看the docs中的“(来自接收器和源)的复合流”插图。
在这种情况下,您将把自定义actorPublisher附加为源,并通过onAck()
向其发送消息。
由于您要求一种更简单的方法:
val doStuff = Flow[DocToIndex]
.grouped(batchSize)
.mapAsync(concurrentRequests)(bulkopFuture)
简而言之,除了所有有用的抽象之外,elastic4s订阅者只是a bulk update request。