我已经使用kafka-topics.bat --zookeeper localhost:2181 --alter --topic test --config cleanup.policy=compact delete config min.cleanable.dirty.ratio=0.01 --config segment.ms=100 --config delete.retention.ms=100来压缩主题。
我已经用相同的键发送了2000条消息。当我使用这些消息时,我分别获得每条消息,而不是一条压缩的消息。

最佳答案

您所指的压缩设置与您使用Kafka客户端使用消息的方式无关。请检查official documentation here以获取更多详细信息。

如果要控制客户端使用消息的方式,则必须使用client config properties配置客户端。

考虑一种情况,您将主题池化了300毫秒,并且收到了一组消息(ConsumerRecords),然后可以对其进行迭代以独立处理每条消息。

while(true) {
   ConsumerRecords<String, JsonNode> records = kafkaConsumer.poll(300);
       if(records.count() > 0) {
          for(ConsumerRecord<String, JsonNode> record: records) {
             if(counter % 500 == 0) {
                 log.info("Record recovered, groupId: {}, topicName: {}, key: {}, value: {} , offset: {}",
                 this.groupId, this.topicNames, record.key(), record.value(), record.offset());
                    }
                }
            }
        }

10-02 00:15
查看更多