运行此代码1-2天后,我在AWS EC2上收到此错误

错误

<--- Last few GCs --->
st[10805:0x41cdff0]  7130379 ms: Mark-sweep 33.2 (78.7) -> 21.1 (75.8) MB, 13.8 / 0.1 ms  (+ 23.1 ms in 23 steps since start of marking, biggest step 4.3 ms, walltime since start of marking 160 ms) final$

<--- JS stacktrace --->
Cannot get stack trace in GC.
FATAL ERROR: Scavenger: promoting marked
Allocation failed - process out of memory
1: node::Abort() [node]
2: 0x12b288c [node]
3: v8::Utils::ReportOOMFailure(char const*, bool) [node]
4: v8::internal::V8::FatalProcessOutOfMemory(char const*, bool) [node]
5: 0xa96bfb [node]
6: void v8::internal::ScavengingVisitor<(v8::internal::MarksHandling)0,
(v8::internal::PromotionMode)0, (v8::internal::LoggingAndProfiling)1>::EvacuateObject<(v8::internal::ScavengingVisitor<(v8::intern$
 7: v8::internal::Scavenger::ScavengeObject(v8::internal::HeapObject**, v8::internal::HeapObject*) [node]
 8: v8::internal::Heap::IteratePromotedObjectPointers(v8::internal::HeapObject*, unsigned char*, unsigned char*, bool, void (*)(v8::internal::HeapObject**, v8::internal::HeapObject*)) [node]
 9: void v8::internal::BodyDescriptorBase::IterateBodyImpl<v8::internal::ObjectVisitor>(v8::internal::HeapObject*, int, int, v8::internal::ObjectVisitor*) [node]
10: void v8::internal::BodyDescriptorApply<v8::internal::CallIterateBody, void, v8::internal::HeapObject*, int, v8::internal::ObjectVisitor*>(v8::internal::InstanceType, v8::internal::HeapObject*, int, v$
11: v8::internal::Heap::DoScavenge(v8::internal::ObjectVisitor*, unsigned char*, v8::internal::PromotionMode) [node]
12: v8::internal::Heap::Scavenge() [node]
13: v8::internal::Heap::PerformGarbageCollection(v8::internal::GarbageCollector, v8::GCCallbackFlags) [node]
14: v8::internal::Heap::CollectGarbage(v8::internal::GarbageCollector, v8::internal::GarbageCollectionReason, char const*, v8::GCCallbackFlags) [node]
15: v8::internal::Factory::NewRawTwoByteString(int, v8::internal::PretenureFlag) [node]
16: v8::internal::Factory::NewStringFromUtf8(v8::internal::Vector<char const>, v8::internal::PretenureFlag) [node]
17: v8::String::NewFromUtf8(v8::Isolate*, char const*, v8::String::NewStringType, int) [node]
18: node::StringBytes::Encode(v8::Isolate*, char const*, unsigned long, node::encoding) [node]
19: void node::Buffer::StringSlice<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&) [node]
20: 0x33c699f18dcf


我的主要功能是一个异步的while循环,看起来像这样,这是一个表达路线的控制器功能

function controller(cb) {
  return new Promise((resolve, reject) => {
    let killed = false;
    (async() => {
      let isEmpty = false;
      while (!killed && !isEmpty) {
        const code = await processBatch();
        if (code === EMPTY_QUEUE) {
          isEmpty = true;
          console.log('ss');
          resolve(false);
        }
      }
    })();
    cb()
      .then((state) => killed = state);
  });
}


在这里,processBatch()可能需要大约10秒钟才能解决承诺

注意:processBatch将永远不会返回EMPTY_QUEUE,并且不会通过回调将kill设置为true

考虑到这一点,有人可以告诉我为什么此控制器功能在一段时间后会消耗这么多的内存吗,我是否正在做一些阻止节点进行垃圾收集数据的操作?

-更新-

这是调用控制器功能的路由器代码,并确保一次不超过一个控制器在工作

const query = require('../controllers/fetchContent').query;
const controller = require('../../storage/controllers/index').controller;

let isFetching = false;
let killed = false;

function killSwitch () {
 return new Promise((resolve, reject) => {
     setInterval(() => {
        if(killed) {
            resolve(killed);
        }
    }, 10000);
 })
}
module.exports = (app) => {
 app.get('/api', (req, res) => {
    res.setHeader('Content-Type', 'application/json');
    res.json({"statusCode" : 200, "body" : "Hey"})
});
 app.post('/', (req, res) => {
    if(!killed) {
        if (!isFetching) {
            isFetching = true;
            controller(killSwitch)
                    .then((response) => {
                        isFetching = response.isFetching;
                    });
            res.send({
                success: true,
                message: 'Okay I will extract send the contents to the database'
            })
        } else {
            res.send({
                success: true,
                message: 'Already Fetching'
            })
        }
    } else {
        res.send({
            success: false,
            message: 'In killed State, start to continue'
        })
    }
});
 app.post('/kill', (req, res) => {
    killed = true;
    isFetching = false;
    res.send(200, 'Okay I have stopped the fetcher process')
});
 app.post('/alive', (req, res) => {
    killed = false;
    res.send({
        success: true,
        message: 'Now New req to / will be entertained'
    })
 });
  app.post('/api/fetch', query);
};


-更新2-

这是processBatch()函数,它的作用是从Amazon SQS获取数据,并在处理完数据后将其发送到另一个Amazon SQS,并通过Amazon SNS通知订阅者。

async function processBatch() {
let data = await getDataFromQueue();// Wait for the promise returned after messages are retrieved from the Queue.
let listOfReceipt = [];
if (q.length() > 50 ) {
  // if queue length is more than 50 then wait for queue to process previous data ( done in order to put a max cap on queue size )
    await sleep(400);
    console.log(q.length());
    return CLEAN_EXIT;
}
//Also get the ReceiptHandles for those messages. (To be used for deletion later on)

if (!data.Messages || !data.Messages.length) {
    pushSNS(null, true);
    pushDelete(null, true);
    return EMPTY_QUEUE;
}
try {
    for (let i = 0; i < data.Messages.length; i++) {
        data.Messages[i].Body = JSON.parse(data.Messages[i].Body);
        const URL = data.Messages[i].Body.url;
        const identifier = data.Messages[i].Body.identifier;
        listOfReceipt.push(data.Messages[i].ReceiptHandle);// get the ReceiptHandle out of the message.
        q.push(URL, async (err, html) => {
            if (err) {
                console.log(err);
            } else {
                await sendDataToQueue({url: URL, content: html, identifier});
                pushDelete(data.Messages[i].ReceiptHandle);
                pushSNS();
            }
        });
    }
} catch (e) {
    console.log(e);
    pushSNS(null, true);
    pushDelete(null, true);
    return CLEAN_EXIT;
// simply ignore any error and delete that msg
 }
 return CLEAN_EXIT;
}


在这里q是Async.queue,它的辅助函数即extractContent角色是获取所提供URL的内容。

此模块有帮助功能。

const q = async.queue((URL, cb) => {
extractContent(URL, array)
        .then((html) => {
            cb(null,html);
        })
        .catch((e) => {
            cb(e);
        })
 }, concurrency);


function internalQueue(cb) {
    let arr = [];
    return function (message, flag) {
       arr.push(message);
       if(arr.length >= 10 || flag) {
          arr = [];
          cb();
       }
    }
}

function sleep (delay) {
 return new Promise ((resolve, reject) => {
    setTimeout(() => resolve(), delay)
  })
}
// this is done in order to do things in a batch, this reduces cost
let pushSNS = internalQueue(sendDataToSNS);
let pushDelete = internalQueue(deleteDataFromSQS);

最佳答案

首先,您的controller函数将返回一个Promise,该Promise永远不会根据您的声明来解决,即processBatch将永远不会返回EMPTY_QUEUE。我假设您将返回的Promises存储在某个地方,并且每个消耗内存。

同样,每次调用controller函数时,它都会创建一个新循环,该循环无限调用processBatch。因此,如果controller是快速路由的控制器功能,则每次有人请求该路由时,您都会创建一个无限调用processBatch的新循环。我敢打赌这不是理想的行为,它肯定会阻塞大量内存。

由于新的详细信息而更新:

目前,如果某人将在/ kill上过帐,然后在'/ alive'上过帐而没有延迟,那么她将能够在/上过帐,并在controller中启动另一个循环,因为processBatch可能需要大约10秒钟才能解决承诺。这样,如果有人对/ kill-> / alive-> /进行多次重复的POST,她将有效地DoS您的应用程序。可能就是这样。

另一个更新

此代码q.push(URL, async (err, html) => {启动一个新查询,并附加一个在查询完成后应调用的回调。 q计数器在调用回调之前减少。但是回调是异步的(async),它还会执行另一个查询await sendDataToQueue({url: URL, content: html, identifier});

如您所见,如果sendDataToQueue的执行速度比q慢,则回调会累积并消耗内存。

10-06 03:07