我正在使用play framework 2.5.0和reactivemongo构建一个应用程序,我花了很多时间,在一些在大多数web语言中都很容易做到的事情上。
那就是一次插入许多文档。
为此,我必须使用reactivemongo函数bulkInsert
我发现this google group有一个非常简单的例子,但是它是从2013年开始的,现在签名改变了

def bulkInsert[T](enumerator: Enumerator[T])


def bulkInsert(documents: Stream[P.Document], ordered: Boolean, writeConcern: WriteConcern)(implicit ec: ExecutionContext): Future[MultiBulkWriteResult]

因此,在这里,我试图以那个例子为例,找到一种将枚举数转换为流的方法(没有找到这样做的方法):
val schemasDocs: Seq[JsObject] = {
  jsonSchemas.fields.map {
    case (field, value) => Json.obj(field -> value)
  }
}
val enumerator = Enumerator.enumerate(schemasDocs)
val schemasStream = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator)) // my attempt to turn enumerator into a Stream
val schemasInsert = {
  getCollection("schemas").flatMap(
    _.bulkInsert(schemasStream, true)
  )
}

现在我发现自己潜入akka、reactivemongo和play api,试图从一系列jsobjects中创建一个jsobjects流。
然后我尝试了另一种方法:来自reactivemongo网站的example
val bulkDocs = schemasDocs.map(implicitly[collection.ImplicitlyDocumentProducer](_))
collection.bulkInsert(ordered=true)(bulkDocs: _*)

给我一个同样难以调试的错误:
type mismatch; found : Seq[reactivemongo.play.json.collection.JSONCollection#ImplicitlyDocumentProducer] required: Seq[x$48.ImplicitlyDocumentProducer]

我不想使用流和第二种解决方案,因为我不喜欢在代码中包含我不理解的东西。

最佳答案

我刚找到如何处理bulkinsert。有一个例子
生成.sbt

...
libraryDependencies ++= Seq(
  "org.reactivemongo" %% "play2-reactivemongo" % "0.11.14"
)
...

插件.sbt
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.12")

cxTransactionsRepository.scala
package cx.repository

import cx.model.CxTransactionEntity
import play.modules.reactivemongo.ReactiveMongoApi
import reactivemongo.play.json.collection.JSONCollection

import scala.concurrent.{ExecutionContext, Future}

class CxTransactionsRepository @Inject()(val reactiveMongoApi: ReactiveMongoApi)(implicit ec: ExecutionContext){

  private val cxTransactionsCollectionFuture: Future[JSONCollection] = reactiveMongoApi.database.map(_.collection[JSONCollection]("cxTransactions"))

  def bulkInsert(seq: Seq[CxTransactionEntity]): Future[Int] = {
    for {
      transactions <- cxTransactionsCollectionFuture
      writeResult <- transactions.bulkInsert(ordered = false)(seq.map(implicitly[transactions.ImplicitlyDocumentProducer](_)): _*)
    } yield {
      writeResult.n
    }
  }

}

07-27 22:42