可以说,我有三个Kafka主题,其中填充了表示在不同聚合中发生的业务事件的事件(事件源应用程序)。这些事件允许构建具有以下属性的聚合:

  • 用户:usedId,名称
  • 应用程序的
  • 模块:moduleId,名称
  • 用户对应用程序模块的授予:grantId,userId,moduleId,作用域

  • 现在,我想创建一个包含用户和产品名称(而不是ID)的所有赠款的流。
    我想这样做:
  • 通过按userId分组事件为用户创建一个KTable。 KTable以userId为键。没关系。
  • 通过按productId分组事件为产品创建一个KTable。 KTable以productId为键。没关系。
  • 从Grants流创建流,并加入两个KTable。
    不好问题在于联接似乎只能在主键上进行。但是流的密钥是Grant的技术标识符,而user和product表的密钥不是(它们与Grant无关)。

  • 那么如何进行呢?

    最佳答案

    好吧,目前在Kafka Streams中尚不直接支持外键联接。
    有一个开放的KIP:https://issues.apache.org/jira/browse/KAFKA-3705相同。

    目前,可以有一种解决方法来解决此问题。您可以使用 KStream-KTable Join

    首先,将用户流和模块流聚合到各自的具有聚合事件集合的KTable中。

    KTable<String,Object> UserTable = userStream.groupBy(<UserId>).aggregate(<... build collection/latest event>) ;
    KTable<String,Object> ModuleTable = moduleStream.groupBy(<ModuleId>).aggregate(<... build collection/latest event>);
    

    现在,选择moduleID作为 Grants 流中的键。
    KStream<String,Object> grantRekeyedStream = grantStream.selectKey(<moduleId>);
    

    它将密钥更改为 moduleId 。现在,您可以使用 ModuleTable 执行流表联接。它将右侧的所有匹配记录连接到左侧的键中。结果流将以 ModuleId 作为键,将授予模块数据合并到一个流中。
    KStream<String,Object> grantModuleStream = grantRekeyedStream.join(moduleTable);
    

    下一步是加入 userTable 。因此,您需要使用 userId 再次重新键入 grantModuleTable
    KStream<String,Object> grantModuleRekeyedStream = grantModuleTable.selectKey(<Select UserId>);
    

    现在 grantModuleRekeyedStream 可以与 userTable一起使用一起加入KStream-KTable Join
     KStream<String,Object> grantModuleUserStream = grantModuleRekeyedStream .join(userTable);
    

    以上Stream将以用户ID为键,并包含该用户的所有授权和模块详细信息。

    10-07 16:12