本文介绍了如何让 ActiveMQ 检测来自消息发布者(幂等生产者)的重复消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

ActiveMQ 是否支持幂等生产者?我知道 Camel 有一个幂等的消费者模式来检测和处理重复的消息,但我想知道是否可以在源头(生产者)防止这种情况发生.

Does ActiveMQ support Idempotent producer? I know Camel has an idempotent consumer pattern to detect and handle duplicate messages, but I'm wondering if this can be prevented at the source (producer).

这里有一点背景知识.我有水平扩展的应用程序访问相同的数据库.有一个特定的表来维护特定进程的状态.这些水平应用程序应该能够读取状态并调用另一个进程,但是只有其中一个应该能够调用它.一旦满足所需条件,此应用程序会定期轮询数据库并向消息传递代理发送消息.但我希望负载平衡应用程序之一应该能够发布消息.

Here is a little back ground. I have applications that are horizontally scaled accessing same database. There is one particular table that maintains status of a particular process. These horizontal applications should be able to read the status and invoke another process, however only one of them should be able to invoke it. This application periodically polls the data base and posts a message to a messaging broker, once the required condition is met. But I want one of the load balancing application should be able to post the message.

我想到的一种粗略的方法是......

One crude approach I'm thinking is...

在机器 1 上:

  1. 读取数据库以检查是否满足必要条件.
  2. 在向代理发布消息之前,将记录写入另一个状态表,并使用唯一键标识进程和提交.如果此操作因违反唯一键约束而失败,则表示另一台机器上的进程已成功发布消息.
  3. 将消息发布给代理
  4. 如果消息发布失败,由于某种原因,根据唯一键/主键对状态表进行删除操作.

在机器 2、3、4 等上运行的相同应用程序可以执行相同的操作

The same operation can be performed by same application running on machine 2 , 3, 4 etc.

下面是我很快注意到这种方法的一个陷阱.

Below is one pitfall I quickly notice with this approach.

假设机器 1 能够完成第 2 步但未能执行第 3 步并继续执行第 4 步.同时,当第 2 步失败时,机器 2 将继续前进,不再尝试再次读取状态并发布消息.

Assuming that Machine 1 is able to complete step 2 but failed performing step 3 and continues with step 4. Meanwhile Machine 2, when it failed at step 2, will move on with out attempting to read the status again and post the message.

为了解决这个问题,我需要在第 3 步进行重试,直到消息成功发布到代理.

To address this, I need to put retry on step 3, until the message is successfully posted to broker.

另一种选择是使用 https://camel.apache.org/components/latest/eips/idempotentConsumer-eip.html 模式.但这本质上是消费者端的过滤器.虽然这将符合我的目的,但在消息发布方面是否有类似的开箱即用方法.

Another option is to use https://camel.apache.org/components/latest/eips/idempotentConsumer-eip.html pattern. But this is essentially a filter at consumer side. Though this will serve my purpose, is there a similar approach out of box available on message publishing side.

我想知道,这种方法是否正确或任何更好的替代方法,或者任何可用于跨本地或远程 JVM 执行锁定机制的现有库.

I wonder, if this approach is even correct or any better alternative approach, or any existing libraries that can be used to perform locking kind of mechanism across JVM either local or remote.

推荐答案

不清楚您使用的是哪个版本的 ActiveMQ(即 ActiveMQ 5.xActiveMQ Artemis) 所以我将尝试为双方解决这个问题.

It's not clear what version of ActiveMQ you're using (i.e. ActiveMQ 5.x or ActiveMQ Artemis) so I'll try to address this issue for both.

ActiveMQ 5.x 没有任何内置支持来检测从客户端发送的重复数据.但是,您可以使用 broker 插件 来实现此功能.我在这里看到的唯一挑战是配置、管理和监控重复 ID 的缓存.

ActiveMQ 5.x doesn't have any built-in support for detecting duplicates sent from clients. However, you could potentially implement this feature using a broker plugin. The only challenge I see here is configuring, managing, and monitoring the cache of duplicate IDs.

ActiveMQ Artemis 确实内置支持检测从客户端发送的重复数据.您可以在文档中阅读有关重复检测的更多信息.由于代理本身支持这种行为,因此它提供了干净的配置、管理和监控.

ActiveMQ Artemis does have built in support for detecting duplicates sent from clients. You can read more about duplicate detection in the documentation. Since the broker supports this behavior natively it provides clean configuration, management, and monitoring.

在任何一种情况下,您都需要在每条消息上设置一个特殊的标头,并带有标识进程的唯一键";就像您对潜在的数据库解决方案所做的那样.此外,使用代理作为重复检测器总体上要简单得多.

In either case you'll need to set a special header on each message with "a unique key that identifies the process" just like you would for your potential database solution. Furthermore, using the broker as the duplicate detector is much simpler overall.

如果您当前使用的是 ActiveMQ 5.x,但想要迁移到 ActiveMQ Artemis 以使用重复检测功能,则您不一定需要更新您的客户端,因为 ActiveMQ Artemis 完全支持 5.x 使用的 OpenWire 协议.x 客户.您应该能够将它们指向 ActiveMQ Artemis 的新实例,并且一切正常.

If you're currently using ActiveMQ 5.x but want to move to ActiveMQ Artemis in order to use the duplicate detection feature you don't necessarily need to update your clients as ActiveMQ Artemis fully supports the OpenWire protocol used by 5.x clients. You should just be able to point them to the new instance of ActiveMQ Artemis and have everything work.

这篇关于如何让 ActiveMQ 检测来自消息发布者(幂等生产者)的重复消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-15 07:00