我在Apache Camel中有2条路由,分别是MultimapRoute(基于数据库查询的结果在程序启动时初始化Multimap对象)和UpdatingRoute(数据库更新的侦听器,当数据库更新时,Multimap对象会更新)已更新)。我想确保首先运行MultimapRoute,并按顺序处理UpdatingRoute中的所有更改。我还想确保所有并发问题都得到解决。

这是MultimapRoute类:

public class MultimapRoute implements org.apache.camel.Processor {
    static Multimap<String, String> m = new ArrayListMultimap<String, String>();
    static ReentrantLock l = new ReentrantLock();

    public void process(Exchange exchange) throws Exception {
        try {
            l.lock();
            //Query the database and fill out the map initially
        } finally {
            if (l.isHeldByCurrentThread()) l.unlock();
        }
    }
}


这是UpdatingRoute类:

public class UpdatingRoute implements org.apache.camel.Processor {
    public void process(Exchange exchange) throws Exception {
        try {
            l.lock();
            //Update the database based on latest event
        } finally {
            if (MultimapRoute.l.isHeldByCurrentThread())
                MultimapRoute.l.unlock();
        }
    }
}


我不确定这种多线程方法的线程安全性或其他问题是什么,或者如何使程序具有线程安全性。但是有人告诉我,这不是处理此特定应用程序的多线程的正确方法。非常感谢你的帮助。

最佳答案

首先,您需要在两条路由之间进行某种通信以实现此目的:


  我想确保首先运行MultimapRoute,并按顺序处理UpdatingRoute中的所有更改。


为此,我建议使用基于ConditionReentrantLock

现在,问题的第二部分:


  我不确定这种多线程方法的线程安全性或其他问题是什么,或者如何使程序具有线程安全性。


首先,您需要清楚地了解您希望使其成为线程安全的确切含义。我的猜测-您想确保在某些线程成功启动m方法(我的意思是,获得的锁)的同时,没有线程能够访问map process()。为此,请尝试更改您访问地图的方式。我的意思是,现在还可以,但是在这两个类之外都可以使用地图参考。这是我对两种想法都实施的建议:

public class MultimapRoute implements org.apache.camel.Processor {
    private static Multimap<String, String> m = new ArrayListMultimap<String, String>();
    private static ReentrantLock l = new ReentrantLock();
    public static boolean routeFinishedFlag = false;
    public static final Condition multimapRouteFinished = l.newCondition();

    public void process(Exchange exchange) throws Exception {
        l.lock();
        try {
            //Query the database and load some data to newData object
            performMapUpdate(newData);
            routeFinishedFlag = true;
            multimapRouteFinished.signal();
        } catch (InterruptedException e) {
            //if we're here, it means someone interrupted the thread that invoked process() method while it was trying to acquire map lock
        } finally {
            l.unlock();
        }
    }

    public static void performMapUpdate(Object newData) throws InterruptedException {
        l.lock();
        try {
            //here do some magic with the map and newData object. You may need to change method signature, it is just an illustration of approach
        } finally {
            l.unlock();
        }
    }
}

public class UpdatingRoute implements org.apache.camel.Processor {
    public void process(Exchange exchange) throws Exception {
        l.lock();
        try {
            while (!routeFinishedFlag) {
                MultimapRoute.multimapRouteFinished.await();
            }
            //Update the database based on latest event - again, I assume you have newObject object
            MultimapRoute.performMapUpdate(newData);
        } catch (InterruptedException e) {
            //if we're here, it means someone interrupted the thread that invoked process() method while it was waiting for condition or trying to acquire map lock
        } finally {
             l.unlock();
        }
    }
}

08-17 23:15