Kinesis Data Firehose 可以调用您的 Lambda 函数转换传入的源数据并将转换后的数据传输给目标。当您创建传输流时,可以启用 Kinesis Data Firehose 数据转换。

数据转换流

启用 Kinesis Data Firehose 数据转换后,Kinesis Data Firehose 默认情况下将缓冲最多 3 MB 的传入数据。(要调整缓冲大小,请将 ProcessingConfiguration API 与名为 BufferSizeInMBs 的 ProcessorParameter 一起使用。)然后,Kinesis Data Firehose 将使用 AWS Lambda 同步调用模式,对每个缓冲的批处理异步调用指定的 Lambda 函数。转换后的数据将从 Lambda 发送到 Kinesis Data Firehose。然后,当达到指定的目标缓冲大小或缓冲间隔时(以先达到者为准),Kinesis Data Firehose 会将这些数据发送到目的地。

重要

对于请求和响应,Lambda 同步调用模式的负载大小限制为 6 MB。确保用于向函数发送请求的缓冲大小小于或等于 6 MB,并且函数返回的响应也不超过 6 MB。

数据转换和状态模型

所有通过 Lambda 转换的记录均包含以下参数,否则 Kinesis Data Firehose 会拒绝它们并将其视为数据转换失败。

recordId

记录 ID 在调用期间从 Kinesis Data Firehose 传递到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据转换失败。

result

记录的数据转换的状态。可能的值为:Ok(记录成功转换)、Dropped(处理逻辑故意丢弃记录)和 ProcessingFailed(记录无法转换)。如果记录的状态为 Ok 或 Dropped,Kinesis Data Firehose 会认为它已成功处理。否则,Kinesis Data Firehose 会认为它未被成功处理。

数据

转换后的数据负载 (使用 base64 编码之后)。

Lambda 蓝图

Kinesis Data Firehose 提供以下 Lambda 蓝图,可供您用来为数据转换创建 Lambda 函数。

  • General Firehose Processing (一般 Firehose 处理) — 包含上一部分中描述的数据转换和状态模型。可将此蓝图用于任何自定义转换逻辑。

  • Syslog to JSON (Syslog 到 JSON) — 解析 Syslog 行并将其转换为 JSON 对象(使用预定义的 JSON 字段名称)。

  • Syslog to CSV (Syslog 到 CSV) — 解析 Syslog 行并将其转换为 CSV 格式。

  • Kinesis Data Firehose Process Record Streams as source (Kinesis Data Firehose 将记录流作为源进行处理) — 访问输入中的 Kinesis Data Streams 记录,并返回记录及处理状态。

  • Kinesis Data Firehose CloudWatch Logs Processor (Kinesis Data Firehose CloudWatch Logs 处理器) —从 CloudWatch Logs 订阅筛选器发送的记录中解析和提取各个日志事件。

Lambda 蓝图仅提供 Node.js 和 Python 语言版本。

Lambda 调用的持续时间

Kinesis Data Firehose 支持的 Lambda 调用时间长达 5 分钟。如果您的 Lambda 函数需要超过 5 分钟才能完成,您将收到以下错误:Firehose 在调用 AWS Lambda 时遇到超时错误。支持的最大函数超时为 5 分钟。

02-12 03:53