I'm writing a piece of code that populates a MongoDB collection once a buffer (a list) grows to a certain size.
    import scala.actors.Actor
    import com.mongodb.casbah.Imports._
    import scala.collection.mutable.ListBuffer

    class PopulateDB extends Actor {
      val buffer = new ListBuffer[DBObject]
      val mongoConn = MongoConnection()
      val mongoCol = mongoConn("casbah_test")("logs")

      // Buffers `info`; flushes when 1000 documents are queued,
      // or when called with no argument (final flush).
      def add(info: DBObject = null) {
        if (info != null) buffer += info
        if (buffer.size > 0 && (info == null || buffer.length >= 1000)) {
          mongoCol.insert(buffer.toList)
          buffer.clear
          println("adding a batch")
        }
      }

      def act() {
        loop {
          react {
            case info: DBObject => add(info)
            case msg if msg == "closeConnection" =>
              println("Close connection")
              add() // flush whatever is left before closing
              mongoConn.close
          }
        }
      }
    }
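However, when I run this code, Scala occasionally throws a ConcurrentModificationException on the line "mongoCol.insert(buffer.toList)". I'm fairly sure it has something to do with "mongoCol.insert". I'd like to know whether there is something fundamentally wrong with the code, or whether I should use something like Akka's "atomic {…}" to avoid the problem.
Here is the full stack trace: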
    PopulateDB@7e859a68: caught java.util.ConcurrentModificationException
    java.util.ConcurrentModificationException
        at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:373)
        at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:392)
        at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:391)
        at org.bson.BSONEncoder.putObject(BSONEncoder.java:113)
        at org.bson.BSONEncoder.putObject(BSONEncoder.java:67)
        at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:215)
        at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:180)
        at com.mongodb.DBCollection.insert(DBCollection.java:85)
        at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:561)
        at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:864)
        at PopulateDB.add(PopulateDB.scala:14)
        at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:26)
        at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:25)
        at scala.actors.ReactorTask.run(ReactorTask.scala:34)
        at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
        at PopulateDB.scala$actors$ReplyReactor$$super$resumeReceiver(PopulateDB.scala:5)
        at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:69)
        at PopulateDB.resumeReceiver(PopulateDB.scala:5)
        at scala.actors.Actor$class.searchMailbox(Actor.scala:478)
        at PopulateDB.searchMailbox(PopulateDB.scala:5)
        at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
        at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
        at scala.actors.ReactorTask.run(ReactorTask.scala:36)
        at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
        at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)
Thanks,
Derek
Best answer
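DBObject is not thread-safe, and you are sending a DBObject in the actor message. It is most likely being modified again later by the sender, which would cause the concurrent modification. The stack trace points the same way: the exception comes from a LinkedHashMap iterator inside BSONEncoder.putObject, that is, while the driver is walking a document's keys during the insert.
As a first step, I'd suggest calling clone() on the DBObject as it comes into the actor, before putting it into the buffer. It is only a shallow copy, but it should at least be enough to stop the concurrent modification problems on the LinkedHashMap that backs the keys of the DBObject (the keys stay ordered by virtue of the LHM).
I would try: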
    def act() {
      loop {
        react {
          case info: DBObject => add(info.clone())
          case msg if msg == "closeConnection" =>
            println("Close connection")
            add()
            mongoConn.close
        }
      }
    }
If that doesn't work, look at any other places where you modify the DBObject after it has been sent to the actor.
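To see the failure mode in isolation, here is a minimal sketch that needs no MongoDB at all. It assumes a plain java.util.LinkedHashMap can stand in for the map backing a DBObject, and it triggers the mutation from the same thread in the middle of the iteration, so the exception is deterministic rather than occasional. ShallowCopyDemo and encodesCleanly are hypothetical names made up for this illustration; they are not part of Casbah or the Java driver.

```scala
import java.util.{ConcurrentModificationException, LinkedHashMap}

object ShallowCopyDemo {
  // Walks the entries of `doc` roughly the way an encoder would, and calls
  // `mutate` after the first entry to stand in for a sender that is still
  // writing. Returns true if the walk finished, false if the fail-fast
  // iterator threw ConcurrentModificationException.
  def encodesCleanly(doc: LinkedHashMap[String, AnyRef])(mutate: () => Unit): Boolean =
    try {
      val entries = doc.entrySet().iterator()
      var mutated = false
      while (entries.hasNext) {
        entries.next()
        if (!mutated) { mutate(); mutated = true }
      }
      true
    } catch {
      case _: ConcurrentModificationException => false
    }

  def main(args: Array[String]): Unit = {
    val original = new LinkedHashMap[String, AnyRef]()
    original.put("level", "INFO")
    original.put("msg", "started")

    // The map being walked gains a key mid-walk: the iterator fails fast.
    val shared = encodesCleanly(original)(() => original.put("host", "a"))

    // Shallow copy: a separate key set that holds the same value references.
    val snapshot = new LinkedHashMap[String, AnyRef](original)
    // The sender keeps writing to `original`; the snapshot is unaffected.
    val copied = encodesCleanly(snapshot)(() => original.put("pid", "1"))

    println("shared map walked cleanly: " + shared) // false
    println("snapshot walked cleanly:   " + copied) // true
  }
}
```

Because the copy is shallow, this only protects the top-level key set. If a value is itself a mutable nested document that the sender keeps changing, the same exception can still come from that nested map.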