本文介绍了使用 AMQP.Node 在 RabbitMQ 和节点中未使用死信消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在一段时间后在我的一个工作人员中收到一条消息.在发现所谓的死信交换后,我决定使用 Node 和 RabbitMQ.

I want to receive a message after a certain amount of time in one of my workers. I decided to go with Node and RabbitMQ after discovering so-called dead letter exchanges.

该消息似乎已发送到 DeadExchange 中的队列,但在 WorkExchange 中的 WorkQueue 中经过的时间过后,消费者从未收到该消息.要么 bindQueue 已关闭,要么死信不起作用?

The message seems to get send to the queue in DeadExchange, but the consumer is never receiving the message after the elapsed time in the WorkQueue in the WorkExchange. Either the bindQueue is off, or the dead-letter'ing doesn't work?

我现在尝试了很多不同的值.有人可以指出我缺少什么吗?

I've tried a lot of different values now. Can someone please point out what I'm missing?

var amqp = require('amqplib');
var url = 'amqp://dev.rabbitmq.com';

amqp.connect(url).then(function(conn) {
    //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
    return conn.createChannel().then(function(ch) {
        return ch.assertExchange('WorkExchange', 'direct').then(function() {
            return ch.assertQueue('WorkQueue', {
                autoDelete: false,
                durable: true
            })
        }).then(function() {
            return ch.bindQueue('WorkQueue', 'WorkExchange', '');
        }).then(function() {
            console.log('Waiting for consume.');

            return ch.consume('WorkQueue', function(msg) {
                console.log('Received message.');
                console.log(msg.content.toString());
                ch.ack(msg);
            });
        });
    })
}).then(function() {
    //Now send a test message to DeadExchange to a random (unique) queue.
    return amqp.connect(url).then(function(conn) {
        return conn.createChannel();
    }).then(function(ch) {
        return ch.assertExchange('DeadExchange', 'direct').then(function() {
            return ch.assertQueue('', {
                arguments: {
                    'x-dead-letter-exchange': 'WorkExchange',
                    'x-message-ttl': 2000,
                    'x-expires': 10000
                }
            })
        }).then(function(ok) {
            console.log('Sending delayed message');

            return ch.sendToQueue(ok.queue, new Buffer(':)'));
        });
    })
}).then(null, function(error) {
    console.log('error\'ed')
    console.log(error);
    console.log(error.stack);
});

我正在使用 amqp.node (https://github.com/squaremo/amqp.node) 这是 npm 中的 amqplib.虽然 node-amqp (https://github.com/postwait/node-amqp) 似乎更受欢迎,它没有实现完整的协议,并且在重新连接方面存在一些突出的问题.

I'm using amqp.node (https://github.com/squaremo/amqp.node) which is amqplib in npm. Although node-amqp (https://github.com/postwait/node-amqp) seems to be so much more popular, it doesn't implement the full protocol and there are quite some outstanding issues regarding reconnecting.

dev.rabbitmq.com 正在运行 RabbitMQ 3.1.3.

dev.rabbitmq.com is running RabbitMQ 3.1.3.

推荐答案

AMQP.Node 中的 Channel#assertQueue 中存在一个 bug,刚刚修复,请参阅 https://github.com/squaremo/amqp.node/commit/3749c66b448875d28df374e6a89946c0bdd0cb918修复程序在 GitHub 上,但还没有在 npm 中.

There was a bug in Channel#assertQueue in AMQP.Node which just got fixed, see https://github.com/squaremo/amqp.node/commit/3749c66b448875d2df374e6a89946c0bdd0cb918. The fix is on GitHub but not in npm just yet.

这篇关于使用 AMQP.Node 在 RabbitMQ 和节点中未使用死信消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!