我正在使用AWS开发工具包从将数据发布到Kinesis流的Java应用程序写入数据。使用以下代码一次可批量处理10条记录。

// Convert to JSON object, and then to bytes...
                ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
                String json = ow.writeValueAsString(transaction);

                // Add byte array to PutRecordsRequestEntry
                PutRecordsRequestEntry record = new PutRecordsRequestEntry();
                record.setPartitionKey(String.valueOf(java.util.UUID.randomUUID()));
                record.setData(ByteBuffer.wrap(json.getBytes()));

                // Add to list...
                batch.add(record);

                // Check and send batches
                if(counter>=batchLimit){

                    logger.info("Sending batch of " + batchLimit + " rows.");

                    putRecordsRequest.setRecords(batch);
                    PutRecordsResult result = amazonKinesisClient.putRecords(putRecordsRequest);
                    batch = new ArrayList<>();
                    counter=0;

                }else{
                    counter++;
                }


然后,我有一个nodejs lambda函数,该函数在Kinesis上收到的每个事务上都会触发,其想法是编写从Kinesis传入的事务,并将其放入数据流中,以保存到S3。

    var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();

exports.handler = function(event, context) {

    console.log(event);

    var params = {
        DeliveryStreamName: "transaction-postings",
        Record: {
            Data:  decodeURIComponent(event)
        }
    };
    firehose.putRecord(params, function(err, data) {
        if (err) console.log(err, err.stack); // an error occurred
        else    {
            console.log(data);           // successful response
        }

        context.done();
    });
};


但是,当查看S3上的数据时,我看到的只是以下内容,而不是我所期望的JSON对象列表...

[object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object]


谁能指出我将Kinesis中的数据作为JSON对象流式传输到S3所缺少的东西吗?

最佳答案

Data:  decodeURIComponent(event)


您需要序列化事件,因为Lambda会自动反序列化参数。即:

Data: JSON.stringify(decodeURIComponent(event))

10-08 08:16
查看更多