我正在使用aws-sdk Node 模块,以(据我所知)批准的方式来轮询消息。

基本上可以归纳为:

        sqs.receiveMessage({
            QueueUrl: queueUrl,
            MaxNumberOfMessages: 10,
            WaitTimeSeconds: 20
        }, function(err, data) {
            if (err) {
                logger.fatal('Error on Message Recieve');
                logger.fatal(err);
            } else {
                // all good
                if (undefined === data.Messages) {
                    logger.info('No Messages Object');
                } else if (data.Messages.length > 0) {
                    logger.info('Messages Count: ' + data.Messages.length);

                    var delete_batch = new Array();
                    for (var x=0;x<data.Messages.length;x++) {
                        // process
                        receiveMessage(data.Messages[x]);

                        // flag to delete

                        var pck = new Array();
                        pck['Id'] = data.Messages[x].MessageId;
                        pck['ReceiptHandle'] = data.Messages[x].ReceiptHandle;

                        delete_batch.push(pck);
                    }

                    if (delete_batch.length > 0) {
                        logger.info('Calling Delete');
                        sqs.deleteMessageBatch({
                            Entries: delete_batch,
                            QueueUrl: queueUrl
                        }, function(err, data) {
                            if (err) {
                                logger.fatal('Failed to delete messages');
                                logger.fatal(err);
                            } else {
                                logger.debug('Deleted recieved ok');
                            }
                        });
                    }
                } else {
                    logger.info('No Messages Count');
                }
            }
        });
receiveMessage是我的“如果我有足够的收集的消息,则对收集的消息进行处理”功能

有时,我的脚本停顿了,因为我根本没有得到任何针对亚马逊的响应,例如说队列中没有要使用的消息,而不是点击WaitTimeSeconds并发送“无消息对象”,而是使用回调。被称为。

(我正在写这本书给Amazon Weirdness)

我要问的是检测和处理此问题的最佳方法是什么,因为我有一些代码可以阻止并发调用receiveMessage。

建议的答案在这里:Nodejs sqs queue processor还具有防止并发消息请求查询的代码(当然,它一次只能获取一条消息)

我确实把整个东西都包裹了
var running = false;
runMonitorJob = setInterval(function() {
    if (running) {
    } else {
        running = true;
        // call SQS.receive
    }
}, 500);

(在删除循环后运行= false(不在回调中))

我的解决方案是
watchdogTimeout = setTimeout(function() {
    running = false;
}, 30000);

但是可以肯定的是,这会留下一堆 float 的sq.receive潜伏着,因此随着时间的流逝会有更多的存储空间吗?

(此作业一直运行,而我在星期五一直运行,在星期六上午停滞并挂起,直到今天早上手动重新启动该作业)

编辑:我已经看到了挂起〜5分钟然后突然收到消息但等待时间为20秒的情况,它应该在20秒后抛出“无消息”。因此,约10分钟的WatchDog可能更实用(取决于其余的业务逻辑)

编辑:是,长轮询已配置为队列端。

编辑:这是aws-sdk和NodeJS v4.4.4(最新)的v2.3.9。

最佳答案

我一直在追寻这个(或类似的)问题几天,这是我注意到的:

  • 尽管仅在120秒
  • 之后,receiveMessage调用最终仍会返回
  • 并发调用receiveMessage由AWS.SDK库序列化,因此并行进行多个调用无效。
  • receiveMessage回调没有错误-实际上,经过120秒后,它可能包含消息。

  • 关于这个还能做什么?发生这种情况可能有多种原因,其中某些/很多不一定是固定的。答案是运行多个服务,每个服务都会调用receiveMessage并在消息出现时对其进行处理-SQS支持此功能。在任何时候,这些服务之一都可能会延迟120秒,但是其他服务应该能够继续正常运行。

    我的特别问题是我有一些关键的单例服务,这些服务无法承受120秒的停机时间。为此,我将研究1)使用HTTP而不是SQS将消息推送到我的服务中,或2)在每个单例周围生成从属进程以从SQS中获取消息并将它们推送到服务中。

    关于node.js - 带有AWS开发工具包的Amazon SQS ReceiveMessage停顿,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37111431/

    10-11 06:47