我想了解PublishSubscribeChannel的工作原理,所以我实现了一个小例子:
@Bean
public MessageSource<?> integerMessageSource() {
MethodInvokingMessageSource source = new MethodInvokingMessageSource();
source.setObject(new AtomicInteger());
source.setMethodName("getAndIncrement");
return source;
}
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 1: {}", message.getPayload())))
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 2: {}", message.getPayload())))
.subscribe(flow -> flow
.transform(source -> MessageBuilder.withPayload("Error").build())
.handle(message -> {
LOG.info("Error");
}))
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 4: {}", message.getPayload())))
)
.get();
// @formatter:on
}
我期望我看到的是输出:
Handling message, step 1...
Handling message, step 2...
Error
Handling message, step 4...
但是始终首先处理第三个子流(带有“错误”输出)。当我尝试为步骤1、2和4定义顺序I时,得到以下控制台输出(警告):
o.s.integration.dsl.GenericEndpointSpec : 'order' can be applied only for AbstractMessageHandler
我本来希望订阅者按订阅的顺序被调用,但是事实并非如此。
我正在使用Spring Boot 1.5.4和Spring Integration 4.3.10。
最佳答案
问题在于lambda处理程序不是Ordered
-发布/订阅通道的一般合同是先(按顺序)调用Ordered
订阅者,然后再调用无序订阅者。
由于lambda无法实现多个接口,因此我不确定我们能做些什么。
作为解决方法,您可以执行以下操作:
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.handle(handler("Handling message, step 1: {}")))
.subscribe(flow -> flow
.handle(handler("Handling message, step 2: {}")))
.subscribe(flow -> flow
.transform(message -> "Error")
.handle(message -> {
LOG.info("Error");
}))
.subscribe(flow -> flow
.handle(handler("Handling message, step 4: {}")))
)
.get();
// @formatter:on
}
private MessageHandler handler(String format) {
return new AbstractMessageHandler() {
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
LOG.info(format, message.getPayload());
}
};
}
这样所有订户都是
Ordered
。编辑
这是一个更简单的解决方法-使用桥而不是lambda启动子流,以便所有子流第一组件都实现
Ordered
。@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.bridge(e -> e.id("s1"))
.handle(message -> LOG.info("Handling message, step 1: {}", message.getPayload())))
.subscribe(flow -> flow
.bridge(e -> e.id("s2"))
.handle(message -> LOG.info("Handling message, step 2: {}", message.getPayload())))
.subscribe(flow -> flow
.transform(source -> MessageBuilder.withPayload("Error").build())
.handle(message -> {
LOG.info("Error");
}))
.subscribe(flow -> flow
.bridge(e -> e.id("s4"))
.handle(message -> LOG.info("Handling message, step 4: {}", message.getPayload())))
)
.get();
// @formatter:on
}