DefaultConsumer
我的DemoConsumer继承自DefaultConsumer。
我注意到以这种方式从ThreadPool调用handleDelivery()。
(打印Thread.currentThread()。getName()我每次看到pool-1-thread-1 / 2/3/4。
我也对其进行了几次测试,发现订单已保存。
只是要确保-由于不同的线程调用句柄传递-会搞乱我的订单吗?
排队消费
所有Java教程都使用QueueingConsumer来消费消息。
在API Docs中,它被提为已弃用的类。
我应该更改代码以使用DefaultConsumer继承吗?教程过时了吗?
谢谢。
最佳答案
是的,DefaultConsumer
使用可以更改的内部线程池。
将ExecutorService
用作:
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
阅读http://www.rabbitmq.com/api-guide.html“高级连接选项”。
正如您从“ QueueingConsumer” doc中看到的:
因此,现在可以直接实现Consumer或扩展DefaultConsumer。
我从未使用过QueueingConsumer,因为它不是由事件驱动的。
如您所见:
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
/// here you are blocked, waiting the next message.
String message = new String(delivery.getBody());
}
在这种情况下,一个典型的问题是如何关闭订阅,一个常见的解决方法是在本地主机中发送带标签的关闭消息。其实我不太喜欢。
如果扩展
DefaultConsumer
,则可以正确关闭订阅和频道:public class MyConsumer extends DefaultConsumer {...}
然后
public static void main(String[] args) {
MyConsumer consumer = new MyConsumer (channel);
String consumerTag = channel.basicConsume(Constants.queue, false, consumer);
System.out.println("press any key to terminate");
System.in.read();
channel.basicCancel(consumerTag);
channel.close();
....
总之,您不必担心消息顺序,因为如果一切正常,则消息顺序是正确的,但是我认为您无法假设它,因为如果出现问题,则可能会丢失消息顺序。如果您绝对需要维护消息顺序,则应包括一个顺序标记以在使用者端重建消息顺序。
并且您应该扩展DefaultConsumer。