My use case requires enriching my input with some data and then sending it to an outbound endpoint.
The enrichment data is obtained by calling two web services and extracting data from their replies.
The extracted data is used to enrich my input XML, which is then sent to the outbound endpoint.
The two web service calls I need to make must run in parallel, since neither depends on the other; this saves processing time.
Please suggest how to implement this parallel processing in a Mule flow.
Note: I have already tried the ALL flow control, but it appears to invoke the web services (sub-flows) sequentially.
Given below is an abstracted version of my flow.
<flow name="mainFlow">
<inbound-endpoint> .....
<some validation>
<setting some flow variables>
<!-- Now make calls to the sub-flows which has some processing of the input and make some web-service calls -->
<all>
<flow-ref name="myFlow1" />
<flow-ref name="myFlow2" />
<flow-ref name="myFlow3" />
</all>
<enrich the input with the data obtained from the output of the above three flows>
<outbound-endpoint>
</flow>
<flow name="myFlow1">
<some transformer to transform the payload provided >
< the tran sformed payload is passed as input to the web-service call>
<http:outbound-endpoint ...>
<transform the reply from the web-service call>
</flow>
<flow name="myFlow2">
<some transformer to transform the payload provided >
< the tran sformed payload is passed as input to the web-service call>
<http:outbound-endpoint ...>
<transform the reply from the web-service call>
</flow>
<flow name="myFlow3">
<some transformer to transform the payload provided to it>
< the tran sformed payload is passed as input to the web-service call>
<http:outbound-endpoint ...>
<transform the reply from the web-service call>
</flow>
Best answer
Here is a simple configuration showing one way to do a fork/join with two HTTP outbound endpoints. To add a third endpoint, set MULE_CORRELATION_GROUP_SIZE to 3 and set MULE_CORRELATION_SEQUENCE to 3 in the third async flow-ref.
<flow name="fork">
<vm:inbound-endpoint path="fork.in" />
<set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
value="2" />
<all enableCorrelation="IF_NOT_SET">
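<!-- Each branch is wrapped in async so the flow-refs run concurrently instead of sequentially -->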
<async>
<set-property propertyName="MULE_CORRELATION_SEQUENCE"
value="1" />
<flow-ref name="parallel1" />
</async>
<async>
<set-property propertyName="MULE_CORRELATION_SEQUENCE"
value="2" />
<flow-ref name="parallel2" />
</async>
</all>
</flow>
<sub-flow name="parallel1">
<logger level="INFO" message="parallel1: processing started" />
<http:outbound-endpoint address="..."
exchange-pattern="request-response" />
<logger level="INFO" message="parallel1: processing finished" />
<flow-ref name="join" />
</sub-flow>
<sub-flow name="parallel2">
<logger level="INFO" message="parallel2: processing started" />
<http:outbound-endpoint address="..."
exchange-pattern="request-response" />
<logger level="INFO" message="parallel2: processing finished" />
<flow-ref name="join" />
</sub-flow>
<sub-flow name="join">
<collection-aggregator timeout="6000"
failOnTimeout="true" />
<combine-collections-transformer />
<logger level="INFO"
message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
</sub-flow>
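For illustration, here is a minimal sketch of the fork flow extended to three branches, assuming a parallel3 sub-flow exists that mirrors parallel1 and parallel2 (including the final flow-ref to join):
<flow name="fork">
<vm:inbound-endpoint path="fork.in" />
<!-- The join aggregator now needs three correlated messages to complete the group -->
<set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
value="3" />
<all enableCorrelation="IF_NOT_SET">
<async>
<set-property propertyName="MULE_CORRELATION_SEQUENCE"
value="1" />
<flow-ref name="parallel1" />
</async>
<async>
<set-property propertyName="MULE_CORRELATION_SEQUENCE"
value="2" />
<flow-ref name="parallel2" />
</async>
<async>
<set-property propertyName="MULE_CORRELATION_SEQUENCE"
value="3" />
<!-- hypothetical third sub-flow, structured like parallel1/parallel2 -->
<flow-ref name="parallel3" />
</async>
</all>
</flow>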
Edit: in the above configuration the aggregator times out after 6 seconds. This may well be too short for your actual use case, so increase it as needed. It is also set to fail on timeout, which may not be the behaviour you want if not all of the outbound HTTP interactions succeed; again, this depends on your use case.
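For example, here is a sketch of the join sub-flow with a longer, purely illustrative 30-second timeout that does not fail when the group is incomplete:
<sub-flow name="join">
<!-- failOnTimeout="false" lets processing continue with whatever replies have arrived by the deadline -->
<collection-aggregator timeout="30000"
failOnTimeout="false" />
<combine-collections-transformer />
<logger level="INFO"
message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
</sub-flow>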