我在MS SQL中有一些表,这些表每秒更新一次,查询或多或少看起来像这样
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}
假设选择内部联接查询结果有5条记录,如下所示。
如果查询是第一次运行
${lastUpdateTime}
并且${lastG_ID}
设置为0,它将返回以下5条记录。处理完记录后,查询将在max(G_ID)
表中存储max(UpdateTime)
(即5)和etl_stat
(即1512010479)。 G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
如果在表中添加另外5条新记录,如下所示:
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
该查询将首先从
max(G_ID)
中读取max(UpdateTime)
和etl_stat table
,并将按以下方式构成查询SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
,以便查询仅返回5个增量记录,如下所示。G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
因此,每次查询运行时,都应首先从
max(G_ID)
表中读取max(UpdateTime)
和etl_stat
,并框住选择内部联接查询,如上所示,并获得增量更改。使用SPARK SQL按原样进行架构
我已经实现了上述用例,如下所示:
1)Spark JDBC读取phoenix表以从
max(G_ID)
表中获取max(UpdateTime)
和etl_stat
。2)Spark JDBC框架选择内部联接查询,例如
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
3)Spark JDBC运行步骤2内部联接查询,从MS SQL Server读取增量消息,处理记录并将其插入HBase。
4)成功插入HBase后,Spark使用最新的
etl_stat
即10和G_ID
即1512010500更新UpdateTime
表。5)该作业已按计划每1分钟运行一次。
使用NIFI进行架构
我想将此用例移至Nifi,我想使用NiFi从MS SQL DB读取记录并将此记录发送到Kafka。
成功发布到Kafka后,NiFi会将G_ID和UpdateTime保存在数据库中。
一旦消息到达Kafka,Spark流将读取Kafka的消息,并使用现有业务逻辑将其保存到HBase。
在每次运行时,Nifi处理器应使用
max(G_ID)
和max(UpdateTime)
框架选择内部联接查询,以获取增量记录并发布到Kafka。我是Nifi / HDF的新手。我需要您的帮助和指导,以便使用Nifi / HDF实现此操作。如果您对此用例有更好的解决方案/体系结构,请提出建议。
抱歉,这么长的帖子。
最佳答案
您要描述的是JDBC Kafka Connect connector开箱即用的功能。设置配置文件,将其加载,然后就可以使用了。做完了Kafka Connect是Apache Kafka的一部分。无需额外的工具和技术。
您可能还需要考虑适当的变更数据捕获(CDC)。对于专有的RDBMS(Oracle,DB2,MS SQL等),您具有商业工具,例如GoldenGate,Attunity,DBVisit等。对于开源RDBMS(例如MySQL,PostgreSQL),您应该查看开源Debezium工具。
所有这些CDC工具都直接与Kafka集成。