消费端限流
1. 为什么要对消费端限流
假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。
2.限流的 api 讲解
RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。
/**
* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize:0,单条消息大小限制,0代表不限制
prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
- global:true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。
注意:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。
3.如何对消费端进行限流
首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 false
channel.basicConsume(queueName, false, consumer);
第二步我们来设置具体的限流大小以及数量。
channel.basicQos(0, 15, false);
第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 true
channel.basicAck(envelope.getDeliveryTag(), true);
这是生产端代码,与前几章的生产端代码没有做任何改变,主要的操作集中在消费端。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class QosProducer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_qos_exchange";
String routingKey = "item.add";
//5. 发送
String msg = "this is qos msg";
for (int i = 0; i < 10; i++) {
String tem = msg + " : " + i;
channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
System.out.println("Send message : " + tem);
}
//6. 关闭连接
channel.close();
connection.close();
}
}
这里我们创建了两个消费者,以方便验证限流api中的 global
参数设置为 true
时不起作用.。整体结构如下图所示,两个 Consumer 都绑定在同一个队列上,这样的话两个消费者将共同消费发送的10条消息。
import com.rabbitmq.client.*;
import java.io.IOException;
public class QosConsumer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
final Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String queueName1 = "test_qos_queue_1";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.basicQos(0, 3, false);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
channel.queueBind(queueName1, exchangeName, routingKey);
//5. 创建消费者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("[x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), true);
}
};
Consumer consumer1 = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body);
System.out.println("[Y] Receives '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), true);
}
};
//6. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, false, consumer);
channel.basicConsume(queueName, false, consumer1);
}
}
从如下结果中我们可以看到两个消费者依次消费三条消息。
[x] Received 'this is qos msg : 0'
[x] Received 'this is qos msg : 1'
[x] Received 'this is qos msg : 2'
[Y] Receives 'this is qos msg : 3'
[Y] Receives 'this is qos msg : 4'
[Y] Receives 'this is qos msg : 5'
[x] Received 'this is qos msg : 6'
[x] Received 'this is qos msg : 7'
[x] Received 'this is qos msg : 8'
[Y] Receives 'this is qos msg : 9'
当我们将void basicQos(int prefetchSize, int prefetchCount, boolean global)
中的 global 设置为 true
的时候我们发现并没有了限流的作用。
[x] Received 'this is qos msg : 0'
[x] Received 'this is qos msg : 1'
[x] Received 'this is qos msg : 2'
[x] Received 'this is qos msg : 3'
[Y] Receives 'this is qos msg : 4'
[x] Received 'this is qos msg : 5'
[Y] Receives 'this is qos msg : 6'
[x] Received 'this is qos msg : 7'
[Y] Receives 'this is qos msg : 8'
[x] Received 'this is qos msg : 9'
TTL
TTL是Time To Live的缩写,也就是生存时间。RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。
这与 Redis 中的过期时间概念类似。我们应该合理使用 TTL 技术,可以有效的处理过期垃圾消息,从而降低服务器的负载,最大化的发挥服务器的性能。
1.消息的 TTL
我们在生产端发送消息的时候可以在 properties 中指定 expiration
属性来对消息过期时间进行设置,单位为毫秒(ms)。
/**
* deliverMode 设置为 2 的时候代表持久化消息
* expiration 意思是设置消息的有效期,超过10秒没有被消费者接收后会被自动删除
* headers 自定义的一些属性
* */
//5. 发送
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("myhead1", "111");
headers.put("myhead2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("100000")
.headers(headers)
.build();
String msg = "test message";
channel.basicPublish("", queueName, properties, msg.getBytes());
我们也可以后台管理页面中进入 Exchange 发送消息指定expiration
2.队列的 TTL
我们也可以在后台管理界面中新增一个 queue,创建时可以设置 ttl,对于队列中超过该时间的消息将会被移除。
死信队列
死信队列:没有被及时消费的消息存放的队列
消息没有被及时消费的原因:
a.消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
b.TTL(time-to-live) 消息超时未消费
c.达到最大队列长度
实现死信队列步骤
首先需要设置死信队列的 exchange 和 queue,然后进行绑定:
Exchange: dlx.exchange Queue: dlx.queue RoutingKey: # 代表接收所有路由 key
- 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在普通队列加上一个参数即可:
arguments.put("x-dead-letter-exchange",' dlx.exchange' )
这样消息在过期、requeue失败、 队列在达到最大长度时,消息就可以直接路由到死信队列!
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DlxProducer {
public static void main(String[] args) throws Exception {
//设置连接以及创建 channel 湖绿
String exchangeName = "test_dlx_exchange";
String routingKey = "item.update";
String msg = "this is dlx msg";
//我们设置消息过期时间,10秒后再消费 让消息进入死信队列
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("10000")
.build();
channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
System.out.println("Send message : " + msg);
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DlxConsumer {
public static void main(String[] args) throws Exception {
//创建连接、创建channel忽略 内容可以在上面代码中获取
String exchangeName = "test_dlx_exchange";
String queueName = "test_dlx_queue";
String routingKey = "item.#";
//必须设置参数到 arguments 中
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//将 arguments 放入队列的声明中
channel.queueDeclare(queueName, true, false, false, arguments);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
//声明死信队列
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
//路由键为 # 代表可以路由到所有消息
channel.queueBind("dlx.queue", "dlx.exchange", "#");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
}
}
总结
DLX也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。可以监听这个队列中消息做相应的处理。