我有一系列尝试要处理的处理步骤,但是我的测试并不总是通过,也不总是失败...
我制作了一个简单的示例,似乎可以展示我在真实测试中看到的内容。
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import java.util.Arrays;
import java.util.List;
import org.springframework.integration.test.support.AbstractRequestResponseScenarioTests;
import org.springframework.integration.test.support.PayloadValidator;
import org.springframework.integration.test.support.RequestResponseScenario;
import org.springframework.test.context.ContextConfiguration;
@ContextConfiguration
public class FlowTest extends AbstractRequestResponseScenarioTests {
@Override
protected List<RequestResponseScenario> defineRequestResponseScenarios() {
return Arrays.asList(
new RequestResponseScenario("inputChannel", "outputChannel")
.setPayload("a")
.setResponseValidator(new PayloadValidator<String>() {
@Override
protected void validateResponse(String response) {
assertEquals("AA", response);
}
}),
new RequestResponseScenario("inputChannel", "outputChannel")
.setPayload("b")
.setResponseValidator(new PayloadValidator<String>() {
@Override
protected void validateResponse(String response) {
assertNotEquals("AA", response);
}
})
);
}
}
和XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:channel id="errorChannel"/>
<int:chain input-channel="inputChannel" output-channel="outputChannel">
<int:gateway request-channel="processOne"/>
<int:gateway request-channel="processTwo"/>
</int:chain>
<int:chain input-channel="processOne">
<int:service-activator expression="payload + payload"/>
<int:filter expression="payload == 'aa'" discard-channel="errorChannel"/>
</int:chain>
<int:chain input-channel="processTwo">
<int:service-activator expression="payload.toUpperCase()"/>
</int:chain>
<int:transformer input-channel="errorChannel" output-channel="outputChannel" expression="payload + payload"/>
</beans>
似乎正在发生消息流,并且消息始终显示在
responseChannel
上,并由PayloadValidator处理。但是,如果消息碰巧经过错误处理弯路,则应用程序上下文不会关闭,只会停滞不前。
是否支持使用过滤器将消息从链中拉出并到达备用路径的这种模式?
我猜这条链正在等待回复消息,并且某些相关性正在途中丢失。
我使用的是Spring Integration的较旧版本:-(
更新资料
看来这是使用嵌套链的效果。如果我将过滤器移到顶级链中,那么一切都会按预期进行。考虑到网关将等待响应,哪种方式有意义。
我之所以使用嵌套链,是因为我需要单独的进程来分别处理重试逻辑。
也许我只需要放弃链式语法糖?
最佳答案
不,一切正确。
只有您真正在等待<int:gateway/>
中的答复的问题,并且永远不会发生,因为您转到了discard-channel
。
您可以考虑使用:
<xsd:attribute name="reply-timeout" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
Specifies how long this gateway will wait for the reply message
before returning. By default it will wait indefinitely. 'null' is returned
if the gateway times out.
Value is specified in milliseconds; it can be a simple long value or a SpEL
expression; array variable '#args' is available.
Also used for receive-only operations as the receive timeout.
]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
在该
<gateway>
上或 <xsd:attribute name="throw-exception-on-rejection" default="false">
<xsd:annotation>
<xsd:documentation>
Throw an exception if the filter rejects the message (default false).
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string" />
</xsd:simpleType>
</xsd:attribute>
在
<filter>
上,因此在error-channel
上是<gateway>
而不是discardChannel
。这不是如何处理请求-答复网关的方式。