我将 AWS Kinesis Firehose 与自定义数据转换结合使用。 Lambda 用 Python 3.6 编写并返回如下所示的字符串:
{
"records": [
{
"recordId": "...",
"result": "Ok",
"data": "..."
},
{
"recordId": "...",
"result": "Ok",
"data": "..."
},
{
"recordId": "...",
"result": "Ok",
"data": "..."
}
]
}
这个 Lambda 非常满意,并且在将它们返回到 Firehose 之前记录类似于上面的输出。但是,Firehose 的 S3 日志随后显示错误:
Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed.
看看这个在 JS 和 Java 中传播到网络上的例子,我不清楚我需要做些什么不同的事情;我很困惑。
最佳答案
如果你的数据是一个json对象,你可以尝试以下
import base64
import json
def lambda_handler(event, context):
output = []
for record in event['records']:
# your own business logic.
json_object = {...}
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
}
output.append(output_record)
return {'records': output}
base64.b64encode 函数只适用于 b'xxx' 字符串,而 output_record 的 'data' 属性需要一个普通的 'xxx' 字符串。