1 基本处理方式

1.1 topic(publish-subscribe)

     发布订阅模式,发布者发布消息到broker,所有订阅者都会接收到相同消息的copy。

1 基本处理方式-LMLPHP

1.2 queue(p2p)

    p2p是生产者生成消息,经过broker-queue,只能有一个消费者处理。在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。

1 基本处理方式-LMLPHP

下图为queue处理过程,以及各种ack_type回复的处理流程,这些ack_type配合ack_mode进行,由jms内部控制。

   

1 基本处理方式-LMLPHP

1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。

2. 利用factory构造JMS connection

3. 启动connection

4. 通过connection创建JMS session.

5. 指定JMS destination.

6. 创建JMS producer或者创建JMS message并提供destination.

7. 创建JMS consumer或注册JMS message listener.

8. 发送和接收JMS message.

9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

1.3 request-response(p2p)

和前面两种方式比较起来,request-response的通信方式很常见,但是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而我们实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息。于是请求-应答的这种通信方式也很重要。它也应用的很普遍。 

     请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。下图是典型的请求-应答方式的交互过程:

在JMS里面,如果要实现请求/应答的方式,可以利用JMSReplyTo和JMSCorrelationID消息头来将通信的双方关联起来。另外,QueueRequestor和TopicRequestor能够支持简单的请求/应答过程。

1 基本处理方式-LMLPHP

2 生产模式

通过创建producer,然后调用send接口实现。

3 消费模式

spring框架中使用JMS传递消息有两种方式:JMS template和message listener Container,前者用于同步收发消息,后者用于异步收发消息。

3.1 同步方式

同步方式是创建consumer,通过session进行receive()方式接收消息。

3.2 异步方式

异步方式是通过实现messageListener的onMessage()接口实现,在该接口中完成消息处理。

4 可靠性机制

4.1 AUTO_ACKNOWLEDGE 

        Session.AUTO_ACKNOWLEDGE(自动确认模式)

  当消息成功的从receive方法返回时,或者从MessageListener接口的onMessage方法成功返回时,会话自动确认客户端的消息接收。

    自动确认,这就意味着消息的确认时机将有consumer择机确认."择机确认"似乎充满了不确定性,这也意味着,开发者必须明确知道"择机确认"的具体时机,否则将有可能导致消息的丢失,或者消息的重复接受.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何运作的呢?

    1) 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。

    2) 其中DUPS_ACKNOWLEGE也是一种潜在的AUTO_ACK,只是确认消息的条数和时间上有所不同。

    3) 在“同步”(receive)方法返回message之前,会检测optimizeACK选项是否开启,如果没有开启,此单条消息将立即确认,所以在这种情况下,message返回之后,如果开发者在处理message过程中出现异常,会导致此消息也不会redelivery,即"潜在的消息丢失";如果开启了optimizeACK,则会在unAck数量达到prefetch * 0.65时确认,当然我们可以指定prefetchSize = 1来实现逐条消息确认。

    4) 在"异步"(messageListener)方式中,将会首先调用listener.onMessage(message),此后再ACK,如果onMessage方法异常,将导致client端补充发送一个ACK_TYPE为REDELIVERED_ACK_TYPE确认指令;如果onMessage方法正常,消息将会正常确认(STANDARD_ACK_TYPE)。此外需要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,如果重发次数达到阀值,将会导致发送一个ACK_TYPE为POSION_ACK_TYPE确认指令,这就导致broker端认为此消息无法消费,此消息将会被删除或者迁移到"dead letter"通道中。

        因此当我们使用messageListener方式消费消息时,通常建议在onMessage方法中使用try-catch,这样可以在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;如果你没有使用try-catch,就有可能会因为异常而导致消息重复接收的问题,需要注意你的onMessage方法中逻辑是否能够兼容对重复消息的判断。

4.2 CLIENT_ACKNOWLEDGE 

        Session.CLIENT_ACKNOWLEDGE(客户端确认模式)

  客户端通过调用消息的acknowledge方法签收消息。在这种模式中,签收是在会话层上进行:签收一个已消费的消息会自动地签收这个Session所有已消费消息的收条。

客户端手动确认,这就意味着AcitveMQ将不会“自作主张”的为你ACK任何消息,开发者需要自己择机确认。在此模式下,开发者需要需要关注几个方法:

1) message.acknowledge(),

2) ActiveMQMessageConsumer.acknowledege(),

3) ActiveMQSession.acknowledge();

其1)和3)是等效的,将当前session中所有consumer中尚未ACK的消息都一起确认,2)只会对当前consumer中那些尚未确认的消息进行确认。开发者可以在合适的时机必须调用一次上述方法。

    我们通常会在基于Group(消息分组)情况下会使用CLIENT_ACKNOWLEDGE,我们将在一个group的消息序列接受完毕之后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理之后才能确认时,也很可以使用此ACK_MODE。

    如果开发者忘记调用acknowledge方法,将会导致当consumer重启后,会接受到重复消息,因为对于broker而言,那些尚未真正ACK的消息被视为“未消费”。

    开发者可以在当前消息处理成功之后,立即调用message.acknowledge()方法来"逐个"确认消息,这样可以尽可能的减少因网络故障而导致消息重发的个数;当然也可以处理多条消息之后,间歇性的调用acknowledge方法来一次确认多条消息,减少ack的次数来提升consumer的效率,不过这仍然是一个利弊权衡的问题。

    除了message.acknowledge()方法之外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也可以确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。

    无论是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。如果在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPE为DELIVERED_ACK_TYPE的确认指令,这会触发broker端可以继续push消息到client端。(参看PrefetchSubscription.acknwoledge方法)

    在broker端,针对每个Consumer,都会保存一个因为"DELIVERED_ACK_TYPE"而“拖延”的消息个数,这个参数为prefetchExtension,事实上这个值不会大于prefetchSize * 0.5,因为Consumer端会严格控制DELIVERED_ACK_TYPE指令发送的时机(参见ActiveMQMessageConsumer.ackLater方法),broker端通过“prefetchExtension”与prefetchSize互相配合,来决定即将push给client端的消息个数,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已经发送给client端但是还没有“STANDARD_ACK_TYPE”的消息总量;由此可见,在CLIENT_ACK模式下,足够快速的调用acknowledge()方法是决定consumer端消费消息的速率;如果client端因为某种原因导致acknowledge方法未被执行,将导致大量消息不能被确认,broker端将不会push消息,事实上client端将处于“假死”状态,而无法继续消费消息。我们要求client端在消费1.5*prefetchSize个消息之前,必须acknowledge()一次;通常我们总是每消费一个消息调用一次,这是一种良好的设计。

    此外需要额外的补充一下:所有ACK指令都是依次发送给broker端,在CLIET_ACK模式下,消息在交付给listener之前,都会首先创建一个DELIVERED_ACK_TYPE的ACK指令,直到client端未确认的消息达到"prefetchSize * 0.5"时才会发送此ACK指令,如果在此之前,开发者调用了acknowledge()方法,会导致消息直接被确认(STANDARD_ACK_TYPE)。broker端通常会认为“DELIVERED_ACK_TYPE”确认指令是一种“slow consumer”信号,如果consumer不能及时的对消息进行acknowledge而导致broker端阻塞,那么此consumer将会被标记为“slow”,此后queue中的消息将会转发给其他Consumer。

4.3 DUPS_OK_ACKNOWLEDGE

        Session.DUPS_OK_ACKNOWLEDGE(延时/批量确认模式)

  这种确认方式允许JMS不必急于确认收到的消息,允许在收到多个消息之后一次完成确认,与Auto_AcKnowledge相比,这种确认方式在某些情况下可能更有效,因为没有确认,当系统崩溃或者网络出现故障的时候,消息可以被重新传递. 

  这种方式会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。(如果ActiveMQ再次传送同一消息,那么消息头中的JMSRedelivered将被设置为true)

 "消息可重复"确认,意思是此模式下,可能会出现重复消息,并不是一条消息需要发送多次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,而且具有“延迟”确认的特点。对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE一样,不需要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。

    1) 在ActiveMQ中,如果在Destination是Queue通道,我们真的可以认为DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetch > 0)”这种情况,在确认时机上几乎完全一致;此外在此模式下,如果prefetchSize =1 或者没有开启optimizeACK,也会导致消息逐条确认,从而失去批量确认的特性。

    2) 如果Destination为Topic,DUPS_OK_ACKNOWLEDGE才会产生JMS规范中诠释的意义,即无论optimizeACK是否开启,都会在消费的消息个数>=prefetch * 0.5时,批量确认(STANDARD_ACK_TYPE),在此过程中,不会发送DELIVERED_ACK_TYPE的确认指令,这是1)和AUTO_ACK的最大的区别。

    这也意味着,当consumer故障重启后,那些尚未ACK的消息会重新发送过来。

4.4 SESSION_TRANSACTED

当session使用事务时,就是使用此模式。在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。这种严谨性,通常在基于GROUP(消息分组)或者其他场景下特别适合。在SESSION_TRANSACTED模式下,optimizeACK并不能发挥任何效果,因为在此模式下,optimizeACK会被强制设定为false,不过prefetch仍然可以决定DELIVERED_ACK_TYPE的发送时机。

    因为Session非线程安全,那么当前session下所有的consumer都会共享同一个transactionContext;同时建议,一个事务类型的Session中只有一个Consumer,已避免rollback()或者commit()方法被多个consumer调用而造成的消息混乱。

    当consumer接受到消息之后,首先检测TransactionContext是否已经开启,如果没有,就会开启并生成新的transactionId,并把信息发送给broker;此后将检测事务中已经消费的消息个数是否 >= prefetch * 0.5,如果大于则补充发送一个“DELIVERED_ACK_TYPE”的确认指令;这时就开始调用onMessage()方法,如果是同步(receive),那么即返回message。上述过程,和其他确认模式没有任何特殊的地方。

    当开发者决定事务可以提交时,必须调用session.commit()方法,commit方法将会导致当前session的事务中所有消息立即被确认;事务的确认过程中,首先把本地的deliveredMessage队列中尚未确认的消息全部确认(STANDARD_ACK_TYPE);此后向broker发送transaction提交指令并等待broker反馈,如果broker端事务操作成功,那么将会把本地deliveredMessage队列清空,新的事务开始;如果broker端事务操作失败(此时broker已经rollback),那么对于session而言,将执行inner-rollback,这个rollback所做的事情,就是将当前事务中的消息清空并要求broker重发(REDELIVERED_ACK_TYPE),同时commit方法将抛出异常。

    当session.commit方法异常时,对于开发者而言通常是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),当然你可以在事务开始之后的任何时机调用rollback(),rollback意味着当前事务的结束,事务中所有的消息都将被重发。需要注意,无论是inner-rollback还是调用session.rollback()而导致消息重发,都会导致message.redeliveryCounter计数器增加,最终都会受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",如果rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。

4.5 INDIVIDUAL_ACKNOWLEDGE

单条消息确认,这种确认模式,我们很少使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎一样,当消息消费成功之后,需要调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法将导致整个session中所有消息被确认(批量确认)。

5 消息重传机制

5.1 重传触发机制

消息重传主要是针对事务性消息以及client_ack方式的进行,重传触发时机如下,

1)事务session,并且调用了rollback()方法;
2)事务session,关闭之前调用了commit;
3)非事务session中使用CLIENT_ACKNOWLEDGE签收模式,并且调用了Session.recover()方法。

如果不是以上情况,(未成功确认的消息)不会立即触发消息重传,会在下次客户端启动时,重传。

一旦消息重发尝试超过重发策略中配置的maximumRedeliveries(缺省为6次)时,会给broker发送一个"Poison ack",通知它,这个消息被认为是一个毒丸(a poison pill),接着broker会将这个消息发送到DLQ(Dead Letter Queue),以便后续分析处理。

通过recover重传方式,当前线程会和消息绑定,当前线程挂起,消息不转发给其他线程,只有当该线程关闭,才会释放对该消息的持有。

通过抛异常的方式,也会重发重新投递,但此时,只是将该消息当作一个新消息,重新负载分配到所有监听。

5.2 消息重传配置

具体参数如下,

 属性 默认值 说明 
 collisionAvoidanceFactor  0.15  设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。
 maximumRedeliveries  6  最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
 maximumRedeliveryDelay  -1  最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
 initialRedeliveryDelay  1000L  初始重发延迟时间
 redeliveryDelay  1000L  重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4)
 useCollisionAvoidance  false  启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。
 useExponentialBackOff  false  启用指数倍数递增的方式增加延迟时间。
 backOffMultiplier  5  重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。

6  消息持久化

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。

ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。

就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。

消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。

6.1 JDBC持久化方式

使用JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。
activemq_msgs用于存储消息,Queue和Topic都存储在这个表中。

(1)配置方式

配置持久化的方式,都是修改安装目录下conf/acticvemq.xml文件,

首先定义一个mysql-ds的MySQL数据源,然后在persistenceAdapter节点中配置jdbcPersistenceAdapter并且引用刚才定义的数据源。

<persistenceAdapter> 

    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> 

</persistenceAdapter>

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。
使用MySQL配置JDBC持久化:

<beans>

    <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">

        <persistenceAdapter>

            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>

        </persistenceAdapter>

    </broker>

    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">

        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>

        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>

        <property name="username" value="activemq"/>

        <property name="password" value="activemq"/>

        <property name="maxActive" value="200"/>

        <property name="poolPreparedStatements" value="true"/>

    </bean>

</beans>

(2)数据库表信息 

activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高

activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:
主要的数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID:记录消费过的消息的ID。

表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,
其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。
这个表用于记录哪个Broker是当前的Master Broker。

6.2 AMQ方式

性能高于JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。
当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档(取决于配置)。
主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。
而且由于索引巨大,一旦Broker崩溃,重建索引的速度会非常慢。

配置片段如下:

<persistenceAdapter>

     <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>

</persistenceAdapter>

然AMQ性能略高于下面的Kaha DB方式,但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所以已经不推荐使用。

6.3  KahaDB方式

KahaDB是从ActiveMQ 5.4开始默认的持久化插件,也是我们项目现在使用的持久化方式。

KahaDb恢复时间远远小于其前身AMQ并且使用更少的数据文件,所以可以完全代替AMQ。
kahaDB的持久化机制同样是基于日志文件,索引和缓存。

配置方式:

<persistenceAdapter>

    <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>

</persistenceAdapter>

directory : 指定持久化消息的存储目录

journalMaxFileLength : 指定保存消息的日志文件大小,具体根据你的实际应用配置

(1)KahaDB主要特性
1、日志形式存储消息;
2、消息索引以B-Tree结构存储,可以快速更新;
3、完全支持JMS事务;
4、支持多种恢复机制;

(2)KahaDB的结构

消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。
消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。

Data logs:
Data logs用于存储消息日志,消息的全部内容都在Data logs中。
同AMQ一样,一个Data logs文件大小超过规定的最大值,会新建一个文件。同样是文件尾部追加,写入性能很快。
每个消息在Data logs中有计数引用,所以当一个文件里所有的消息都不需要了,系统会自动删除文件或放入归档文件夹。

Metadata cache :
缓存用于存放在线消费者的消息。如果消费者已经快速的消费完成,那么这些消息就不需要再写入磁盘了。
Btree索引会根据MessageID创建索引,用于快速的查找消息。这个索引同样维护持久化订阅者与Destination的关系,以及每个消费者消费消息的指针。

Metadata store 
在db.data文件中保存消息日志中消息的元数据,也是以B-Tree结构存储的,定时从Metadata cache更新数据。Metadata store中也会备份一些在消息日志中存在的信息,这样可以让Broker实例快速启动。
即便metadata store文件被破坏或者误删除了。broker可以读取Data logs恢复过来,只是速度会相对较慢些。

6.4 LevelDB方式

从ActiveMQ 5.6版本之后,又推出了LevelDB的持久化引擎。
目前默认的持久化方式仍然是KahaDB,不过LevelDB持久化性能高于KahaDB,可能是以后的趋势。
在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。

  结语:到目前为止,我们已经已经简单的了解了ActiveMQ中消息传送机制,还有JMS中ACK策略,重点分析了optimizeACK的策略,希望开发者能够在使用activeMQ中避免一些不必要的错误。本文如有疏漏和错误之处,请各位不吝赐教,特此感谢。在后续的文章中,会详细介绍具体配置。

7 延迟消息

有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。

类似这种需求,ActiveMQ提供了一种broker端消息定时调度机制。

我们只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息。

首先开启schedulerSupport为true,在activemq.xml文件添加

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

一共有四个属性:

Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String Cron表达式

当然ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage.

使用示例,延迟60秒:

MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        long time = 60 * 1000;
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
        producer.send(message);

延迟30秒,投递10次,间隔10秒:

MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        long delay = 30 * 1000;
        long period = 10 * 1000;
        int repeat = 9;
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
        producer.send(message);

使用 CRON 表达式的例子:

MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
        producer.send(message);

CRON表达式的优先级高于另外三个参数,如果在设置了CRON的同时,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔为period。就是说设置是叠加的效果。例如每小时都会发生消息被投递10次,延迟1秒开始,每次间隔1秒:

MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
        producer.send(message);

stomp方式,只需要在head中添加属性即可,

activeMQ.publish(exports.signQueue,msg,{'persistent':true,'AMQ_SCHEDULED_DELAY':60000});

参考文章:

http://blog.csdn.net/czp11210/article/details/47022639

http://activemq.apache.org/redelivery-policy.html

http://www.myexception.cn/internet/1252460.html

04-26 22:57