在java web开发中,大家经常会用到定时任务。比较常用的类库有:Timer、ScheduledExecutor 、Quartz、Spring Scheduler。
Timer和ScheduleExecutor都不支持持久化和分布式,宕机或重启则待执行的任务丢失。部署多个实例时会同时触发任务。
Quartz若支持持久化和分布式需要较多的配置,貌似是7张表。
Spring Scheduler支持持久化和分布式也是需要相应的插件。
本文将介绍下另一种思路,使用rabbitmq实现定时任务。
死信队列
DLX, Dead-Letter-Exchange。当消息在一个队列中变成死信之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:
- 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度
变为死信队列也很简单,为队列设置一个属性Dead letter exchange指定死信要重新publish到的Exchange就行了。
思路
rabbitMq配置两个exchange:exchange-product负责接收生产者的消息、DLX负责接收死信。两个queue:queue-dead绑定在exchange-product上,queue-consume绑定在DLX上。
消息传递流程:
- 生产者发送消息到exchange-product后,由于这个exchange Binding了queue-dead,因此消息会传到queue-dead。
- 由于这个队列没有消费者,因此queue-dead的消息会超时。由于queue-dead设置了Dead Letter exchange属性,指定了死信要publish到DLX,因此消息会到DLX。
- DLX这个exchange Binding了queue-consume。消息会到queue-consume中。
- 由于消费者连接了queue-consume,最终消息传到消费者。
代码
像这种定时任务一般都是需要经常执行的。因此,队列,exchange都需要持久化。所以可以在控制台去创建队列和exchange。因此不需要在代码里创建队列以及exchange,只需要发送和接收消息就行了。
//消费者
@Component
public class MessageConsumer {
@RabbitListener(queues = "queue-consume")//指定要消费的队列
public void receive(byte[] body) {
long end = System.currentTimeMillis();
String start = new String(body);//解析消息体,内容传的消息发送时间
System.out.println("start: "+ start);
System.out.println("end: "+ end);
System.out.println("offset: "+(Long.parseLong(start)-end));//计算实际定时时间
}
}
//生产者
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String str){
MessageProperties messageProperties = new MessageProperties();
Message message = new Message(str.getBytes(), messageProperties);
messageProperties.setContentType("json");
messageProperties.setExpiration("10000");//设置消息超时时间为10秒
rabbitTemplate.send("exchange-product","",message);//发送消息到exchange-product
}
}
演示的例子很简单。为了方便,是在spring的环境下写的。exchange配置的类型为fanout,因此不需要指定routing key。实际应用时可以换成别的类型。也没有指定手动ack。实际应用时还是建议手动ack。
测试
public class ApplicationTests {
@Autowired
MessageProducer messageProducer;
@Test
public void push() {
long start = System.currentTimeMillis();
messageProducer.send(""+start);
}
}
可以看到最终误差大概在0.1秒。由于我本机和rabbitmq不在一个局域网,所以网络开销比较大。局域网连接估计误差会小一些。
缺点
由于rabbitmq的消息,在没有优先级的情况下,是会按顺序消费,判断超时也是按顺序的。若有两条消息message1和message2。message1的定时时间(即超时时间)为10秒。message2的定时时间为5秒。若message1先发送,则需要先等待message1超时后publish到DLX,message2才能判断超时publish到DLX。最终message2可能需要等10秒。
那这种定时还有什么用呢?
刚刚举得例子是因为两条消息的超时时间不同。若两条消息超时时间相同便没有问题了。因此这种定时只适用于,同一业务的消息定时时间固定。多个业务建多个队列和exchange就好了。