我有一个Java应用程序,使用AWS开发工具包(SDK)将数据发布到Kinesis流。下面的代码每次批量发送10条记录。
// Serialize the transaction to (pretty-printed) JSON.
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = ow.writeValueAsString(transaction);
// Wrap the payload in a PutRecordsRequestEntry; a random UUID is used as the partition key.
PutRecordsRequestEntry record = new PutRecordsRequestEntry();
record.setPartitionKey(String.valueOf(java.util.UUID.randomUUID()));
// Encode explicitly as UTF-8: getBytes() without a charset uses the platform
// default, which can corrupt non-ASCII text on hosts with a different default.
record.setData(ByteBuffer.wrap(json.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
// Add to the pending batch.
batch.add(record);
// Derive the counter from the batch itself so a flush happens when the batch
// holds exactly batchLimit records (incrementing only after the check sent
// batchLimit + 1 records per request while logging batchLimit).
counter = batch.size();
if (counter >= batchLimit) {
    logger.info("Sending batch of " + counter + " rows.");
    putRecordsRequest.setRecords(batch);
    PutRecordsResult result = amazonKinesisClient.putRecords(putRecordsRequest);
    // PutRecords is not all-or-nothing: individual entries can be rejected
    // even though the call itself returns normally, so surface the count.
    Integer failed = result.getFailedRecordCount();
    if (failed != null && failed > 0) {
        logger.info("PutRecords rejected " + failed + " of " + counter + " rows.");
    }
    // Start a fresh batch for the next records.
    batch = new ArrayList<>();
    counter = 0;
}
然后,我有一个Node.js Lambda函数,Kinesis每收到一条事务就会触发它;其思路是把从Kinesis传入的事务写入Firehose传输流(delivery stream),以便保存到S3。
var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
exports.handler = function(event, context) {
console.log(event);
var params = {
DeliveryStreamName: "transaction-postings",
Record: {
Data: decodeURIComponent(event)
}
};
firehose.putRecord(params, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else {
console.log(data); // successful response
}
context.done();
});
};
但是,当查看S3上的数据时,我看到的只是以下内容,而不是我所期望的JSON对象列表...
[object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object]
谁能指出,要把Kinesis中的数据以JSON对象的形式流式写入S3,我还遗漏了什么?
最佳答案
Data: decodeURIComponent(event)
您需要序列化事件,因为Lambda会自动反序列化参数。即:
// Serialize the event object itself; decodeURIComponent(event) would first
// coerce it to the string "[object Object]".
Data: JSON.stringify(event)