我是新加入Lambda&SQS的,我正在尝试创建一个发送电子邮件的函数,在SQS服务中排队,但是我不知道如何调用包含send+delete queue方法的流程函数。
下面我粘贴代码:

'use strict';

const AWS = require('aws-sdk');

const SQS = new AWS.SQS({ apiVersion: '2012-11-05' });
const Lambda = new AWS.Lambda({ apiVersion: '2015-03-31' });
const ses = new AWS.SES({ accessKeyId: "xxxxxxxx", secretAccesskey: "xxxxxxx/xxxxxxxxx" });
const s3 = new AWS.S3({ apiVersion: "2006-03-01", region: "us-west-2" });


const QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/xxxxxxx/queue';
const PROCESS_MESSAGE = 'process-message';

function getPieceOfMail (path, mapObj, replace) {
  return new Promise(function (resolve, reject) {
    s3.getObject({
      Bucket: "myBucket",
      Key: "myKey/" + path
    }, function (err, data) {
      if (err) {
        reject(err);
      } else {
        if (replace === true) {
            var re = new RegExp(Object.keys(mapObj).join("|"), "gi");
            data = data.Body.toString().replace(re, function (matched) {
              return mapObj[matched.toLowerCase()];
            });
            resolve(data);
        } else {
            resolve(data.Body.toString());
        }
      }
    });
  });
}

function getRegisterSource (nickname, activate_link) {
  var activate_link, pieces;

  pieces = [
    getPieceOfMail("starts/start.html", {}, false),
    getPieceOfMail("headers/a.html", {}, false),
    getPieceOfMail("footers/a.html", {}, false),
  ];

  return Promise.all(pieces)
    .then(function (data) {
      return (data[0] + data[1] + data[2]);
    })
    .catch(function (err) {
      return err;
    });
}

function sendEmail (email, data) {
    return new Promise(function (resolve, reject) {
        var params = {
            Destination: { ToAddresses: [email] },
            Message: {
              Body: {
                Html: {
                  Data: data
                },
                Text: {
                  Data: data
                }
              },
              Subject: {
                Data: "myData"
              }
            },
            Source: "someone <noreply@mydomain.co>",
        };

        ses.sendEmail(params, function (err, data) {
            if (err) {
                reject(err);
            } else {
                resolve(data);
            }
        });
    });
}

function process(message, callback) {
    console.log(message);

    // process message
    getRegisterSource(event['nickname'], event['user_id'])
      .then(function (data) {
        return sendEmail(event["email"], data);
      })
      .catch(function (err) {
        console.log("==ERROR==");
        callback(err, err);
      })
      .finally(function () {});

    // delete message
    const params = {
        QueueUrl: QUEUE_URL,
        ReceiptHandle: message.ReceiptHandle,
    };
    SQS.deleteMessage(params, (err) => callback(err, message));
}

function invokePoller(functionName, message) {
    const payload = {
        operation: PROCESS_MESSAGE,
        message,
    };
    const params = {
        FunctionName: functionName,
        InvocationType: 'Event',
        Payload: new Buffer(JSON.stringify(payload)),
    };
    return new Promise((resolve, reject) => {
        Lambda.invoke(params, (err) => (err ? reject(err) : resolve()));
    });
}

function poll(functionName, callback) {
    const params = {
        QueueUrl: QUEUE_URL,
        MaxNumberOfMessages: 10,
        VisibilityTimeout: 10,
    };
    // batch request messages
    SQS.receiveMessage(params, (err, data) => {
        if (err) {
            return callback(err);
        }
        // for each message, reinvoke the function
        const promises = data.Messages.map((message) => invokePoller(functionName, message));
        // complete when all invocations have been made
        Promise.all(promises).then(() => {
            const result = `Messages received: ${data.Messages.length}`;
            callback(null, result);
        });
    });
}

exports.handler = (event, context, callback) => {
    try {
        if (event.operation === PROCESS_MESSAGE) {
            console.log("Invoked by poller");
            process(event.message, callback);
        } else {
            console.log("invoked by schedule");
            poll(context.functionName, callback);
        }
    } catch (err) {
        callback(err);
    }
};

有人能给我点启发吗?
谢谢你的建议。
更新
在误解了这么多之后,我决定开始研究由AWS提供的轮询sqs的example是如何工作的。
在那里我发现我缺少一些基本的sqs权限,但是现在通过添加正确的策略解决了这个问题:
{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "lambda:InvokeFunction"
        ],
        "Resource": ["*"]
    }]
}

这允许Lambda.invoke()调用process()
当调用process(message, callback)时,如果iconsole.log(message);,则似乎没有消息,尽管该行正在清除队列SQS.deleteMessage(params, (err) => callback(err, message));
我试图将我的sendMail功能与一个SQS服务结合起来,这样我就只需要将每个消息push发送到queue

最佳答案

这是一个常见的要求,其中aws ses在发送电子邮件时有自己的限制。如果违反了这些限制,ses帐户将对自身进行沙盒处理。似乎您已经使用正确的访问凭据解决了问题。
此代码包含python3 lambda代码,可用于处理这样的情况:lambda使用线程从sqs轮询,并使用ses发送电子邮件,而不超过给定的限制。
链接到Github Project
您还可以考虑在sqs中使用新特性,当新的消息放在sqs中时,sqs能够调用lambdas。但请注意不要超过aws帐户区域内lambda函数的最大数量。(见this document

07-24 09:38
查看更多