Problem description
Say you have a topic whose records have a null key and a value such as:
{id:1, name:Chris, age:99}
Let's say you want to count the number of people by name. You would do something like this:
nameStream.groupBy((key,value) -> value.getName())
.count();
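To see why this naive count is not enough once duplicates appear, here is a plain-Java model (not Kafka Streams itself) of what groupBy(name).count() computes: every incoming record increments its name's counter, duplicates included. The Person record and the NaiveNameCount class are hypothetical stand-ins for the question's value type and topology.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (an assumption, not Kafka Streams) of groupBy(name).count():
// every record increments its name's counter, so duplicates are counted twice.
class NaiveNameCount {
    // Hypothetical stand-in for the question's value type.
    record Person(int id, String name, String age) {}

    static Map<String, Long> countByName(List<Person> records) {
        Map<String, Long> counts = new TreeMap<>();
        for (Person p : records) {
            counts.merge(p.name(), 1L, Long::sum); // one increment per record
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Person> input = List.of(
                new Person(1, "Chris", "99"),
                new Person(1, "Chris", "xx")); // same id, so a duplicate
        System.out.println(countByName(input)); // prints {Chris=2}
    }
}
```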
Now suppose that duplicate records can legitimately occur, and that you can tell a record is a duplicate by its id.
For example:
{id:1, name:Chris, age:99}
{id:1, name:Chris, age:xx}
should result in a count of one, and
{id:1, name:Chris, age:99}
{id:2, name:Chris, age:xx}
should result in a count of two.
How would you accomplish this? I thought reduce would work, but I misunderstood how it works.
Recommended answer
You can group by more than one attribute. Build a custom key by concatenating the name and the id, and use it as the grouping key:
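// Person stands for your value type; reduce((aggVal, newVal) -> aggVal) keeps the first record seen per key
KTable<String, Person> modifiedTable = nameStream.groupBy((key, value) -> value.getName() + value.getId()).reduce((aggVal, newVal) -> aggVal);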
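The KTable above holds exactly one entry per name + ID combination, and because the reducer returns aggVal, later duplicates leave the stored value unchanged. So for the two {id:1, name:Chris, ...} records, the KTable contains only one entry:
<Chris1, {id:1, name:Chris, age:99}>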
In the following case, by contrast, both records are present:
<Chris1, {id:1, name:Chris, age:99}>
<Chris2, {id:2, name:Chris, age:xx}>
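To make the reduce step concrete, here is a plain-Java model (an assumption, not Kafka Streams itself) of what modifiedTable ends up holding: a map keyed by name + id, where Map.merge applies the same keep-the-first reducer as the answer. DedupTable and the Person record are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (not Kafka Streams) of
// groupBy((k, v) -> v.getName() + v.getId()).reduce((aggVal, newVal) -> aggVal)
class DedupTable {
    // Hypothetical stand-in for the question's value type.
    record Person(int id, String name, String age) {}

    static Map<String, Person> reduceByNameAndId(List<Person> records) {
        BinaryOperator<Person> keepFirst = (aggVal, newVal) -> aggVal; // same reducer as the answer
        Map<String, Person> table = new LinkedHashMap<>();
        for (Person p : records) {
            String key = p.name() + p.id(); // e.g. "Chris1"
            table.merge(key, p, keepFirst); // first record is inserted, later ones reduce to it
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(reduceByNameAndId(List.of(
                new Person(1, "Chris", "99"), new Person(1, "Chris", "xx"))).keySet()); // [Chris1]
        System.out.println(reduceByNameAndId(List.of(
                new Person(1, "Chris", "99"), new Person(2, "Chris", "xx"))).keySet()); // [Chris1, Chris2]
    }
}
```

One caveat of plain concatenation: name "Chris1" with id 1 and name "Chris" with id 11 both produce the key "Chris11"; putting a separator such as "|" between the two parts avoids that collision.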
Now you want to count by the name attribute, so re-key the table by name, re-group it, and call count():
KTable<String, Long> countTable = modifiedTable.groupBy((k, v) -> KeyValue.pair(v.getName(), v)).count();
Here count() is performed on top of the KTable, which holds only the current value for each name + ID key. Hence, for the duplicate input, modifiedTable holds a single record under the key "Chris1" at any time, and you get count => 1:
<Chris, 1>  // counted from the single modifiedTable entry with key Chris1
The following input results in count => 2:
{id:1, name:Chris, age:99}  // key in modifiedTable: Chris1
{id:2, name:Chris, age:xx}  // key in modifiedTable: Chris2
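Why does re-grouping a KTable not double-count? When a table entry is updated, a KTable groupBy aggregation subtracts the old value from its group and adds the new one, so a duplicate that leaves the entry unchanged nets out to zero. The sketch below is a simplified plain-Java model of both steps under that assumption (it forwards every update, whereas real Kafka Streams may suppress some through caching; the final count is the same either way). RegroupCount and Person are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified plain-Java model (not Kafka Streams) of:
//   step 1: table keyed by name + id, reducer (aggVal, newVal) -> aggVal
//   step 2: table.groupBy(name).count(), applying subtract(old) + add(new) per update
class RegroupCount {
    // Hypothetical stand-in for the question's value type.
    record Person(int id, String name, String age) {}

    static Map<String, Long> countDistinctByName(List<Person> records) {
        Map<String, Person> table = new HashMap<>(); // step 1 state (modifiedTable)
        Map<String, Long> counts = new TreeMap<>();  // step 2 state (countTable)
        for (Person p : records) {
            String key = p.name() + p.id();
            Person oldValue = table.get(key);
            Person newValue = (oldValue == null) ? p : oldValue; // keep the first value
            table.put(key, newValue);
            if (oldValue != null) {
                counts.merge(oldValue.name(), -1L, Long::sum); // subtract the old value
            }
            counts.merge(newValue.name(), 1L, Long::sum); // add the new value
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countDistinctByName(List.of(
                new Person(1, "Chris", "99"), new Person(1, "Chris", "xx")))); // {Chris=1}
        System.out.println(countDistinctByName(List.of(
                new Person(1, "Chris", "99"), new Person(2, "Chris", "xx")))); // {Chris=2}
    }
}
```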