我正在尝试使用以下设置配置Kinesis Analytics应用程序:


输入流是Kinesis Firehose,它采用字符串化JSON值
SQL是简单的传递(稍后需要更复杂,但是为了进行测试,它只是通过传递数据)
输出流是第二个Kinesis Firehose,它将记录传送到S3存储桶


接下来,我将使用Hive + JSONSERDE导入S3存储桶的内容,它期望每个JSON记录都位于其自己的行中。 Firehose输出仅附加所有JSON记录,从而破坏JSONSERDE。

我可以将AWS Lambda数据格式化程序附加到输出流,但这似乎很昂贵。我想要的是使用换行符拆分每个记录。

如果我没有Google Analytics(分析)应用,那么我会将换行符附加到每个Firehose记录中。在应用程序的SQL中无法执行此操作似乎很奇怪:



CREATE OR REPLACE STREAM "STREAM_OUT" (
  a VARCHAR(4),
  b VARCHAR(4),
  c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "STREAM_OUT"
    SELECT STREAM
      "a",
      "b",
      "c"
    FROM "SOURCE_SQL_STREAM_001";


添加Lambda数据格式化程序是最好的答案吗?我真的很想避免这种情况。

最佳答案

我有类似的要求将新行添加到firehose生成的文件中,在我们的应用程序中,firehose是通过API Gateway调用的。

这是在“集成请求”部分下的“主体映射模板”中指定的。

API Gateway中的以下命令将为kinesis firehose记录生成新行。

方法1:



    #set($payload="$input.path('$.Record.Data')
")
        {
            "DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
            "Record": {
            "Data": "$util.base64Encode($payload)"
        }
        }


如果您通过API网关调用firehose,这将非常有效。

感谢和问候,
Srivignesh KN

关于amazon-kinesis - 我可以自动将换行符添加到AWS Firehose记录吗?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44246532/

10-10 14:47