我将 AWS Kinesis Firehose 与自定义数据转换结合使用。 Lambda 用 Python 3.6 编写并返回如下所示的字符串:

{
    "records": [
        {
            "recordId": "...",
            "result": "Ok",
            "data": "..."
        },
        {
            "recordId": "...",
            "result": "Ok",
            "data": "..."
        },
        {
            "recordId": "...",
            "result": "Ok",
            "data": "..."
        }
    ]
}

这个 Lambda 非常满意,并且在将它们返回到 Firehose 之前记录类似于上面的输出。但是,Firehose 的 S3 日志随后显示错误:
Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed.
看看这个在 JS 和 Java 中传播到网络上的例子,我不清楚我需要做些什么不同的事情;我很困惑。

最佳答案

如果你的数据是一个json对象,你可以尝试以下

import base64
import json
def lambda_handler(event, context):
    output = []
    for record in event['records']:
        # your own business logic.
        json_object = {...}
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    return {'records': output}

base64.b64encode 函数只适用于 b'xxx' 字符串,而 output_record 的 'data' 属性需要一个普通的 'xxx' 字符串。

10-07 17:12