我编写了一个小型测试节点应用程序,该应用程序循环运行并将消息添加到队列(azure存储队列)中,如下所示:
var queueService = azure.createQueueService();
var queueName = 'taskqueue';
// other stuff like check if created
// loop called after queue is confirmed
for (i=0;i<1000;i++){
queueService.createMessage(queueName, "Hello world!", null, messageCreated);
}
// messageCreated does nothing at the moment, just logs to console
我正在尝试重写该代码,以处理说一百万个使用异步控制并行运行的工作程序功能的create。这是一项学习活动,比什么都重要。
https://github.com/caolan/async#queue
这是异步队列的基本设置,对于需要更改的内容我有些茫然。我认为以下内容不会起作用:
var q = async.queue(function (task, callback) {
queueService.createMessage(queueName, task.msg, null, messageCreated);
callback();
}, 100);
// assign a callback. Called when all the queues have been processed
q.drain = function() {
console.log('all items have been processed');
}
// add some items to the queue
for(i=0;i<1000000;i++) {
q.push({msg: 'Hello World'}, function (err) {
console.log('finished processing foo');
});
console.log('pushing: ' + i);
}
我不太了解如何将所有这些与异步一起使用。
最佳答案
这是您的错误:
var q = async.queue(function (task, callback) {
queueService.createMessage(queueName, task.msg, null, messageCreated);
callback();
}, 100);
您正在做的是在队列上创建一条消息,然后立即传递继续(调用回调)。您要做的是在传递给
createMessage
的回调中传递延续性:var q = async.queue(function (task, callback) {
queueService.createMessage(queueName, task.msg, null, function(error, serverQueue, serverResponse) {
callback(error, serverQueue, serverResponse);
messageCreated(error, serverQueue, serverResponse);
});
}, 100);
现在,每个任务将在实际创建任务后报告完成情况。
编辑:
createMessage
回调的更新的接口。关于node.js - 处理消息队列并使用异步,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/11140588/