我对路由的理解(以Apache Camel的语言来说)是它代表了从一个端点到另一端点的数据流,并且它将沿对数据执行EIP类型操作的方式在各种处理器处停止。

如果这是对路线的正确/合理的评估,那么我正在建模一个问题,我认为它需要在同一CamelContext内包含多个路线(我使用的是Spring):


路线1:从Source-1中提取数据,进行处理,将其转换为List<SomePOJO>,然后将其发送到聚合器
路线2:从Source-2提取数据,进行处理,还将其转换为List<SomePOJO>,然后将其发送到聚合器
路由3:包含一个聚合器,该聚合器等待直到从路由1和路由2都收到List<SomePOJO>,然后继续处理聚合列表


事情是这样的:两个List<SomePOJO>需要同时到达聚合器,或者更确切地说,聚合器bean必须等待直到从两条路由接收到数据,然后才能将2个列表聚合到单个List<SomePOJO>中,并且将汇总列表发送到Route 3的其余部分。

到目前为止,我具有以下伪编码的<camelContext>

<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
    <!-- Route 1 -->
    <route id="route-1">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor1?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 2 -->
    <route id="route-2">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor2?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 3 -->
    <route id="route-3">
        <from uri="direct:aggregator" />

        <aggregate strategyRef="listAggregatorStrategy">
            <correlationExpression>
                <!-- Haven't figured this part out yet. -->
            </correlationExpression>
            <to uri="bean:lastProcessor?method=process" />
        </aggregate>
    </route>
</camelContext>

<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />


然后在Java中:

public class ListAggregatorStrategy implements AggregatoryStrategy {
    public Exchange aggregate(Exchange exchange) {
        List<SomePOJO> route1POJOs = extractRoute1POJOs(exchange);
        List<SomePOJO> route2POJOs = extractRoute2POJOs(exchange);

        List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(route1POJOs);
        aggregateList.addAll(route2POJOs);

        return aggregateList;
    }
}


我的问题


我的基本设置正确吗?换句话说,我是否正确使用direct:aggregator端点将数据从route-1route-2发送到route-3的聚合器?
我的聚合器会按照我期望的方式工作吗?假设extractor1中的route-1 bean运行仅需5秒钟,但是extractor2中的route-2 bean运行仅需2分钟。在t = 5时,聚合器应从extractor1接收数据并开始等待(2分钟),直到extractor2完成并将剩余的数据提供给聚合器。是?

最佳答案

听起来您处在正确的轨道上,Aggregator页上有很多关于此的很好的信息。

<correlationExpression>是匹配每个路由中的Exchange的关键,complementSize可以指定要等待的数量。在您的情况下,看起来每个路由只能运行一次,在这种情况下,表达式可能使用每个Exchange的固定标头值,否则每个路由都需要一个计数器类。

这是您的示例的更新:

<route id="route-1">
    <from uri="time://runOnce?repeatCount=1&amp;delay=10" />
    <to uri="bean:extractor1?method=process" />
    <setHeader headerName="id">
        <constant>myHeaderValue</constant>
    </setHeader>
    <to uri="direct:aggregator" />
</route>

<route id="route-2">
    <from uri="time://runOnce?repeatCount=1&amp;delay=10" />
    <to uri="bean:extractor2?method=process" />
    <setHeader headerName="id">
        <constant>myHeaderValue</constant>
    </setHeader>
    <to uri="direct:aggregator" />
</route>

<route id="route-3">
    <from uri="direct:aggregator" />

    <aggregate strategyRef="listAggregatorStrategy" completionSize="2">
        <correlationExpression>
            <simple>header.id</simple>
        </correlationExpression>
        <to uri="bean:lastProcessor?method=process" />
    </aggregate>
</route>

10-04 22:28
查看更多