并行执行多个接收者流

并行执行多个接收者流

本文介绍了Spring集成DSL Scatter-Gather异步/并行执行多个接收者流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们正在尝试使用分散收集向不同的接收者进行并行呼叫,并且效果很好.但是,除非第一个收件人完成(在Zipkin中跟踪),否则第二个收件人流程不会开始.有没有一种方法可以使所有收件人异步..非常类似于使用执行者通道拆分聚合.

we are trying to make parallel calls to different recipient using scatter-gather and it works fine. But the second recipient flow is not starting unless the first one is complete(traced in Zipkin). is there is a way to make all recipients async.. very similar to split-aggregate with executor channel.

public IntegrationFlow flow1() {

        return flow -> flow
                .split().channel(c -> c.executor(Executors.newCachedThreadPool()))
                .scatterGather(
                        scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(flow2())
                                .recipientFlow(flow3())
                                .recipientFlow(flow4())
                                .recipientFlow(flow5()),
                        gatherer -> gatherer
                                .outputProcessor(messageGroup -> {
                                    Object request = gatherResponse(messageGroup);
                                    return createResponse(request);
                                }))
                .aggregate();
    }

flow2(),flow3(),flow4()方法是将InterationFlow作为返回类型的方法.

flow2(),flow3(),flow4() methods are methods with InterationFlow as return type.

示例代码flow2():

public IntegrationFlow flow2() {
        return integrationFlowDefinition -> integrationFlowDefinition
                .enrichHeaders(
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
                .transform(ele -> createRequest1(ele))
                .wireTap("asyncXMLLogging")
                .handle(wsGateway.applyAsHandler(endpoint1))
                .transform(
                        ele -> response2(ele));
    }

推荐答案

使用提到的executor channel确实可以做到这一点.您所有的接收者流程实际上都必须从ExecutorChannel开始.在您的情况下,您必须将它们全部修改为如下形式:

This is indeed possible with the mentioned executor channel. All you recipient flows must really start from the ExecutorChannel. In your case you have to modify all of them to something like this:

public IntegrationFlow flow2() {
    return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele))
            .get();
}

请注意IntegrationFlows.from(MessageChannels.executor(taskExexecutor())).这就是使每个子流异步的方式.

Pay attention to the IntegrationFlows.from(MessageChannels.executor(taskExexecutor())). That's exactly how you can make each sub-flow async.

更新

对于未对子流进行IntegrationFlow改进的较早版本的Spring Integration,我们可以这样做:

For the older Spring Integration version without IntegrationFlow improvement for the sub-flows we can do like this:

public IntegrationFlow flow2() {
    return integrationFlowDefinition -> integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele));
}

这类似于您在上面的注释中显示的内容.

This is similar to what you show in the comment above.

这篇关于Spring集成DSL Scatter-Gather异步/并行执行多个接收者流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-18 22:03