Kinesis Data Firehose 可以调用您的 Lambda 函数转换传入的源数据并将转换后的数据传输给目标。当您创建传输流时,可以启用 Kinesis Data Firehose 数据转换。
数据转换流
启用 Kinesis Data Firehose 数据转换后,Kinesis Data Firehose 默认情况下将缓冲最多 3 MB 的传入数据。(要调整缓冲大小,请将 ProcessingConfiguration
API 与名为 BufferSizeInMBs
的 ProcessorParameter
一起使用。)然后,Kinesis Data Firehose 将使用 AWS Lambda 同步调用模式,对每个缓冲的批处理异步调用指定的 Lambda 函数。转换后的数据将从 Lambda 发送到 Kinesis Data Firehose。然后,当达到指定的目标缓冲大小或缓冲间隔时(以先达到者为准),Kinesis Data Firehose 会将这些数据发送到目的地。
重要
对于请求和响应,Lambda 同步调用模式的负载大小限制为 6 MB。确保用于向函数发送请求的缓冲大小小于或等于 6 MB,并且函数返回的响应也不超过 6 MB。
数据转换和状态模型
所有通过 Lambda 转换的记录均包含以下参数,否则 Kinesis Data Firehose 会拒绝它们并将其视为数据转换失败。
- recordId
记录 ID 在调用期间从 Kinesis Data Firehose 传递到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据转换失败。
- result
记录的数据转换的状态。可能的值为:
Ok
(记录成功转换)、Dropped
(处理逻辑故意丢弃记录)和ProcessingFailed
(记录无法转换)。如果记录的状态为Ok
或Dropped
,Kinesis Data Firehose 会认为它已成功处理。否则,Kinesis Data Firehose 会认为它未被成功处理。- 数据
转换后的数据负载 (使用 base64 编码之后)。
Lambda 蓝图
Kinesis Data Firehose 提供以下 Lambda 蓝图,可供您用来为数据转换创建 Lambda 函数。
General Firehose Processing (一般 Firehose 处理) — 包含上一部分中描述的数据转换和状态模型。可将此蓝图用于任何自定义转换逻辑。
Syslog to JSON (Syslog 到 JSON) — 解析 Syslog 行并将其转换为 JSON 对象(使用预定义的 JSON 字段名称)。
Syslog to CSV (Syslog 到 CSV) — 解析 Syslog 行并将其转换为 CSV 格式。
Kinesis Data Firehose Process Record Streams as source (Kinesis Data Firehose 将记录流作为源进行处理) — 访问输入中的 Kinesis Data Streams 记录,并返回记录及处理状态。
Kinesis Data Firehose CloudWatch Logs Processor (Kinesis Data Firehose CloudWatch Logs 处理器) —从 CloudWatch Logs 订阅筛选器发送的记录中解析和提取各个日志事件。
Lambda 蓝图仅提供 Node.js 和 Python 语言版本。
Lambda 调用的持续时间
Kinesis Data Firehose 支持的 Lambda 调用时间长达 5 分钟。如果您的 Lambda 函数需要超过 5 分钟才能完成,您将收到以下错误:Firehose 在调用 AWS Lambda 时遇到超时错误。支持的最大函数超时为 5 分钟。