我们的配置是:1 ... n具有共享数据库的消息接收者。
邮件仅应处理一次。

   @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "message-queue", durable = "true"),
            exchange = @Exchange(value = TOPIC_EXCHANGE, type = "topic", durable = "true"),
            key = MESSAGE_QUEUE1_RK)
    )
    public void receiveMessage(CustomMessage message) throws InterruptedException {
        System.out.println("I have been received = " + message);
    }


我们想保证消息将被处理一次,我们有一个消息存储库,其中已处理消息的ID。
是否可以在接收消息之前挂接此检查?
我们试图用RabbitTemplate查看一个MessagePostProcessor,但似乎没有用。

有关如何执行此操作的任何建议?
我们尝试过使用MethodInterceptor并能正常工作,但是非常难看。
谢谢



找到解决方案-感谢Gary
我创建了一个实现MessagePostProcessorInjectorSmartLifecycle
在启动时,我检查每个容器,如果它是AbstractMessageListenerContainer,请添加客户MessagePostProccesser
和一个自定义ErrorHandler,用于查找某些类型的Exception并将其丢弃(其他转发给defaultErrorHandler)
由于我们使用的是DLQ,因此我发现抛出异常或将其设置为null不会真正起作用。

我将发出一个请求,以忽略MPP之后的空消息。

最佳答案

有趣; SimpleMessageListenerContainer确实具有属性afterReceivePostProcessors(当前无法通过注释所使用的侦听器容器工厂使用,但稍后可以注入)。

但是,这些后处理器将无济于事,因为我们仍然调用侦听器。

请随意打开JIRA Improvement Issue进行以下两项操作:


在监听器容器工厂中公开afterReceivePostProcessors
如果后处理器返回null,则跳过调用listener方法。


(更正后,该属性确实已由工厂公开)。

编辑

怎么运行的...

在上下文初始化期间...


对于bean后处理器检测到的每个注释,将创建容器并将其注册在RabbitListenerEndpointRegistry
在上下文初始化快要结束时,将对注册表进行start()初始化,并启动为autoStartup配置的所有容器(默认)。


要在启动容器之前进行进一步的配置(例如,针对容器工厂当前未公开的属性),请将autoStartup设置为false

然后,您可以从注册表(作为集合或通过id)获取容器。只需@Autowire应用中的注册表。

将容器转换为SimpleMessageListenerContainer(如果使用Spring AMQP 2.0或更高版本,而您使用的是工厂,则转换为DirectMessageListenerContainer)。

设置其他属性(例如afterReceiveMessagePostProcessors);然后start()容器。

注意:在增强容器以允许返回null的MPP之前,一种可能的选择是从MPP抛出AmqpRejectAndDontRequeueException。但是,如果配置了DLQ,则可能不是您想要的。

09-28 01:56