我编写了一个小型测试节点应用程序,该应用程序循环运行并将消息添加到队列(azure存储队列)中,如下所示:

var queueService = azure.createQueueService();
var queueName = 'taskqueue';

// other stuff like check if created

// loop called after queue is confirmed

for (i=0;i<1000;i++){
  queueService.createMessage(queueName, "Hello world!", null, messageCreated);
}

// messageCreated does nothing at the moment, just logs to console


我正在尝试重写该代码,以处理说一百万个使用异步控制并行运行的工作程序功能的create。这是一项学习活动,比什么都重要。

https://github.com/caolan/async#queue

这是异步队列的基本设置,对于需要更改的内容我有些茫然。我认为以下内容不会起作用:

var q = async.queue(function (task, callback) {
   queueService.createMessage(queueName, task.msg, null, messageCreated);
    callback();
}, 100);


// assign a callback.  Called when all the queues have been processed
q.drain = function() {
    console.log('all items have been processed');
}

// add some items to the queue
for(i=0;i<1000000;i++) {
  q.push({msg: 'Hello World'}, function (err) {
    console.log('finished processing foo');
  });
    console.log('pushing: ' + i);
}


我不太了解如何将所有这些与异步一起使用。

最佳答案

这是您的错误:

var q = async.queue(function (task, callback) {
   queueService.createMessage(queueName, task.msg, null, messageCreated);
    callback();
}, 100);


您正在做的是在队列上创建一条消息,然后立即传递继续(调用回调)。您要做的是在传递给createMessage的回调中传递延续性:

var q = async.queue(function (task, callback) {
   queueService.createMessage(queueName, task.msg, null, function(error, serverQueue, serverResponse) {
       callback(error, serverQueue, serverResponse);
       messageCreated(error, serverQueue, serverResponse);
   });
}, 100);


现在,每个任务将在实际创建任务后报告完成情况。

编辑:createMessage回调的更新的接口。

关于node.js - 处理消息队列并使用异步,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/11140588/

10-09 08:19
查看更多