本文介绍了Alpakka MongoDB-在MongoSource中指定类型的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在使用Akka Streams和 Alpakka MongoDB连接器.

I'm currently playing around with Akka Streams and the Alpakka MongoDB connector.

是否可以指定MongoSource的类型?

val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
  private val todoCollection: MongoCollection[TodoMongo] = mongoDb
    .withCodecRegistry(codecRegistry)
    .getCollection("todo")

我想做这样的事情:

val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
MongoSource(t) // Stuck here

但是出现以下错误:

Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].

我找不到有关此部分的正确文档.

I can't find the correct documentation about this part.

推荐答案

此版本尚未发布,但在Alpakka的主分支中, MongoSource.apply 采用类型参数:

This is not published yet, but in Alpakka's master branch, MongoSource.apply takes a type parameter:

object MongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

因此,在即将发布的Alpakka 0.18版本中,您将可以执行以下操作:

Therefore, with the upcoming 0.18 release of Alpakka, you'll be able to do the following:

val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())

请注意,此处的source假定todoCollection.find()返回Observable[TodoMongo];根据需要调整类型.

Note that source here assumes that todoCollection.find() returns an Observable[TodoMongo]; adjust the types as needed.

在此期间,您可以简单地手动添加以上代码.例如:

In the meantime, you could simply add the above code manually. For example:

package akka.stream.alpakka.mongodb.scaladsl

import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable

object MyMongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

请注意,将MyMongoSource定义为驻留在akka.stream.alpakka.mongodb.scaladsl包中(例如MongoSource),因为 ObservableToPublisher 是软件包专用的类.您将使用MyMongoSource的方式与使用MongoSource的方式相同:

Note that MyMongoSource is defined to reside in the akka.stream.alpakka.mongodb.scaladsl package (like MongoSource), because ObservableToPublisher is a package-private class. You would use MyMongoSource in the same way that you would use MongoSource:

val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())

这篇关于Alpakka MongoDB-在MongoSource中指定类型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-17 11:11