我有3个队列。
队列主
队列重试(由代码控制3倍)
尝试3次后排队保存错误消息。
它工作正常...但是如果我由于任何原因发送了错误的json消息,例如:
{
"name":"alan"," <<<< this ," is wrong for example
"age":29,
}
我的监听器不尝试处理该消息(我想捕获此消息,因为如果出现错误,我将发送到错误队列)
我收到了ListenerExecutionFailedException,然后丢失了此消息。
我试图将错误处理程序添加到我的配置中:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
PropertyMapper map = PropertyMapper.get();
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory(properties, connectionNameStrategy));
factory.setMessageConverter(messageConverter());
**factory.setErrorHandler(errorHandler());**
RabbitProperties.Listener listener = properties.getListener();
if (listener != null && listener.getSimple() != null) {
map.from(listener.getSimple()::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
map.from(listener.getSimple()::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
map.from(listener.getSimple()::getDefaultRequeueRejected).whenNonNull().to(factory::setDefaultRequeueRejected);
}
return factory;
}
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new ListenerExceptionHandler());
}
@Log4j2
public class ListenerExceptionHandler extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
log.error("Failed to process inbound message from queue {}, failed message={}",
lefe.getFailedMessage().getMessageProperties().getConsumerQueue(),
lefe.getFailedMessage());
}
return super.isFatal(t);
}
}
我只是想如果有可能在我的监听器上捕获此ListenerExecutionFailedException ..是可能的吗?
因为在我的处理程序类中,我什么也做不了..仅设置日志..我无法将此消息发送到错误队列。
听众:
@RabbitListener(queues = Queues.MAIN, concurrency = "2")
public void listenerMessage(Message message,@Header(name = "x-death", required = false) List<Map<String, ?>> xDeath) {
log.info("ProcessMessage Received:: {}", message.getPayload());
validateReceivedMessage(xDeath, message);
}
我试图添加try / catch像:
try {
}catch (ListenerExecutionFailedException e){
}
没有成功。
有什么建议吗?谢谢
最佳答案
没有;消息转换异常在堆栈中的位置太远了;没有办法用转换失败的消息来调用侦听器。
这样的异常被认为是致命的,可以避免无限循环。您将需要在原始队列上配置一个死信交换和路由密钥,以便将失败的消息路由到那里。