I'd like to fan out/chain/replicate an input AWS Kinesis stream to N new Kinesis streams, so that each record written to the input stream appears in each of the N streams.
Is there an AWS service or an open source solution?
I'd prefer not to write code for this if a ready-made solution exists. AWS Kinesis Firehose is not a solution because it cannot output to Kinesis. Perhaps an AWS Lambda solution, if that wouldn't be too expensive to run?
There are two ways you could accomplish fan-out of an Amazon Kinesis stream:
- Use Amazon Kinesis Analytics to copy records to additional streams
- Trigger an AWS Lambda function to copy records to another stream
Option 1: Using Amazon Kinesis Analytics to fan out
You can use Amazon Kinesis Analytics to generate a new stream from an existing stream.
The Amazon Kinesis Analytics documentation mentions fan-out in its Application Code section.
I managed to implement this as follows:
- Created three streams: input, output1, output2
- Created two Amazon Kinesis Analytics applications: copy1, copy2
The Amazon Kinesis Analytics SQL application looks like this:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));
CREATE OR REPLACE PUMP "COPY_PUMP1" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";
This code creates a pump (think of it as a continual select statement) that selects from the input stream and outputs to the output1 stream. I created another identical application that outputs to the output2 stream.
To test, I sent data to the input stream:
#!/usr/bin/env python
import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")

i = 0
while True:
    data = {}
    data['log'] = 'Record ' + str(i)
    i += 1
    print(data)
    kinesis.put_record("input", json.dumps(data), "key")
    time.sleep(2)
I let it run for a while, then displayed the output using this code:
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print([r['Data'] for r in records['Records']])
The output was:
[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']
(Note that the key appears as LOG: the column name was unquoted in the destination stream definition, so the SQL engine folds it to upper case.)
I ran it again for output2 and the identical output was shown.
Option 2: Using AWS Lambda
If you are fanning-out to many streams, a more efficient method might be to create an AWS Lambda function:
- Triggered by Amazon Kinesis stream records
- That writes records to multiple Amazon Kinesis 'output' streams
You could even have the Lambda function self-discover the output streams based on a naming convention (e.g. any stream named app-output-*).
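A minimal sketch of that Lambda function, using boto3. The app-output-* convention, the helper names, and the event shape (the standard Kinesis-to-Lambda event, where record data arrives base64-encoded) are illustrative assumptions, not a tested implementation:

```python
# Sketch of a Kinesis fan-out Lambda. Assumes destination streams follow
# an 'app-output-*' naming convention; helper names are hypothetical.
import base64

def list_output_streams(client, prefix="app-output-"):
    """Discover destination streams whose names match the convention."""
    names = []
    kwargs = {}
    while True:
        resp = client.list_streams(**kwargs)
        names.extend(n for n in resp["StreamNames"] if n.startswith(prefix))
        if not resp.get("HasMoreStreams"):
            break
        # Continue paginating from the last stream name returned.
        kwargs = {"ExclusiveStartStreamName": resp["StreamNames"][-1]}
    return names

def fan_out(event, client, streams):
    """Replicate every Kinesis record in the Lambda event to each stream."""
    copies = 0
    for record in event["Records"]:
        # Kinesis delivers record data to Lambda base64-encoded.
        payload = base64.b64decode(record["kinesis"]["data"])
        key = record["kinesis"]["partitionKey"]
        for stream in streams:
            client.put_record(StreamName=stream, Data=payload, PartitionKey=key)
            copies += 1
    return copies

def lambda_handler(event, context):
    import boto3  # imported here so the fan-out logic is testable without AWS
    client = boto3.client("kinesis")
    return fan_out(event, client, list_output_streams(client))
```

Because the Kinesis client is passed in as a parameter, the replication logic can be exercised locally with a stub client before wiring the function to the input stream as an event source.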