生产者代码不变,消费者:
package com.toov5.Consumer; import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.toov5.utils.MQConnectionUtils; public class Consumer { //队列名称
private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("消费者启动..........");
//创建新的连接
Connection connection = MQConnectionUtils.newConnection();
//创建Channel
Channel channel = connection.createChannel();
// 消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) {
//监听获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg =new String(body,"UTF-8");
System.out.println("消费者获取生产者消息:"+msg);
}
};
//牵手模式设置 默认自动应答模式 true:自动应答模式
channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);// fanse手动应答 // //关闭通道和连接
// channel.close();
// connection.close();
}
}
手动应答。此时 消息队列的消息 一直没有被清除掉
生产者做如下修改就OK了:
package com.toov5.Consumer; import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.toov5.utils.MQConnectionUtils; public class Consumer { //队列名称
private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("消费者启动..........");
//创建新的连接
Connection connection = MQConnectionUtils.newConnection();
//创建Channel
final Channel channel = connection.createChannel();
// 消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) {
//监听获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg =new String(body,"UTF-8");
System.out.println("消费者获取生产者消息:"+msg);
channel.basicAck(envelope.getDeliveryTag(), false); //手动应答 告诉消息队列服务器 消费成功
}
};
//牵手模式设置 默认自动应答模式 true:自动应答模式
channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);// fanse手动应答 // //关闭通道和连接
// channel.close();
// connection.close();
}
}