我有一个Spring Integration IntegrationFlow
定义如下:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean)
.get();
serviceActivatorBean
看起来像这样:@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(Collection<MyEvent> events) {
....
}
}
如果
myMethod
引发异常,它将被记录下来,但不会重试。我试图将IntegrationFlow
更改为此:RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.adviceChain(advice)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean)
.get();
但是随后我出现了这样的日志消息(不会发生重试):
2017-06-30 13:18:10.611警告88706 --- [erContainer#1-2]
o.s.i.h.a.RequestHandlerRetryAdvice:此建议
org.springframework.integration.handler.advice.RequestHandlerRetryAdvice
只能用于MessageHandlers;尝试建议方法
的“ invokeListener”
'org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer $ 1'
被忽略
如何配置
IntegrationFlow
以使其表现与RabbitListener
相同的方式?即让RabbitMQ再次发布消息。 最佳答案
如消息所示,在适配器的建议链中使用retry interceptor代替RequestHandlerRetryAdvice
-这是消耗端点的。