在java web开发中,大家经常会用到定时任务。比较常用的类库有:Timer、ScheduledExecutor 、Quartz、Spring Scheduler。
Timer和ScheduleExecutor都不支持持久化和分布式,宕机或重启则待执行的任务丢失。部署多个实例时会同时触发任务。
Quartz若支持持久化和分布式需要较多的配置,貌似是7张表。
Spring Scheduler支持持久化和分布式也是需要相应的插件。
本文将介绍下另一种思路,使用rabbitmq实现定时任务。

死信队列

DLX, Dead-Letter-Exchange。当消息在一个队列中变成死信之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:

  • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度
    变为死信队列也很简单,为队列设置一个属性Dead letter exchange指定死信要重新publish到的Exchange就行了。

思路

rabbitmq实现分布式定时任务-LMLPHP
rabbitMq配置两个exchange:exchange-product负责接收生产者的消息、DLX负责接收死信。两个queue:queue-dead绑定在exchange-product上,queue-consume绑定在DLX上。
消息传递流程:

  1. 生产者发送消息到exchange-product后,由于这个exchange Binding了queue-dead,因此消息会传到queue-dead。
  2. 由于这个队列没有消费者,因此queue-dead的消息会超时。由于queue-dead设置了Dead Letter exchange属性,指定了死信要publish到DLX,因此消息会到DLX。
  3. DLX这个exchange Binding了queue-consume。消息会到queue-consume中。
  4. 由于消费者连接了queue-consume,最终消息传到消费者。

代码

像这种定时任务一般都是需要经常执行的。因此,队列,exchange都需要持久化。所以可以在控制台去创建队列和exchange。因此不需要在代码里创建队列以及exchange,只需要发送和接收消息就行了。

//消费者
@Component
public class MessageConsumer {
    @RabbitListener(queues = "queue-consume")//指定要消费的队列
    public void receive(byte[] body) {
        long end = System.currentTimeMillis();
        String start = new String(body);//解析消息体,内容传的消息发送时间
        System.out.println("start: "+ start);
        System.out.println("end: "+ end);
        System.out.println("offset: "+(Long.parseLong(start)-end));//计算实际定时时间
    }
}
//生产者
@Component
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(String str){
        MessageProperties messageProperties = new MessageProperties();
        Message message = new Message(str.getBytes(), messageProperties);
        messageProperties.setContentType("json");
        messageProperties.setExpiration("10000");//设置消息超时时间为10秒
        rabbitTemplate.send("exchange-product","",message);//发送消息到exchange-product
    }
}

演示的例子很简单。为了方便,是在spring的环境下写的。exchange配置的类型为fanout,因此不需要指定routing key。实际应用时可以换成别的类型。也没有指定手动ack。实际应用时还是建议手动ack。

测试

public class ApplicationTests {
	@Autowired
	MessageProducer messageProducer;
	@Test
	public void push() {
		long start = System.currentTimeMillis();
		messageProducer.send(""+start);
	}
}

rabbitmq实现分布式定时任务-LMLPHP
可以看到最终误差大概在0.1秒。由于我本机和rabbitmq不在一个局域网,所以网络开销比较大。局域网连接估计误差会小一些。

缺点

由于rabbitmq的消息,在没有优先级的情况下,是会按顺序消费,判断超时也是按顺序的。若有两条消息message1和message2。message1的定时时间(即超时时间)为10秒。message2的定时时间为5秒。若message1先发送,则需要先等待message1超时后publish到DLX,message2才能判断超时publish到DLX。最终message2可能需要等10秒。
那这种定时还有什么用呢
刚刚举得例子是因为两条消息的超时时间不同。若两条消息超时时间相同便没有问题了。因此这种定时只适用于,同一业务的消息定时时间固定。多个业务建多个队列和exchange就好了。

03-04 19:22