本文介绍了如何使用wireTap传递标题?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

现在我有以下流程:

flow -> flow.channel(some_channel())
                .....
                .gateway(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));
Consumer<GatewayEndpointSpec> idempotentByHeader(String objectIdHeader) {
    return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}

default IdempotentReceiverInterceptor idempotentByHeaderInterceptor(String header) {
    MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
    var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
    interceptor.setDiscardChannel(idempotentDiscardChannel());
    return interceptor;
}

这里的问题是:

anotherFlowMessageHandler 结束,它是 void 所以 anotherFlow 不返回任何东西.

anotherFlowis finished with MessageHandler which is void so anotherFlow doesn't return anything.

我尝试使用以下方法:

 flow -> flow.channel(some_channel())
                    .....
                    .wireTap(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));

但是编译器因为 idempotentByHeader 返回类型而抱怨,所以我尝试执行以下操作:

but compiler complains because of idempotentByHeader return type so I tried to do following:

default Consumer<WireTapSpec> idempotentByHeader(String objectIdHeader) {
    return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}

但 WireTapSpec 没有建议方法.

but WireTapSpec doesn't have advice method.

怎么解决?

我能够通过改变 idempotentByHeader 的返回类型来编写

I was able to write with changing return type of idempotentByHeader

            .wireTap(anotherFlow)
            .enrich(idempotentByHeader(OBJECT_ID_HEADER));

但现在应用程序无法启动,原因是:

But now app can't start because of:

Caused by: java.lang.IllegalStateException: If the errorChannel is set, then the requestChannel must not be null
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.integration.transformer.ContentEnricher.doInit(ContentEnricher.java:277)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.onInit(AbstractReplyProducingMessageHandler.java:98)
    at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1862)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1799)
    ... 42 common frames omitted

推荐答案

好的.您忽略了 WireTap 是一个通道拦截器这一事实.它不像端点那样能够接受幂等接收器拦截器.

OK. You are missing the fact that WireTap is a Channel Interceptor. It is not an endpoint like gateway to be able to accept an idempotent receiver interceptor.

我不确定您使用 idempotentByHeaderInterceptor 的目标是什么,但是标题实际上是在将要发送到该 WireTap 的消息中传递的.因此,您可以访问订阅此 WireTap 的子流中的标头.

I'm not sure what is your goal with that idempotentByHeaderInterceptor, but headers are really passed in the message which is going to be sent to that WireTap. Therefore you get access to the headers in the sub-flow subscribed to this WireTap.

还有你最新的 enrich() 示例让我有点困惑.在使用网关之前,您试图避免通过 idempotentByHeaderInterceptor 将相同的消息发送到该子流,但现在您无条件地发送到 wireTap,并且只有在此之后您才应用 idempotentByHeaderInterceptor代码>idempotentByHeaderInterceptor.

Also your latest sample with enrich() confuses me a little bit. Before with a gateway you tried to avoid sending the same message to that sub-flow via idempotentByHeaderInterceptor, but now you send to wireTap unconditionally and only after that your apply such that idempotentByHeaderInterceptor.

那么,您的 idempotentByHeaderInterceptor 的目标是什么?您希望将它应用到哪里?

So, what is the goal of your idempotentByHeaderInterceptor and where you would like to apply it?

这篇关于如何使用wireTap传递标题?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-13 13:35