我有一个Spring Integration IntegrationFlow定义如下:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean)
                    .get();


serviceActivatorBean看起来像这样:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator

    public void myMethod(Collection<MyEvent> events) {
        ....
    }
}


如果myMethod引发异常,它将被记录下来,但不会重试。我试图将IntegrationFlow更改为此:

RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .adviceChain(advice)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean)
                    .get();


但是随后我出现了这样的日志消息(不会发生重试):


  2017-06-30 13:18:10.611警告88706 --- [erContainer#1-2]
  o.s.i.h.a.RequestHandlerRetryAdvice:此建议
  org.springframework.integration.handler.advice.RequestHandlerRetryAdvice
  只能用于MessageHandlers;尝试建议方法
  的“ invokeListener”
  'org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer $ 1'
  被忽略


如何配置IntegrationFlow以使其表现与RabbitListener相同的方式?即让RabbitMQ再次发布消息。

最佳答案

如消息所示,在适配器的建议链中使用retry interceptor代替RequestHandlerRetryAdvice-这是消耗端点的。

08-04 15:15