可以说,我有三个Kafka主题,其中填充了表示在不同聚合中发生的业务事件的事件(事件源应用程序)。这些事件允许构建具有以下属性的聚合:
现在,我想创建一个包含用户和产品名称(而不是ID)的所有赠款的流。
我想这样做:
不好问题在于联接似乎只能在主键上进行。但是流的密钥是Grant的技术标识符,而user和product表的密钥不是(它们与Grant无关)。
那么如何进行呢?
最佳答案
好吧,目前在Kafka Streams中尚不直接支持外键联接。
有一个开放的KIP:https://issues.apache.org/jira/browse/KAFKA-3705相同。
目前,可以有一种解决方法来解决此问题。您可以使用 KStream-KTable Join 。
首先,将用户流和模块流聚合到各自的具有聚合事件集合的KTable中。
KTable<String,Object> UserTable = userStream.groupBy(<UserId>).aggregate(<... build collection/latest event>) ;
KTable<String,Object> ModuleTable = moduleStream.groupBy(<ModuleId>).aggregate(<... build collection/latest event>);
现在,选择moduleID作为 Grants 流中的键。
KStream<String,Object> grantRekeyedStream = grantStream.selectKey(<moduleId>);
它将密钥更改为 moduleId 。现在,您可以使用 ModuleTable 执行流表联接。它将右侧的所有匹配记录连接到左侧的键中。结果流将以 ModuleId 作为键,将授予和模块数据合并到一个流中。
KStream<String,Object> grantModuleStream = grantRekeyedStream.join(moduleTable);
下一步是加入 userTable 。因此,您需要使用 userId 再次重新键入 grantModuleTable 。
KStream<String,Object> grantModuleRekeyedStream = grantModuleTable.selectKey(<Select UserId>);
现在 grantModuleRekeyedStream 可以与 userTable一起使用与一起加入KStream-KTable Join
KStream<String,Object> grantModuleUserStream = grantModuleRekeyedStream .join(userTable);
以上Stream将以用户ID为键,并包含该用户的所有授权和模块详细信息。