现在,我有以下流程:
flow -> flow.channel(some_channel())
.....
.gateway(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));
Consumer<GatewayEndpointSpec> idempotentByHeader(String objectIdHeader) {
return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}
default IdempotentReceiverInterceptor idempotentByHeaderInterceptor(String header) {
MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
interceptor.setDiscardChannel(idempotentDiscardChannel());
return interceptor;
}
这里的问题是:
anotherFlow
以MessageHandler
结尾,而void
是idempotentByHeader
,因此anotherFlow不返回任何内容。我尝试使用以下方法:
flow -> flow.channel(some_channel())
.....
.wireTap(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));
但是编译器由于返回类型而抱怨,因此我尝试执行以下操作:
default Consumer<WireTapSpec> idempotentByHeader(String objectIdHeader) {
return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}
但是WireTapSpec没有建议方法。
如何解决呢?
附言
我能够通过更改idempotentByHeader的返回类型来编写
.wireTap(anotherFlow)
.enrich(idempotentByHeader(OBJECT_ID_HEADER));
但是由于以下原因,现在应用无法启动:
Caused by: java.lang.IllegalStateException: If the errorChannel is set, then the requestChannel must not be null
at org.springframework.util.Assert.state(Assert.java:73)
at org.springframework.integration.transformer.ContentEnricher.doInit(ContentEnricher.java:277)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.onInit(AbstractReplyProducingMessageHandler.java:98)
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1862)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1799)
... 42 common frames omitted
最佳答案
好。您缺少WireTap
是通道拦截器的事实。像网关这样的端点不能接受幂等的接收器拦截器。
我不确定该idempotentByHeaderInterceptor
的目标是什么,但是标题确实会传递到要发送到该WireTap的消息中。因此,您可以访问订阅此WireTap的子流中的标题。
另外,您最新的enrich()
示例使我有些困惑。在使用网关之前,您曾尝试避免通过idempotentByHeaderInterceptor
将相同的消息发送到该子流,但是现在您无条件地发送到wireTap
,并且只有在此之后应用idempotentByHeaderInterceptor
。
那么,idempotentByHeaderInterceptor
的目标是什么?您希望将其应用到哪里?