设置Scala 2.11.4,Playframework 2.3.7,Reacivemongo(尝试使用0.10.5.0.akka23 / 0.11.0-SNAPSHOT两者)。
我们有一个包含18'000个实体的集合,使用Enumerator / Iteratee方法以异步方式处理此集合。
情况1。
处理非常简单(将实体提取为CSV格式并将其作为REST响应分块发送),一切正常,提取并处理了所有记录。
情况2
处理涉及最多10秒钟的计算,计算后更新记录,使用foreach Iteratee完成计算,更新内部任务跟踪器中已处理实体的数量。处理可能需要一段时间,但没关系
Patient.findByClient(clientName) &>
Enumeratee.mapM(patient => {
val evaluatedAndSaveTask = patient.
evaluate(parser).
flatMap(patientOpt =>
patientOpt.
map(evaluatedPatient => evaluatedPatient.saveAndGet().map(Some(_))).
getOrElse(Future.successful(None))
)
evaluatedAndSaveTask.recover({
case t =>
t.printStackTrace()
None
})
})
// Step 2.1. Running evaluation process through Iteratee
val evaluationTask = evaluation run Iteratee.foreach(patientOpt => {
collection.update(Json.obj("clientName" -> clientName), Json.obj("$inc" -> Json.obj("processedPatients" -> 1))))
)
// Step 2.3. Log errors
evaluationTask.onSuccess({ case _ => Patient.LOG.info("PatientEvaluation DONE") })
evaluationTask.onFailure({ case t => {
t.printStackTrace();
Patient.LOG.info("PatientEvaluation FAILED");
}})
在这种情况下,只有575个实体得到处理,并且Iteratee结束打印“患者评估完成”。
我从等式中删除了save,但这没有帮助。
为什么会这样呢?
最佳答案
我终于找到了问题的根源-Mongo在超时后会自动使评估过期,您可以指定noCursorTimeout标志来防止这种情况:
collection.
find(findQ).
sort(if(sortQF.values.isEmpty) sortQ else sortQF).
options(QueryOpts(skipN = offset + page._1 * page._2).noCursorTimeout).
cursor[T].
由于某种原因,在这种情况下,ReactiveMongo不会引发Exception,而只是关闭Iterator。在此之后,我在ReactiveMongo https://github.com/ReactiveMongo/ReactiveMongo/issues/250中创建了一个问题。
现在,对我而言,使游标过期并以offset重新开始可能更安全。