我正在编写一个节点aws lambda函数,它从我的数据库中查询大约5000个项目,并通过消息将它们发送到aws sqs队列中。
我的本地环境包括使用aws sam local运行lambda,并使用GoAWS模拟aws sqs。
我的羔羊骨架的一个例子是:
async run() {
try {
const accounts = await this.getAccountsFromDB();
const results = await this.writeAccountsIntoQueue(accounts);
return 'I\'ve written: ' + results + ' messages into SQS';
} catch (e) {
console.log('Caught error running job: ');
console.log(e);
return e;
}
}
我的
getAccountsFromDB()
函数没有性能问题,它几乎立即运行,返回5000个帐户的数组。My
writeAccountsIntoQueue
函数看起来像:async writeAccountsIntoQueue(accounts) {
// Extract the sqsClient and queueUrl from the class
const { sqsClient, queueUrl } = this;
try {
// Create array of functions to concurrenctly call later
let promises = accounts.map(acc => async () => await sqsClient.sendMessage({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(acc),
DelaySeconds: 10,
})
);
// Invoke the functions concurrently, using helper function `eachLimit`
let writtenMessages = await eachLimit(promises, 3);
return writtenMessages;
} catch (e) {
console.log('Error writing accounts into queue');
console.log(e);
return e;
}
}
我的助手,
eachLimit
看起来像:async function eachLimit (funcs, limit) {
let rest = funcs.slice(limit);
await Promise.all(
funcs.slice(0, limit).map(
async (func) => {
await func();
while (rest.length) {
await rest.shift()();
}
}
)
);
}
据我所知,它应该将并发执行限制为
limit
。此外,我还包装了aws sdk sqs客户端,以返回一个带有
sendMessage
函数的对象,该函数如下所示:sendMessage(params) {
const { client } = this;
return new Promise((resolve, reject) => {
client.sendMessage(params, (err, data) => {
if (err) {
console.log('Error sending message');
console.log(err);
return reject(err);
}
return resolve(data);
});
});
}
所以没什么好奇怪的,只是答应回拨电话。
我把lambda设置为300秒后超时,lambda总是超时,如果不超时,它会突然结束,并错过一些应该继续的最后日志记录,这让我觉得它甚至可能在某个地方出现错误,悄无声息。当我检查sqs队列时,我丢失了大约1000个条目。
最佳答案
我可以在你的代码中看到一些问题,
第一:
let promises = accounts.map(acc => async () => await sqsClient.sendMessage({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(acc),
DelaySeconds: 10,
})
);
你在滥用
async / await
。请记住await
将一直等到您的承诺得到解决,然后再继续下一个承诺,在这种情况下,每当您映射数组promises
并调用每个函数项时,它将在继续之前等待该函数包装的承诺,这是不好的。因为你只对收回承诺感兴趣,所以你可以直接这样做:const promises = accounts.map(acc => () => sqsClient.sendMessage({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(acc),
DelaySeconds: 10,
})
);
现在,在第二部分中,您的
eachLimit
实现看起来是错误的,而且非常冗长,我已经在es6-promise-pool的帮助下对它进行了重构,以便为您处理并发限制:const PromisePool = require('es6-promise-pool')
function eachLimit(promiseFuncs, limit) {
const promiseProducer = function () {
while(promiseFuncs.length) {
const promiseFunc = promiseFuncs.shift();
return promiseFunc();
}
return null;
}
const pool = new PromisePool(promiseProducer, limit)
const poolPromise = pool.start();
return poolPromise;
}
最后,但非常重要的是,看看SQS Limits,sqs fifo有高达300次发送/秒。因为您正在处理5K项,所以您可能会将并发限制提高到5K/(300+50),大约15。50可以是任何正数,只是稍微偏离极限。
另外,考虑使用SendMessageBatch可以获得更大的吞吐量并达到3k发送/秒。
编辑
如上所述,使用
sendMessageBatch
吞吐量更好,因此我重构了您承诺支持sendMessageBatch
的代码映射:function chunkArray(myArray, chunk_size){
var index = 0;
var arrayLength = myArray.length;
var tempArray = [];
for (index = 0; index < arrayLength; index += chunk_size) {
myChunk = myArray.slice(index, index+chunk_size);
tempArray.push(myChunk);
}
return tempArray;
}
const groupedAccounts = chunkArray(accounts, 10);
const promiseFuncs = groupedAccounts.map(accountsGroup => {
const messages = accountsGroup.map((acc,i) => {
return {
Id: `pos_${i}`,
MessageBody: JSON.stringify(acc),
DelaySeconds: 10
}
});
return () => sqsClient.sendMessageBatch({
Entries: messages,
QueueUrl: queueUrl
})
});
然后您可以像往常一样调用
eachLimit
:const result = await eachLimit(promiseFuncs, 3);
不同的是,现在处理的每个承诺都会发送一批大小为n的消息(在上面的示例中为10)。