首先,我是rabbitmq和bramqp的新手。我知道这可能是一个愚蠢的问题,但是由于这个问题导致我取消了队列中的消费者,所以我一直在努力。我搜索了整个互联网,没有找到有关如何执行basic.cancel代码的bramqp示例。

这是我的代码:

        var bramqp = require('bramqp');
        var net = require('net');
        var async = require('async');

        var queueName = 'testQueue';
        var consumerTag = 'testConsumer';
        var exchangeName = 'testExchange';

        var socket = net.connect({
            port : 5672
        });

        bramqp.initialize(socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function(error, handle){
            async.series([ function(seriesCallback) {
                handle.openAMQPCommunication('guest', 'guest', true, seriesCallback);
            }, function(seriesCallback) {
                handle.exchange.declare(
                    1 /*short reserved-1*/,
                    exchangeName /*exchange-name exchange*/,
                    'fanout' /*shortstr type*/,
                    false /*bit passive*/,
                    true /*bit durable*/,
                    false /*bit auto-delete*/,
                    false /*bit internal*/,
                    false /*no-wait no-wait*/,
                    {} /*table arguments*/
                );
                handle.once('exchange.declare-ok', function(channel, method, data) {
                    console.log('exchange declared');
                    seriesCallback();
                });
            }, function(seriesCallback) {
                handle.basic.qos(
                    1 /*long prefetch-size*/,
                    0 /*short prefetch-count*/,
                    1 /*...*/,
                    false /*bit global*/
                );
                handle.once('basic.qos-ok', function(channel, method, data) {
                    console.log('qos accepted');
                    seriesCallback();
                });
            }, function(seriesCallback) {
                handle.queue.declare(
                    1 /*short reserved-1*/,
                    queueName /*queue-name queue*/,
                    false /*bit passive*/,
                    true /*bit durable*/,
                    false /*bit exclusive*/,
                    false /*bit auto-delete*/,
                    false /*no-wait no-wait*/,
                    {} /*table arguments*/
                );
                handle.once('queue.declare-ok', function(channel, method, data) {
                    console.log('queue declared');
                    seriesCallback();
                });
            }, function(seriesCallback) {
                handle.queue.bind(
                    1 /*short reserved-1*/,
                    queueName /*exchange-name destination*/,
                    exchangeName /*exchange-name source*/,
                    null /*shortstr routing-key*/,
                    false /*no-wait no-wait*/,
                    {} /*table arguments*/
                );
                handle.once('queue.bind-ok', function(channel, method, data) {
                    console.log('queue bound sucessfully');
                    seriesCallback();
                });
            }, function(seriesCallback) {
                handle.basic.consume(1 /*short reserved-1*/,
                    queueName /*queue-name queue*/,
                    consumerTag,
                    false /*no-local no-local*/,
                    false /*no-ack no-ack*/,
                    false /*bit exclusive*/,
                    false /*no-wait no-wait*/,
                    {} /*table arguments*/
                );
                handle.once('basic.consume-ok', function (channel, method, data) {
                    console.log('consuming from queue');
                    console.log(data);
                    handle.on('basic.deliver', function (channel, method, data) {
                        console.log('incoming message');
                        console.log(data);
                        handle.once('content', function (channel, className, properties, content) {
                            console.log('got a message:');
                            console.log(content.toString());
                            if (content.toString().indexOf("END_MESSAGE") > -1){
                                handle.basic.cancel(consumerTag, false);
                                handle.once('basic.cancel-ok', function(channel, method, data) {
                                    console.log("consumer cancelled successfully");
                                    seriesCallback();
                                });
                                res.json("END_MESSAGE");

                            }
                            else{
                                console.log('acking');
                                handle.basic.ack(1, data['delivery-tag']);
                            }
                            console.log('with properties:');
                            console.log(properties);
                            seriesCallback();
                        });
                    });
                });
            } ], function() {
                console.log('all done');
            });
        });


因此,我要尝试做的只是在检测到“ END_MESSAGE”类型的消息时停止使用者。我从上面的代码中得到的是以下错误:

    events.js:72
    throw er; // Unhandled 'error' event
          ^
    TypeError: value is out of bounds
        at TypeError (<anonymous>)
        at checkInt (buffer.js:705:11)
        at Buffer.writeUInt16BE (buffer.js:730:5)
    req.on('error', function(e){
        console.log('problem with request: ' + e.message);


任何意见和建议将不胜感激。谢谢!

最佳答案

似乎basic.cancel()的工作方式至少在今天有所不同。我在文档中发现此方法实际上吃了一个回调,如下所示:

handle.basic.cancel(consumerTag, function(err) {
  if (err) throw err;
});


https://www.npmjs.com/package/bramqp-wrapper-搜索basic.cancel

重要!回调不能保证您不会从队列中收到更多消息。似乎仅是为了接受取消而触发该命令,而不是在队列服务器实际上将其摘要时触发。

09-27 01:26