我在Apache Camel中有2条路由,分别是MultimapRoute(基于数据库查询的结果在程序启动时初始化Multimap对象)和UpdatingRoute(数据库更新的侦听器,当数据库更新时,Multimap对象会更新)已更新)。我想确保首先运行MultimapRoute,并按顺序处理UpdatingRoute中的所有更改。我还想确保所有并发问题都得到解决。
这是MultimapRoute类:
public class MultimapRoute implements org.apache.camel.Processor {
static Multimap<String, String> m = new ArrayListMultimap<String, String>();
static ReentrantLock l = new ReentrantLock();
public void process(Exchange exchange) throws Exception {
try {
l.lock();
//Query the database and fill out the map initially
} finally {
if (l.isHeldByCurrentThread()) l.unlock();
}
}
}
这是UpdatingRoute类:
public class UpdatingRoute implements org.apache.camel.Processor {
public void process(Exchange exchange) throws Exception {
try {
l.lock();
//Update the database based on latest event
} finally {
if (MultimapRoute.l.isHeldByCurrentThread())
MultimapRoute.l.unlock();
}
}
}
我不确定这种多线程方法的线程安全性或其他问题是什么,或者如何使程序具有线程安全性。但是有人告诉我,这不是处理此特定应用程序的多线程的正确方法。非常感谢你的帮助。
最佳答案
首先,您需要在两条路由之间进行某种通信以实现此目的:
我想确保首先运行MultimapRoute,并按顺序处理UpdatingRoute中的所有更改。
为此,我建议使用基于Condition
的ReentrantLock
。
现在,问题的第二部分:
我不确定这种多线程方法的线程安全性或其他问题是什么,或者如何使程序具有线程安全性。
首先,您需要清楚地了解您希望使其成为线程安全的确切含义。我的猜测-您想确保在某些线程成功启动m
方法(我的意思是,获得的锁)的同时,没有线程能够访问map process()
。为此,请尝试更改您访问地图的方式。我的意思是,现在还可以,但是在这两个类之外都可以使用地图参考。这是我对两种想法都实施的建议:
public class MultimapRoute implements org.apache.camel.Processor {
private static Multimap<String, String> m = new ArrayListMultimap<String, String>();
private static ReentrantLock l = new ReentrantLock();
public static boolean routeFinishedFlag = false;
public static final Condition multimapRouteFinished = l.newCondition();
public void process(Exchange exchange) throws Exception {
l.lock();
try {
//Query the database and load some data to newData object
performMapUpdate(newData);
routeFinishedFlag = true;
multimapRouteFinished.signal();
} catch (InterruptedException e) {
//if we're here, it means someone interrupted the thread that invoked process() method while it was trying to acquire map lock
} finally {
l.unlock();
}
}
public static void performMapUpdate(Object newData) throws InterruptedException {
l.lock();
try {
//here do some magic with the map and newData object. You may need to change method signature, it is just an illustration of approach
} finally {
l.unlock();
}
}
}
public class UpdatingRoute implements org.apache.camel.Processor {
public void process(Exchange exchange) throws Exception {
l.lock();
try {
while (!routeFinishedFlag) {
MultimapRoute.multimapRouteFinished.await();
}
//Update the database based on latest event - again, I assume you have newObject object
MultimapRoute.performMapUpdate(newData);
} catch (InterruptedException e) {
//if we're here, it means someone interrupted the thread that invoked process() method while it was waiting for condition or trying to acquire map lock
} finally {
l.unlock();
}
}
}