问题描述
我目前正在使用Akka Streams和 Alpakka MongoDB连接器.
I'm currently playing around with Akka Streams and the Alpakka MongoDB connector.
是否可以指定MongoSource
的类型?
val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
private val todoCollection: MongoCollection[TodoMongo] = mongoDb
.withCodecRegistry(codecRegistry)
.getCollection("todo")
我想做这样的事情:
val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
MongoSource(t) // Stuck here
但是出现以下错误:
Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].
我找不到有关此部分的正确文档.
I can't find the correct documentation about this part.
推荐答案
此版本尚未发布,但在Alpakka的主分支中, MongoSource.apply
采用类型参数:
This is not published yet, but in Alpakka's master branch, MongoSource.apply
takes a type parameter:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
因此,在即将发布的Alpakka 0.18版本中,您将可以执行以下操作:
Therefore, with the upcoming 0.18 release of Alpakka, you'll be able to do the following:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
请注意,此处的source
假定todoCollection.find()
返回Observable[TodoMongo]
;根据需要调整类型.
Note that source
here assumes that todoCollection.find()
returns an Observable[TodoMongo]
; adjust the types as needed.
在此期间,您可以简单地手动添加以上代码.例如:
In the meantime, you could simply add the above code manually. For example:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
请注意,将MyMongoSource
定义为驻留在akka.stream.alpakka.mongodb.scaladsl
包中(例如MongoSource
),因为 ObservableToPublisher
是软件包专用的类.您将使用MyMongoSource
的方式与使用MongoSource
的方式相同:
Note that MyMongoSource
is defined to reside in the akka.stream.alpakka.mongodb.scaladsl
package (like MongoSource
), because ObservableToPublisher
is a package-private class. You would use MyMongoSource
in the same way that you would use MongoSource
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())
这篇关于Alpakka MongoDB-在MongoSource中指定类型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!