我正在使用Kafka Streams从集群中的主题中读取内容,我想根据其JSON内容过滤消息,即:
JSON格式:
{
"id": 1
"timestamp": "2019-04-21 12:53:18",
"priority": "Medium",
"name": "Sample Text",
"metadata": [{
"metric_1": "0",
"metric_2": "1",
"metric_3": "2"
}]
}
我想从输入主题中读取消息(我们称其为“ input-topic”),对其进行过滤(假设我只希望优先级为“低”的消息),然后将其汇总,然后将其发送到另一个主题(“已过滤的主题”)
除了创建流本身及其配置之外,我没有太多的代码。我在想必须配置一些关于Serdes的东西,但是我不确定如何配置。我也尝试过使用JSON解串器,但是无法正常工作。
首先,这有可能吗?如果是这样,正确的做法是什么?
最佳答案
您可以从主题构建流。
StreamsBuilder builder = new StreamsBuilder();
// key value type here is both String for me and update based on cases
KStream<String, String> source = builder.stream("input-topic");
source.filter(new Predicate<String, String>() {
@Override
public boolean test(String s, String s2) {
// your filter logic here and s and s2 are key/value from topic
// In your case, s2 should be type of your json Java object
return false;
}
}).groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
// your group by logic
return null;
}
}).count().toStream().to("new topic");