我是新加入Lambda
&SQS
的,我正在尝试创建一个发送电子邮件的函数,在SQS
服务中排队,但是我不知道如何调用包含send
+delete queue
方法的流程函数。
下面我粘贴代码:
'use strict';
const AWS = require('aws-sdk');
const SQS = new AWS.SQS({ apiVersion: '2012-11-05' });
const Lambda = new AWS.Lambda({ apiVersion: '2015-03-31' });
const ses = new AWS.SES({ accessKeyId: "xxxxxxxx", secretAccesskey: "xxxxxxx/xxxxxxxxx" });
const s3 = new AWS.S3({ apiVersion: "2006-03-01", region: "us-west-2" });
const QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/xxxxxxx/queue';
const PROCESS_MESSAGE = 'process-message';
function getPieceOfMail (path, mapObj, replace) {
return new Promise(function (resolve, reject) {
s3.getObject({
Bucket: "myBucket",
Key: "myKey/" + path
}, function (err, data) {
if (err) {
reject(err);
} else {
if (replace === true) {
var re = new RegExp(Object.keys(mapObj).join("|"), "gi");
data = data.Body.toString().replace(re, function (matched) {
return mapObj[matched.toLowerCase()];
});
resolve(data);
} else {
resolve(data.Body.toString());
}
}
});
});
}
function getRegisterSource (nickname, activate_link) {
var activate_link, pieces;
pieces = [
getPieceOfMail("starts/start.html", {}, false),
getPieceOfMail("headers/a.html", {}, false),
getPieceOfMail("footers/a.html", {}, false),
];
return Promise.all(pieces)
.then(function (data) {
return (data[0] + data[1] + data[2]);
})
.catch(function (err) {
return err;
});
}
function sendEmail (email, data) {
return new Promise(function (resolve, reject) {
var params = {
Destination: { ToAddresses: [email] },
Message: {
Body: {
Html: {
Data: data
},
Text: {
Data: data
}
},
Subject: {
Data: "myData"
}
},
Source: "someone <noreply@mydomain.co>",
};
ses.sendEmail(params, function (err, data) {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}
function process(message, callback) {
console.log(message);
// process message
getRegisterSource(event['nickname'], event['user_id'])
.then(function (data) {
return sendEmail(event["email"], data);
})
.catch(function (err) {
console.log("==ERROR==");
callback(err, err);
})
.finally(function () {});
// delete message
const params = {
QueueUrl: QUEUE_URL,
ReceiptHandle: message.ReceiptHandle,
};
SQS.deleteMessage(params, (err) => callback(err, message));
}
function invokePoller(functionName, message) {
const payload = {
operation: PROCESS_MESSAGE,
message,
};
const params = {
FunctionName: functionName,
InvocationType: 'Event',
Payload: new Buffer(JSON.stringify(payload)),
};
return new Promise((resolve, reject) => {
Lambda.invoke(params, (err) => (err ? reject(err) : resolve()));
});
}
function poll(functionName, callback) {
const params = {
QueueUrl: QUEUE_URL,
MaxNumberOfMessages: 10,
VisibilityTimeout: 10,
};
// batch request messages
SQS.receiveMessage(params, (err, data) => {
if (err) {
return callback(err);
}
// for each message, reinvoke the function
const promises = data.Messages.map((message) => invokePoller(functionName, message));
// complete when all invocations have been made
Promise.all(promises).then(() => {
const result = `Messages received: ${data.Messages.length}`;
callback(null, result);
});
});
}
exports.handler = (event, context, callback) => {
try {
if (event.operation === PROCESS_MESSAGE) {
console.log("Invoked by poller");
process(event.message, callback);
} else {
console.log("invoked by schedule");
poll(context.functionName, callback);
}
} catch (err) {
callback(err);
}
};
有人能给我点启发吗?
谢谢你的建议。
更新
在误解了这么多之后,我决定开始研究由
AWS
提供的轮询sqs的example是如何工作的。在那里我发现我缺少一些基本的sqs权限,但是现在通过添加正确的策略解决了这个问题:
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": ["*"]
}]
}
这允许
Lambda.invoke()
调用process()
。当调用
process(message, callback)
时,如果iconsole.log(message);
,则似乎没有消息,尽管该行正在清除队列SQS.deleteMessage(params, (err) => callback(err, message));
我试图将我的sendMail功能与一个
SQS
服务结合起来,这样我就只需要将每个消息push
发送到queue
。 最佳答案
这是一个常见的要求,其中aws ses在发送电子邮件时有自己的限制。如果违反了这些限制,ses帐户将对自身进行沙盒处理。似乎您已经使用正确的访问凭据解决了问题。
此代码包含python3 lambda代码,可用于处理这样的情况:lambda使用线程从sqs轮询,并使用ses发送电子邮件,而不超过给定的限制。
链接到Github Project。
您还可以考虑在sqs中使用新特性,当新的消息放在sqs中时,sqs能够调用lambdas。但请注意不要超过aws帐户区域内lambda函数的最大数量。(见this document)