问题描述
我们正在尝试使用分散收集向不同的接收者进行并行呼叫,并且效果很好.但是,除非第一个收件人完成(在Zipkin中跟踪),否则第二个收件人流程不会开始.有没有一种方法可以使所有收件人异步..非常类似于使用执行者通道拆分聚合.
we are trying to make parallel calls to different recipient using scatter-gather and it works fine. But the second recipient flow is not starting unless the first one is complete(traced in Zipkin). is there is a way to make all recipients async.. very similar to split-aggregate with executor channel.
public IntegrationFlow flow1() {
return flow -> flow
.split().channel(c -> c.executor(Executors.newCachedThreadPool()))
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(flow2())
.recipientFlow(flow3())
.recipientFlow(flow4())
.recipientFlow(flow5()),
gatherer -> gatherer
.outputProcessor(messageGroup -> {
Object request = gatherResponse(messageGroup);
return createResponse(request);
}))
.aggregate();
}
flow2(),flow3(),flow4()方法是将InterationFlow
作为返回类型的方法.
flow2(),flow3(),flow4() methods are methods with InterationFlow
as return type.
示例代码flow2()
:
public IntegrationFlow flow2() {
return integrationFlowDefinition -> integrationFlowDefinition
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele));
}
推荐答案
使用提到的executor channel
确实可以做到这一点.您所有的接收者流程实际上都必须从ExecutorChannel
开始.在您的情况下,您必须将它们全部修改为如下形式:
This is indeed possible with the mentioned executor channel
. All you recipient flows must really start from the ExecutorChannel
. In your case you have to modify all of them to something like this:
public IntegrationFlow flow2() {
return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele))
.get();
}
请注意IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
.这就是使每个子流异步的方式.
Pay attention to the IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
. That's exactly how you can make each sub-flow async.
更新
对于未对子流进行IntegrationFlow
改进的较早版本的Spring Integration,我们可以这样做:
For the older Spring Integration version without IntegrationFlow
improvement for the sub-flows we can do like this:
public IntegrationFlow flow2() {
return integrationFlowDefinition -> integrationFlowDefinition
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele));
}
这类似于您在上面的注释中显示的内容.
This is similar to what you show in the comment above.
这篇关于Spring集成DSL Scatter-Gather异步/并行执行多个接收者流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!