大家好
我目前有一个用例,我需要在路由中调用一个端点,但此后继续原始消息。
我发现这样做的方法是使用multicast()
,然后使用AggregationStrategy AggregationStrategies.useOriginal()
,即UseOriginalAggregationStrategy()
明确地说,我不是将消息发送到多个端点,只是使用multicast()
将数据“重置”为之前的状态
现在,我遇到了一个问题,我必须将某些信息从multicast()
内部传播到外部路由,并在以后使用(下面的示例)。
它尝试使用更新的UseOriginalAggregationStrategy()
来做到这一点,在这里我仅“重置” Message
,而将Exchange
保留不变,因此我可以更新属性,并在multicast()
之后保留它们
聚合策略:
public class UseOriginalMessageAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange != null && oldExchange.getIn() != null) {
newExchange.setIn(oldExchange.getIn());
}
return newExchange;
}
}
单元测试:
@Test
public void test() throws Exception {
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:test")
.multicast(new UseOriginalMessageAggregationStrategy())
.process(exchange -> { // This could also be a 'to()', it doesn't make a difference
exchange.setProperty("c", 1);
exchange.getIn().setHeader("d", 1);
})
.end()
.log("Property a: ${exchangeProperty.a}")
.log("Header b: ${header.b}")
.log("Property c: ${exchangeProperty.c}")
.log("Header d: ${header.d}");
}
});
Exchange exchange = new ExchangeBuilder(camelContext)
.withProperty("a", 1)
.withHeader("b", 1)
.build();
camelContext.createProducerTemplate().send("direct:test", exchange);
}
现在,我期望的是此日志:
属性a:1
标头b:1
属性c:1
标头d:
但是实际发生的是:
属性a:1
标头b:1
属性c:1
标头d:1
请注意,即使标题属于
multicast()
内的消息,仍将设置“标题d”,该消息现在应该已经被忽略了。经过一些调试之后,我注意到,发生这种情况是因为AggregationStrategy中的'oldExchange'实际上为null,此时日志才有意义,因为正是被记录的'newExchange'。
现在,这让我非常困惑。
即使'oldExchange'为空,这意味着我们没有访问
multicast()
之前的Exchange的权限,但UseOriginalAggregationStrategy()
仍然能够准确获取该交换。这是上面的UnitTest的示例:
@Test
public void test() throws Exception {
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:test")
.multicast(AggregationStrategies.useOriginal()) // <- Using the camel AggregationStrategy now
.process(exchange -> {
exchange.setProperty("c", 1);
exchange.getIn().setHeader("d", 1);
})
.end()
.log("Property a: ${exchangeProperty.a}")
.log("Header b: ${header.b}")
.log("Property c: ${exchangeProperty.c}")
.log("Header d: ${header.d}");
}
});
Exchange exchange = new ExchangeBuilder(camelContext)
.withProperty("a", 1)
.withHeader("b", 1)
.build();
camelContext.createProducerTemplate().send("direct:test", exchange);
}
这将记录(如您所愿):
属性a:1
标头b:1
属性c:
标头d:
显然,它是通过返回null来实现的。
因此,当我在AggregationStrategy中返回null时,我得到了实际的初始Exchange,但是没有办法直接在AggregationStrategy中访问该Exchange?
在我看来,这有点怪异,所以我猜测我缺少了一些东西。
最好的祝福
克里斯
TL; DR:
如何将Exchange发送到另一个终结点,并将消息重置为以前的状态,同时保留在另一个终结点中设置的ExchangeProperty