本文介绍了如何发现和过滤掉Kafka Streams中的重复记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设您有一个带有空键的主题,其值为

Say you have a topic with a null key and the value is

{id:1, name:Chris, age:99}

假设您想按姓名计算人数.您可以执行以下操作:

Lets say you want to count up the number of people by name. You would do something like below:

nameStream.groupBy((key,value) -> value.getName())
           .count();

现在让我们说它是有效的,你可以获得重复的记录,你可以根据 id 判断它是重复的.

Now lets says it is valid you can get duplicate records and you can tell it is a duplicate based on the id.

例如:

{id:1, name:Chris, age:99}
{id:1, name:Chris, age:xx}

应该导致计数为 1 和

Should result in a count of one and

   {id:1, name:Chris, age:99}
   {id:2, name:Chris, age:xx}

应该导致计数为 2.

你将如何实现这一目标?我认为 reduce 会起作用,但误解了它的工作原理.

How would you accomplish this? I thought reduce would work, but misunderstood how that works.

推荐答案

您可以使用多个属性进行分组.通过串联创建自定义键并作为键传递:

You can use more than one attribute for grouping. Create a custom key by concatenation and pass as key:

KTable<String,String> modifiedTable =  nameStream.groupBy((key,value) -> value.getName()+value.getId()).reduce((aggVal,newval) -> aggVal);

上面的 KTable 将给出具有给定名称和 ID 的任何记录的更新状态.所以对于 {id:1,name:Chris.....},它将在 KTable 中只有一条记录:

Above KTable will give the updated status for any record with the given name and ID. So for {id:1,name:Chris.....}, it will have only one record in KTable:

虽然在下面的情况下,两条记录都会出现:

While in below case, both records will be present:

<Chris1,  {id:1, name:Chris, age:99}> 
<Chris2,   {id:2, name:Chris, age:xx}> 

现在要使用 name 属性进行计数操作.因此,将键更改为name 并重新组合表并执行count().

Now you want to use the name attribute for count operation. So Change the key to name and re-group the table and perform count().

KTable countTable = modifiedTable.groupBy((k,v)-> KeyValue.pair(v.getName(), v)).count();

这里 count() 将在 KTable 之上执行.KTable 是任何给定 ID 的更新视图.
因此,对于下面的输入, modifiedTable 将一次有 1 条记录作为键Chris1"的更新值,您将获得 count=>1

Here count() will be performed on top of KTable. KTable is the updated view for any given ID.
Hence for below input, modifiedTable will have 1 record at a time as updated value for key "Chris1" and you will get count=>1

<Chris,1> // Here key will be Chris1

以下输入将导致 **count=>2

Below input will result **count=>2

{id:1, name:Chris, age:99}  // Here key was be Chris1
{id:2, name:Chris, age:xx}  // Here key was be Chris2

这篇关于如何发现和过滤掉Kafka Streams中的重复记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-30 03:50