我有一个将消息排队到RabbitMQ的功能,如下所示:
var amqp = require('amqplib/callback_api');
var _queueURL = 'amqp://127.0.0.1';
var _toBlahBlahQueueName = 'blahblah';
var self = module.exports = {
queueMessage: function (msgObj, callback) {
try {
amqp.connect(_queueURL, function (err, connection) {
if (err) {
callback(err);
}
connection.createChannel(function (err, channel) {
if (err) {
callback(err);
}
channel.assertQueue(_toBlahBlahQueueName, { durable: true }, function (err, _ok) {
if (err) {
callback(err);
}
var msg = new Buffer(JSON.stringify(msgObj));
channel.sendToQueue(_toBlahBlahQueueName, msg, { persistent: true }, function (err, ok) {
if (err) {
console.log(err);
callback(err);
}
console.log('published', ok);
channel.connection.close();
callback(null, { message: 'queued' });
});
});
});
});
}
catch (e) {
console.log(e.stack);
callback(e);
}
}
};
我正在用大约250K长度的消息调用queueMessage函数。
sendToQueue调用每次都挂起。它只是坐在那里而没有返回错误。但是,该消息似乎已排队!
服务器日志中有错误消息:客户端意外关闭了TCP连接
谢谢你的帮助!
最佳答案
amqplib不支持sendToQueue
或publish
的回调。
the documentation显示这不是一个选项:
要解决此问题,您需要像对待同步消息一样调用sendToQueue
。
如果要立即退出该应用程序,则必须等待几毫秒后再退出。否则将导致无法发送消息。
这是一个示例,说明如何更改代码以这种方式工作:
channel.sendToQueue(_toBlahBlahQueueName, msg, { persistent: true });
setTimeout(function () {
channel.connection.close();
callback(null, { message: 'queued' });
}, 500);