问题描述
使用 Camel 拆分 ArrayList 并并行处理最多 10 个线程的每个项目.以下是配置.线程池配置文件设置为最大线程数 =10.
Using Camel to split an ArrayList and process each item in parallel up to 10 threads.Following is the config.Thread pool profile is set to max thread count =10.
<camel:route id="ReportsRoute">
<camel:from uri="direct:processReportsChannel" />
<camel:to uri="bean:reportRepository?method=getPendingTransactions" />
<camel:split parallelProcessing="true" executorServiceRef="ReportThreadPoolProfile">
<camel:simple>${body}</camel:simple>
<camel:doTry>
<camel:to uri="direct:processReportChannel" />
<camel:doCatch>
<camel:exception>java.lang.Exception</camel:exception>
<camel:handled>
<camel:constant>true</camel:constant>
</camel:handled>
<camel:to uri="bean:ReportRepository?method=markAsFailed"/>
<camel:wireTap uri="direct:loggingAndNotificationChannel" />
</camel:doCatch>
</camel:doTry>
</camel:split>
</camel:route>
bean:reportRepository?method=getPendingTransactions
获取 ArrayList 并传递给 Splitter.
bean:reportRepository?method=getPendingTransactions
gets the ArrayList and passes to the Splitter.
processReportChannel
是处理项目的处理器.
processReportChannel
is the processor that handles items.
问题:作业开始时它正在启动 10 个线程,但有些线程正在拾取相同的项目.例如,如果我在 ArrayList 中有 item_no_1 到 10,thread_no_1 和 thread_no_2 或某个时候更多线程正在接收,让我们说 item_no_2.是不是因为 Array List 不是线程安全的,而 Splitter 不管理它?
Problem:It is starting 10 threads when the job starts, but some threads are picking up the same item. For example, if I have item_no_1 through to 10 in the ArrayList, thread_no_1 and thread_no_2 or sometime more threads are picking up let's say item_no_2. Is it because Array List is not thread safe and Splitter doesn't manage that?
我不是这方面的专家,需要帮助指出问题所在.
I'm not an expert in this and need help to point out where the issue is.
推荐答案
我测试了以下(更简单的)设置:
I tested with following (simpler) setup:
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route id="ReportsRoute">
<from uri="direct:start" />
<!-- By default a pool size of 10 is used. -->
<split parallelProcessing="true">
<simple>${body}</simple>
<to uri="direct:sub" />
</split>
</route>
<route>
<from uri="direct:sub"/>
<log message="Processing item ${body}" />
</route>
</camelContext>
测试:
List<Object> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add("And we go and go: " + (i + 1));
}
template.sendBody("direct:start", list);
使用此设置,没有条目被处理两次.因此,您的处理器中必须存在导致问题的某些内容,即多个线程拾取了相同的列表项.
With this setup no entry was processed twice. So there must be something in your processors that lead to the issue, that the same list item is picked up by more than one thread.
这篇关于Camel Splitter 并行处理数组列表 - 并发访问问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!