大家好

我目前有一个用例,我需要在路由中调用一个端点,但此后继续原始消息。
我发现这样做的方法是使用multicast(),然后使用AggregationStrategy AggregationStrategies.useOriginal(),即UseOriginalAggregationStrategy()
明确地说,我不是将消息发送到多个端点,只是使用multicast()将数据“重置”为之前的状态

现在,我遇到了一个问题,我必须将某些信息从multicast()内部传播到外部路由,并在以后使用(下面的示例)。
它尝试使用更新的UseOriginalAggregationStrategy()来做到这一点,在这里我仅“重置” Message,而将Exchange保留不变,因此我可以更新属性,并在multicast()之后保留它们

聚合策略:

public class UseOriginalMessageAggregationStrategy implements AggregationStrategy {

  @Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    if (oldExchange != null && oldExchange.getIn() != null) {
      newExchange.setIn(oldExchange.getIn());
    }
    return newExchange;
  }
}


单元测试:

@Test
public void test() throws Exception {
  camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
      from("direct:test")
          .multicast(new UseOriginalMessageAggregationStrategy())
            .process(exchange -> { // This could also be a 'to()', it doesn't make a difference
              exchange.setProperty("c", 1);
              exchange.getIn().setHeader("d", 1);
            })
          .end()
          .log("Property a: ${exchangeProperty.a}")
          .log("Header b: ${header.b}")
          .log("Property c: ${exchangeProperty.c}")
          .log("Header d: ${header.d}");
    }
  });

  Exchange exchange = new ExchangeBuilder(camelContext)
      .withProperty("a", 1)
      .withHeader("b", 1)
      .build();
  camelContext.createProducerTemplate().send("direct:test", exchange);
}


现在,我期望的是此日志:


属性a:1
标头b:1
属性c:1
标头d:


但是实际发生的是:


属性a:1
标头b:1
属性c:1
标头d:1


请注意,即使标题属于multicast()内的消息,仍将设置“标题d”,该消息现在应该已经被忽略了。

经过一些调试之后,我注意到,发生这种情况是因为AggregationStrategy中的'oldExchange'实际上为null,此时日志才有意义,因为正是被记录的'newExchange'。

现在,这让我非常困惑。
即使'oldExchange'为空,这意味着我们没有访问multicast()之前的Exchange的权限,但UseOriginalAggregationStrategy()仍然能够准确获取该交换。

这是上面的UnitTest的示例:

@Test
public void test() throws Exception {
  camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {

      from("direct:test")
          .multicast(AggregationStrategies.useOriginal()) // <- Using the camel AggregationStrategy now
            .process(exchange -> {
              exchange.setProperty("c", 1);
              exchange.getIn().setHeader("d", 1);
            })
          .end()
          .log("Property a: ${exchangeProperty.a}")
          .log("Header b: ${header.b}")
          .log("Property c: ${exchangeProperty.c}")
          .log("Header d: ${header.d}");
    }
  });

  Exchange exchange = new ExchangeBuilder(camelContext)
      .withProperty("a", 1)
      .withHeader("b", 1)
      .build();
  camelContext.createProducerTemplate().send("direct:test", exchange);
}


这将记录(如您所愿):


属性a:1
标头b:1
属性c:
标头d:


显然,它是通过返回null来实现的。
因此,当我在AggregationStrategy中返回null时,我得到了实际的初始Exchange,但是没有办法直接在AggregationStrategy中访问该Exchange?
在我看来,这有点怪异,所以我猜测我缺少了一些东西。

最好的祝福
克里斯

TL; DR:
如何将Exchange发送到另一个终结点,并将消息重置为以前的状态,同时保留在另一个终结点中设置的ExchangeProperty

最佳答案

您的目标是通过其他属性来丰富原始有效负载。
企业集成模式明智的浓缩器模式比多播更适合。
https://camel.apache.org/components/latest/eips/content-enricher.html和该页面中的ExampleAggregationStrategy类。

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {

        Object resourceResponse = resource.getIn().getBody();
        // retrieve stuff you want from resource exchange and add them as properties to to original
        return original;
    }
}

08-27 12:49