问题描述
我有一个这样定义的集成流程:
I have an integration flow defined like this:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
serviceActivatorBean
的定义如下:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(Collection<MyEvent> events) {
....
}
}
requestHandlerRetryAdviceForIntegrationFlow()
的定义如下:
public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(MAX_VALUE);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() {
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(),
context.getRetryCount(), throwable);
}
}});
advice.setRetryTemplate(retryTemplate);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setMaxInterval(5000L);
backOffPolicy.setInitialInterval(200L);
backOffPolicy.setMultiplier(2);
retryTemplate.setBackOffPolicy(backOffPolicy);
return advice;
}
我们面临的问题是,服务激活器中的events
集合包含2个或更多事件,并且由于某些原因,myMethod
的处理失败并且服务器崩溃.似乎发生的事情是IntegrationFlow
一次消耗并确认RabbitMQ发出的一条消息,因此,如果服务器在处理myMethod
时崩溃,则除最后一个事件外的所有事件都会丢失.这对我们来说既不好也不安全.我们可以做些什么来配置IntegrationFlow
在服务激活器中的myMethod
成功完成之前不确认任何消息?
The problem we face is when the events
collection in the service activator contains 2 or more events and for some reason the processing of myMethod
fails and the server crash. What seems to happen is that the IntegrationFlow
consumes and acks one message at a time from RabbitMQ, so if the server crash during the processing of myMethod
all but the last event is lost. This is neither good nor safe enough for us. Is there something we can do to configure the IntegrationFlow
to NOT ack any message until myMethod
in the service activator has been completed successfully?
推荐答案
您可以使用确认模式手册,然后通过标题进行确认:
You can use Acknowledge Mode MANUAL and confirm via headers afterward:
这篇关于在IntegrationFlow服务激活器方法成功返回之前,不要确认RabbitMQ消息吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!