首先,我是rabbitmq和bramqp的新手。我知道这可能是一个愚蠢的问题,但是由于这个问题导致我取消了队列中的消费者,所以我一直在努力。我搜索了整个互联网,没有找到有关如何执行basic.cancel代码的bramqp示例。
这是我的代码:
var bramqp = require('bramqp');
var net = require('net');
var async = require('async');
var queueName = 'testQueue';
var consumerTag = 'testConsumer';
var exchangeName = 'testExchange';
var socket = net.connect({
port : 5672
});
bramqp.initialize(socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function(error, handle){
async.series([ function(seriesCallback) {
handle.openAMQPCommunication('guest', 'guest', true, seriesCallback);
}, function(seriesCallback) {
handle.exchange.declare(
1 /*short reserved-1*/,
exchangeName /*exchange-name exchange*/,
'fanout' /*shortstr type*/,
false /*bit passive*/,
true /*bit durable*/,
false /*bit auto-delete*/,
false /*bit internal*/,
false /*no-wait no-wait*/,
{} /*table arguments*/
);
handle.once('exchange.declare-ok', function(channel, method, data) {
console.log('exchange declared');
seriesCallback();
});
}, function(seriesCallback) {
handle.basic.qos(
1 /*long prefetch-size*/,
0 /*short prefetch-count*/,
1 /*...*/,
false /*bit global*/
);
handle.once('basic.qos-ok', function(channel, method, data) {
console.log('qos accepted');
seriesCallback();
});
}, function(seriesCallback) {
handle.queue.declare(
1 /*short reserved-1*/,
queueName /*queue-name queue*/,
false /*bit passive*/,
true /*bit durable*/,
false /*bit exclusive*/,
false /*bit auto-delete*/,
false /*no-wait no-wait*/,
{} /*table arguments*/
);
handle.once('queue.declare-ok', function(channel, method, data) {
console.log('queue declared');
seriesCallback();
});
}, function(seriesCallback) {
handle.queue.bind(
1 /*short reserved-1*/,
queueName /*exchange-name destination*/,
exchangeName /*exchange-name source*/,
null /*shortstr routing-key*/,
false /*no-wait no-wait*/,
{} /*table arguments*/
);
handle.once('queue.bind-ok', function(channel, method, data) {
console.log('queue bound sucessfully');
seriesCallback();
});
}, function(seriesCallback) {
handle.basic.consume(1 /*short reserved-1*/,
queueName /*queue-name queue*/,
consumerTag,
false /*no-local no-local*/,
false /*no-ack no-ack*/,
false /*bit exclusive*/,
false /*no-wait no-wait*/,
{} /*table arguments*/
);
handle.once('basic.consume-ok', function (channel, method, data) {
console.log('consuming from queue');
console.log(data);
handle.on('basic.deliver', function (channel, method, data) {
console.log('incoming message');
console.log(data);
handle.once('content', function (channel, className, properties, content) {
console.log('got a message:');
console.log(content.toString());
if (content.toString().indexOf("END_MESSAGE") > -1){
handle.basic.cancel(consumerTag, false);
handle.once('basic.cancel-ok', function(channel, method, data) {
console.log("consumer cancelled successfully");
seriesCallback();
});
res.json("END_MESSAGE");
}
else{
console.log('acking');
handle.basic.ack(1, data['delivery-tag']);
}
console.log('with properties:');
console.log(properties);
seriesCallback();
});
});
});
} ], function() {
console.log('all done');
});
});
因此,我要尝试做的只是在检测到“ END_MESSAGE”类型的消息时停止使用者。我从上面的代码中得到的是以下错误:
events.js:72
throw er; // Unhandled 'error' event
^
TypeError: value is out of bounds
at TypeError (<anonymous>)
at checkInt (buffer.js:705:11)
at Buffer.writeUInt16BE (buffer.js:730:5)
req.on('error', function(e){
console.log('problem with request: ' + e.message);
任何意见和建议将不胜感激。谢谢!
最佳答案
似乎basic.cancel()的工作方式至少在今天有所不同。我在文档中发现此方法实际上吃了一个回调,如下所示:
handle.basic.cancel(consumerTag, function(err) {
if (err) throw err;
});
https://www.npmjs.com/package/bramqp-wrapper-搜索basic.cancel
重要!回调不能保证您不会从队列中收到更多消息。似乎仅是为了接受取消而触发该命令,而不是在队列服务器实际上将其摘要时触发。