在IntegrationFlow服务激活器方法成功返回之前

在IntegrationFlow服务激活器方法成功返回之前

本文介绍了在IntegrationFlow服务激活器方法成功返回之前,不要确认RabbitMQ消息吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个这样定义的集成流程:

I have an integration flow defined like this:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

serviceActivatorBean的定义如下:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator
    public void myMethod(Collection<MyEvent> events) {
        ....
    }
}

requestHandlerRetryAdviceForIntegrationFlow()的定义如下:

  public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(MAX_VALUE);
    retryTemplate.setRetryPolicy(retryPolicy);
    retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() {
        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(),
                    context.getRetryCount(), throwable);
        }
    }});
    advice.setRetryTemplate(retryTemplate);
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setMaxInterval(5000L);
    backOffPolicy.setInitialInterval(200L);
    backOffPolicy.setMultiplier(2);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return advice;
}

我们面临的问题是,服务激活器中的events集合包含2个或更多事件,并且由于某些原因,myMethod的处理失败并且服务器崩溃.似乎发生的事情是IntegrationFlow一次消耗并确认RabbitMQ发出的一条消息,因此,如果服务器在处理myMethod时崩溃,则除最后一个事件外的所有事件都会丢失.这对我们来说既不好也不安全.我们可以做些什么来配置IntegrationFlow在服务激活器中的myMethod成功完成之前不确认任何消息?

The problem we face is when the events collection in the service activator contains 2 or more events and for some reason the processing of myMethod fails and the server crash. What seems to happen is that the IntegrationFlow consumes and acks one message at a time from RabbitMQ, so if the server crash during the processing of myMethod all but the last event is lost. This is neither good nor safe enough for us. Is there something we can do to configure the IntegrationFlow to NOT ack any message until myMethod in the service activator has been completed successfully?

推荐答案

您可以使用确认模式手册,然后通过标题进行确认:

You can use Acknowledge Mode MANUAL and confirm via headers afterward:

https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-inbound-channel-adapter

这篇关于在IntegrationFlow服务激活器方法成功返回之前,不要确认RabbitMQ消息吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-06 04:11