问题描述
Here is my scenario:
I have a master actor, which receives messages from multiple child actors. These messages contain data to be aggregated. In this aggregation logic, do I need to take care of synchronization issues, if I use a shared data structure to collect the aggregation?
else if(arg0 instanceof ReducedMsg){
ReducedMsg reduced = (ReducedMsg)arg0;
counter.decrementAndGet();
synchronized(finalResult){
finalResult.add((KeyValue<K, V>) reduced.getReduced());
if(counter.get() == 0){
if(checkAndReduce(finalResult)){
finalResult.clear();
}
else{
stop();
latch.countDown();
}
}
}
}
So as you can see I have a finalResult, to which each message will be aggregated, and after a processing logic the collection needs to be cleared as well.
Actually what I am trying to implement is a recursive (associative) reduction mapreduce. So I need to keep the synchronized block I assume? Or is it by any chance Akka executes the onReceive one thread at a time?
This logic produces accurate and predictable result on small data set. My problem is when my input data set is a little large, the code hangs. I want to be sure that is because of the context switches for my synchronization block, so that I may dwelve into a different design.
onReceive()
is never called concurrently. This is the most fundamental guarantee Akka is giving to you.
This means that if your counter
variable is a field in an actor and no other piece of code can access that field directly, you can safely use normal int
/long
instead of AtomicInteger
/AtomicLong
. Also synchronization on finalResult
is not necessary assuming it is a field encapsulated and hidden in an actor.
Finally the usage of CountDownLatch
is suspicious. In Akka applications you shouldn't use any synchronizations primitives. Actors are essentially single-threaded and all communication (including waking up and passing data) should be implemented via message passing.
This is all explained in the documentation: http://doc.akka.io/docs/akka/2.0.2/general/jmm.html#Actors_and_the_Java_Memory_Model
这篇关于Akka onReceive 方法是否同时执行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!