问题描述
使用Camel拆分ArrayList并并行处理每个项目最多10个线程。
以下是配置。
线程池配置文件设置为最大线程数= 10。
Using Camel to split an ArrayList and process each item in parallel up to 10 threads.Following is the config.Thread pool profile is set to max thread count =10.
<camel:route id="ReportsRoute">
<camel:from uri="direct:processReportsChannel" />
<camel:to uri="bean:reportRepository?method=getPendingTransactions" />
<camel:split parallelProcessing="true" executorServiceRef="ReportThreadPoolProfile">
<camel:simple>${body}</camel:simple>
<camel:doTry>
<camel:to uri="direct:processReportChannel" />
<camel:doCatch>
<camel:exception>java.lang.Exception</camel:exception>
<camel:handled>
<camel:constant>true</camel:constant>
</camel:handled>
<camel:to uri="bean:ReportRepository?method=markAsFailed"/>
<camel:wireTap uri="direct:loggingAndNotificationChannel" />
</camel:doCatch>
</camel:doTry>
</camel:split>
</camel:route>
bean:reportRepository?method = getPendingTransactions
gets ArrayList并传递给Splitter。
bean:reportRepository?method=getPendingTransactions
gets the ArrayList and passes to the Splitter.
processReportChannel
是处理项目的处理器。
processReportChannel
is the processor that handles items.
问题:
当作业开始时,它会启动10个线程,但有些线程正在拾取相同的项目。例如,如果我在ArrayList,thread_no_1和thread_no_2中有item_no_1到10,或者有时候更多线程正在拾取,那么就说item_no_2。是因为Array List不是线程安全的,Splitter不管理它吗?
Problem:It is starting 10 threads when the job starts, but some threads are picking up the same item. For example, if I have item_no_1 through to 10 in the ArrayList, thread_no_1 and thread_no_2 or sometime more threads are picking up let's say item_no_2. Is it because Array List is not thread safe and Splitter doesn't manage that?
我不是这方面的专家,需要帮助指出问题所在。
I'm not an expert in this and need help to point out where the issue is.
推荐答案
我测试了以下(更简单)设置:
I tested with following (simpler) setup:
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route id="ReportsRoute">
<from uri="direct:start" />
<!-- By default a pool size of 10 is used. -->
<split parallelProcessing="true">
<simple>${body}</simple>
<to uri="direct:sub" />
</split>
</route>
<route>
<from uri="direct:sub"/>
<log message="Processing item ${body}" />
</route>
</camelContext>
测试:
List<Object> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add("And we go and go: " + (i + 1));
}
template.sendBody("direct:start", list);
使用此设置,没有任何条目被处理两次。因此,处理器中必定存在导致问题的内容,即同一个列表项由多个线程拾取。
With this setup no entry was processed twice. So there must be something in your processors that lead to the issue, that the same list item is picked up by more than one thread.
这篇关于Camel Splitter并行处理数组列表 - 并发访问问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!