我正在尝试使用以下设置配置Kinesis Analytics应用程序:
输入流是Kinesis Firehose,它采用字符串化JSON值
SQL是简单的传递(稍后需要更复杂,但是为了进行测试,它只是通过传递数据)
输出流是第二个Kinesis Firehose,它将记录传送到S3存储桶
接下来,我将使用Hive + JSONSERDE导入S3存储桶的内容,它期望每个JSON记录都位于其自己的行中。 Firehose输出仅附加所有JSON记录,从而破坏JSONSERDE。
我可以将AWS Lambda数据格式化程序附加到输出流,但这似乎很昂贵。我想要的是使用换行符拆分每个记录。
如果我没有Google Analytics(分析)应用,那么我会将换行符附加到每个Firehose记录中。在应用程序的SQL中无法执行此操作似乎很奇怪:
CREATE OR REPLACE STREAM "STREAM_OUT" (
a VARCHAR(4),
b VARCHAR(4),
c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "STREAM_OUT"
SELECT STREAM
"a",
"b",
"c"
FROM "SOURCE_SQL_STREAM_001";
添加Lambda数据格式化程序是最好的答案吗?我真的很想避免这种情况。
最佳答案
我有类似的要求将新行添加到firehose生成的文件中,在我们的应用程序中,firehose是通过API Gateway调用的。
这是在“集成请求”部分下的“主体映射模板”中指定的。
API Gateway中的以下命令将为kinesis firehose记录生成新行。
方法1:
#set($payload="$input.path('$.Record.Data')
")
{
"DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
"Record": {
"Data": "$util.base64Encode($payload)"
}
}
如果您通过API网关调用firehose,这将非常有效。
感谢和问候,
Srivignesh KN
关于amazon-kinesis - 我可以自动将换行符添加到AWS Firehose记录吗?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44246532/