Problem description
I'd like to fan out (chain/replicate) an input AWS Kinesis stream to N new Kinesis streams, so that each record written to the input stream appears in each of the N streams.
Is there an AWS service or an open source solution?
I prefer not to write code to do that if there's a ready-made solution. AWS Kinesis Firehose is not an option because it can't output to Kinesis. Perhaps an AWS Lambda solution, if that won't be too expensive to run?
There are two ways you could accomplish fan-out of an Amazon Kinesis stream:
- Use Amazon Kinesis Analytics to copy records to additional streams
- Trigger an AWS Lambda function to copy records to other streams
Option 1: Using Amazon Kinesis Analytics to fan out
You can use Amazon Kinesis Analytics to generate a new stream from an existing stream. Fan-out is mentioned in the Application Code section of the Amazon Kinesis Analytics documentation:
"You can also write SQL queries that run independently of each other. For example, you can write two SQL statements that query the same in-application stream, but send output into different in-application streams."
I managed to implement this as follows:
- Created three streams: input, output1, output2
- Created two Amazon Kinesis Analytics applications: copy1, copy2
The Amazon Kinesis Analytics SQL application looks like this:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));
CREATE OR REPLACE PUMP "COPY_PUMP1" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";
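This code creates a pump (think of it as a continual SELECT statement) that selects from the input stream and outputs to the output1 stream. The SQL itself only names in-application streams: the application's input configuration maps the input Kinesis stream to SOURCE_SQL_STREAM_001, and its output configuration connects DESTINATION_SQL_STREAM to output1. I created another identical application that outputs to the output2 stream.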
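Since independent SQL queries can read the same in-application stream and write to different in-application streams, a single application with one destination stream and one pump per output might replace the N identical applications. The sketch below only generates that SQL text. The names `fan_out_sql`, `DESTINATION_SQL_STREAM_<i>` and `COPY_PUMP<i>` are hypothetical; each destination stream would still need its own entry in the application's output configuration, and the service's per-application output limit should be checked before relying on this.

```python
def fan_out_sql(n, source="SOURCE_SQL_STREAM_001", column="log", width=16):
    """Build Kinesis Analytics SQL with one destination stream and one pump
    per output. All names are illustrative, not required by the service."""
    if n < 1:
        raise ValueError("need at least one output")
    statements = []
    for i in range(1, n + 1):
        dest = f'"DESTINATION_SQL_STREAM_{i}"'
        # Same column definition as the single-output application.
        statements.append(
            f"CREATE OR REPLACE STREAM {dest} ({column} VARCHAR({width}));"
        )
        # Each pump is an independent continuous query over the same source.
        statements.append(
            f'CREATE OR REPLACE PUMP "COPY_PUMP{i}" AS\n'
            f"INSERT INTO {dest}\n"
            f'SELECT STREAM "{column}" FROM "{source}";'
        )
    return "\n".join(statements)
```

`print(fan_out_sql(2))` prints the two-output version, which could be pasted into one application instead of maintaining copy1 and copy2 separately.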
To test, I sent data to the input stream:
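#!/usr/bin/env python
# Test producer (boto 2, the legacy AWS SDK): writes one JSON record
# to the "input" stream every 2 seconds.
import json
import time

from boto import kinesis

conn = kinesis.connect_to_region("us-west-2")

i = 0
while True:
    data = {}
    data['log'] = 'Record ' + str(i)
    i += 1
    print(data)
    # Every record uses the same partition key, so all land on the same shard.
    conn.put_record("input", json.dumps(data), "key")
    time.sleep(2)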
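The script above uses boto 2. A minimal sketch of the same producer for boto3, assuming boto3's Kinesis client (`put_record(StreamName=..., Data=..., PartitionKey=...)`); `make_payload` and `produce` are hypothetical helper names, and the client is passed in so the loop can be exercised without AWS access:

```python
import json
import time


def make_payload(i):
    """Encode the same JSON shape the test script sends: {"log": "Record <i>"}."""
    return json.dumps({"log": "Record " + str(i)}).encode("utf-8")


def produce(client, stream_name, count, partition_key="key", delay=2.0):
    """Send `count` test records with a boto3-style Kinesis client.

    A fixed partition key routes every record to the same shard, so a
    reader of that one shard sees all of them.
    """
    for i in range(count):
        client.put_record(
            StreamName=stream_name,
            Data=make_payload(i),
            PartitionKey=partition_key,
        )
        if delay:
            time.sleep(delay)
```

With credentials configured, `produce(boto3.client("kinesis", region_name="us-west-2"), "input", count=10)` would send ten records, two seconds apart.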
I let it run for a while, then displayed the output using this code:
from boto import kinesis

conn = kinesis.connect_to_region("us-west-2")
# Start at the oldest available record (TRIM_HORIZON) of output1's first shard.
iterator = conn.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = conn.get_records(iterator, 5)  # fetch at most 5 records
print([r['Data'] for r in records['Records']])
The output was:
[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']
I ran it again for output2 and the identical output was shown.
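Reading only `shardId-000000000000` works for single-shard streams. A hedged sketch of a more general check, assuming boto3's `list_shards`, `get_shard_iterator` and `get_records` calls: it reads every shard of a stream from TRIM_HORIZON and compares the outputs as multisets, because ordering across shards is not guaranteed. `list_shard_ids`, `read_stream` and `missing_per_stream` are hypothetical names.

```python
from collections import Counter


def list_shard_ids(client, stream_name):
    """Return every shard ID of a stream, following ListShards pagination."""
    ids, kwargs = [], {"StreamName": stream_name}
    while True:
        resp = client.list_shards(**kwargs)
        ids.extend(shard["ShardId"] for shard in resp["Shards"])
        token = resp.get("NextToken")
        if not token:
            return ids
        # ListShards rejects StreamName when NextToken is supplied.
        kwargs = {"NextToken": token}


def read_stream(client, stream_name, limit=100, max_calls=10):
    """Read up to `limit` payloads per shard, starting at TRIM_HORIZON."""
    payloads = []
    for shard_id in list_shard_ids(client, stream_name):
        it = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )["ShardIterator"]
        got = 0
        for _ in range(max_calls):
            if it is None or got >= limit:
                break
            resp = client.get_records(ShardIterator=it, Limit=limit - got)
            payloads.extend(r["Data"].decode("utf-8") for r in resp["Records"])
            got += len(resp["Records"])
            it = resp.get("NextShardIterator")
            # Empty batch while caught up to the tip: nothing more to read for now.
            if not resp["Records"] and resp.get("MillisBehindLatest", 0) == 0:
                break
    return payloads


def missing_per_stream(outputs):
    """For {stream_name: [payload, ...]}, return what each stream lacks
    relative to the union of all streams, compared as multisets."""
    union = Counter()
    for payloads in outputs.values():
        union |= Counter(payloads)  # element-wise maximum of the counts
    return {name: union - Counter(p) for name, p in outputs.items()}
```

With a boto3 client, `missing_per_stream({name: read_stream(client, name) for name in ("output1", "output2")})` returning only empty counters would mean both outputs hold the same records.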
Option 2: Using AWS Lambda
If you are fanning out to many streams, a more efficient method might be to create an AWS Lambda function that:
- Is triggered by records arriving on the input Amazon Kinesis stream
- Writes each record to multiple Amazon Kinesis 'output' streams
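A minimal sketch of such a function, assuming the Python Lambda runtime (which bundles boto3) and the standard Kinesis trigger event, where each entry carries a base64-encoded `kinesis.data` and a `kinesis.partitionKey`. The `OUTPUT_STREAMS` environment variable and the helper names are hypothetical. It copies each batch with `put_records`, which accepts at most 500 records per call (there is also a per-request size limit that this sketch does not handle).

```python
import base64
import os

MAX_BATCH = 500  # PutRecords accepts at most 500 records per call


def decode_event_records(event):
    """Turn a Kinesis-triggered Lambda event into PutRecords entries."""
    entries = []
    for rec in event.get("Records", []):
        k = rec["kinesis"]
        entries.append({
            "Data": base64.b64decode(k["data"]),
            "PartitionKey": k["partitionKey"],  # keep the original key
        })
    return entries


def fan_out(client, entries, output_streams):
    """Write every entry to every output stream; raise if any write fails."""
    for stream in output_streams:
        for start in range(0, len(entries), MAX_BATCH):
            batch = entries[start:start + MAX_BATCH]
            resp = client.put_records(StreamName=stream, Records=batch)
            if resp.get("FailedRecordCount", 0):
                # Raising makes Lambda retry the whole batch, so outputs can
                # receive duplicates; downstream consumers should tolerate that.
                raise RuntimeError(
                    f"{resp['FailedRecordCount']} records failed for {stream}")


_client = None


def handler(event, context):
    global _client
    if _client is None:
        import boto3  # bundled with the AWS Lambda Python runtime
        _client = boto3.client("kinesis")
    # Hypothetical configuration: comma-separated output stream names.
    streams = [s for s in os.environ.get("OUTPUT_STREAMS", "").split(",") if s]
    entries = decode_event_records(event)
    fan_out(_client, entries, streams)
    return {"records": len(entries), "streams": len(streams)}
```

Raising on any failed record keeps the sketch simple; retrying only the failed entries reported in the `put_records` response would reduce duplicates at the cost of more code.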
You could even have the Lambda function self-discover the output streams based on a naming convention (e.g. any stream named app-output-*).
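A sketch of that discovery step, assuming boto3's `list_streams(Limit=..., ExclusiveStartStreamName=...)` pagination (response keys `StreamNames` and `HasMoreStreams`) and shell-style matching with `fnmatch`. `discover_output_streams` and its `exclude` parameter are hypothetical. Excluding the input stream matters: if it ever matched the pattern, the function would write into its own trigger and loop.

```python
import fnmatch


def discover_output_streams(client, pattern="app-output-*", exclude=(), page_size=100):
    """Return names of streams matching `pattern`, following ListStreams pagination."""
    names = []
    kwargs = {"Limit": page_size}
    while True:
        resp = client.list_streams(**kwargs)
        page = resp["StreamNames"]
        names.extend(
            n for n in page
            if fnmatch.fnmatchcase(n, pattern) and n not in exclude
        )
        if not resp.get("HasMoreStreams") or not page:
            return names
        # Continue listing after the last name on this page.
        kwargs = {"Limit": page_size, "ExclusiveStartStreamName": page[-1]}
```

ListStreams is rate-limited per account, so caching the result in a module-level variable and refreshing it occasionally is likely better than calling it on every invocation.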