我们的配置是:1 ... n具有共享数据库的消息接收者。
邮件仅应处理一次。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "message-queue", durable = "true"),
exchange = @Exchange(value = TOPIC_EXCHANGE, type = "topic", durable = "true"),
key = MESSAGE_QUEUE1_RK)
)
public void receiveMessage(CustomMessage message) throws InterruptedException {
System.out.println("I have been received = " + message);
}
我们想保证消息将被处理一次,我们有一个消息存储库,其中已处理消息的ID。
是否可以在接收消息之前挂接此检查?
我们试图用RabbitTemplate查看一个MessagePostProcessor,但似乎没有用。
有关如何执行此操作的任何建议?
我们尝试过使用MethodInterceptor并能正常工作,但是非常难看。
谢谢
找到解决方案-感谢Gary
我创建了一个实现
MessagePostProcessorInjector
的SmartLifecycle
在启动时,我检查每个容器,如果它是
AbstractMessageListenerContainer
,请添加客户MessagePostProccesser
和一个自定义
ErrorHandler
,用于查找某些类型的Exception并将其丢弃(其他转发给defaultErrorHandler)由于我们使用的是DLQ,因此我发现抛出异常或将其设置为null不会真正起作用。
我将发出一个请求,以忽略MPP之后的空消息。
最佳答案
有趣; SimpleMessageListenerContainer
确实具有属性afterReceivePostProcessors
(当前无法通过注释所使用的侦听器容器工厂使用,但稍后可以注入)。
但是,这些后处理器将无济于事,因为我们仍然调用侦听器。
请随意打开JIRA Improvement Issue进行以下两项操作:
在监听器容器工厂中公开afterReceivePostProcessors
如果后处理器返回null,则跳过调用listener方法。
(更正后,该属性确实已由工厂公开)。
编辑
怎么运行的...
在上下文初始化期间...
对于bean后处理器检测到的每个注释,将创建容器并将其注册在RabbitListenerEndpointRegistry
中
在上下文初始化快要结束时,将对注册表进行start()
初始化,并启动为autoStartup
配置的所有容器(默认)。
要在启动容器之前进行进一步的配置(例如,针对容器工厂当前未公开的属性),请将autoStartup
设置为false
。
然后,您可以从注册表(作为集合或通过id
)获取容器。只需@Autowire
应用中的注册表。
将容器转换为SimpleMessageListenerContainer
(如果使用Spring AMQP 2.0或更高版本,而您使用的是工厂,则转换为DirectMessageListenerContainer
)。
设置其他属性(例如afterReceiveMessagePostProcessors
);然后start()
容器。
注意:在增强容器以允许返回null
的MPP之前,一种可能的选择是从MPP抛出AmqpRejectAndDontRequeueException
。但是,如果配置了DLQ,则可能不是您想要的。