我正在使用Kafka Streams从集群中的主题中读取内容,我想根据其JSON内容过滤消息,即:

JSON格式:

{
   "id": 1
   "timestamp": "2019-04-21 12:53:18",
   "priority": "Medium",
   "name": "Sample Text",
   "metadata": [{
      "metric_1": "0",
      "metric_2": "1",
      "metric_3": "2"
   }]
}


我想从输入主题中读取消息(我们称其为“ input-topic”),对其进行过滤(假设我只希望优先级为“低”的消息),然后将其汇总,然后将其发送到另一个主题(“已过滤的主题”)

除了创建流本身及其配置之外,我没有太多的代码。我在想必须配置一些关于Serdes的东西,但是我不确定如何配置。我也尝试过使用JSON解串器,但是无法正常工作。

首先,这有可能吗?如果是这样,正确的做法是什么?

最佳答案

您可以从主题构建流。

    StreamsBuilder builder = new StreamsBuilder();

    // key value type here is both String for me and update based on cases
    KStream<String, String> source = builder.stream("input-topic");

    source.filter(new Predicate<String, String>() {
        @Override
        public boolean test(String s, String s2) {
            // your filter logic here and s and s2 are key/value from topic
            // In your case, s2 should be type of your json Java object
            return false;
        }
    }).groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(String key, String value) {
            // your group by logic
            return null;
        }
    }).count().toStream().to("new topic");

10-07 19:27
查看更多