我目前正在向aws kinesis流发送一系列xml消息,我已经在不同的项目中使用了它,因此我对此很有信心。然后,我编写了一个lambda来处理从运动学流到运动学firehose的事件:

import os
import boto3
import base64

firehose = boto3.client('firehose')


def lambda_handler(event, context):
    deliveryStreamName = os.environ['FIREHOSE_STREAM_NAME']

    # Send record directly to firehose
    for record in event['Records']:
        data = record['kinesis']['data']

        response = firehose.put_record(
            DeliveryStreamName=deliveryStreamName,
            Record={'Data': data}
        )
        print(response)

我将kinesis流设置为lamdba触发器,并将批处理大小设置为1,并将起始位置设置为LATEST。

对于kinesis firehose,我具有以下配置:
Data transformation*: Disabled
Source record backup*: Disabled
S3 buffer size (MB)*: 10
S3 buffer interval (sec)*: 60
S3 Compression: UNCOMPRESSED
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled

我发送了162个事件,并从s3中读取了这些事件,而我最多设法使它达到160个,通常更少。我什至试图等待几个小时,以防重试发生奇怪的事情。

任何人都有使用kinesis-> lamdba-> firehose的经验,并且看到丢失数据的问题吗?

最佳答案

从我这里看到的情况来看,当您将数据发布到Kinesis Stream(不是FireHose)时,很可能丢失了项目。

由于在写入FireHose时使用的是 put_record ,因此它将引发异常,并且在这种情况下将重试lambda。 (检查该级别上是否存在故障是有意义的)。

因此,考虑到我可能认为记录在到达Kinesis流之前就已经丢失了。
如果使用 put_records 方法将项目发送到Kinesis流,这不能保证所有记录都将被发送到流(由于超出了写入吞吐量或内部错误),某些记录可能无法发送。在那种情况下,记录的失败子集应该由您的代码重新发送(这是Java example,很抱歉,我找不到Python)。

关于amazon-web-services - 使用AWS kinesis流,lambda和firehose时,有人经历过丢失数据的经历吗?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44909875/

10-11 09:10