我正在使用CppKafka对Kafka使用者进行编程。我希望当我的使用者启动时,它只会轮询新到达的消息(即消息在使用者启动时间之后到达),而不是位于使用者偏移处的消息。

// Construct the configuration
Configuration config = {
    { "metadata.broker.list", "127.0.0.1:9092" },
    { "group.id", "1" },
    // Disable auto commit
    { "enable.auto.commit", false },
    // Set offest to latest to receive latest message when consumer start working
    { "auto.offset.reset", "latest" },
};

// Create the consumer
Consumer consumer(config);

consumer.set_assignment_callback([](TopicPartitionList& partitions) {
    cout << "Got assigned: " << partitions << endl;
});

// Print the revoked partitions on revocation
consumer.set_revocation_callback([](const TopicPartitionList& partitions) {
    cout << "Got revoked: " << partitions << endl;
});


string topic_name = "test_topic";
// Subscribe to the topic
consumer.subscribe({ topic_name });


据我了解,设置为auto.offset.reset的配置latest仅在使用者开始读取分配的分区时没有提交的偏移量时才有效。因此,我的猜测是我应该在不提交的情况下调用consumer.poll(),但是这感觉很错误,而且我一路上会破坏一些东西。谁能向我展示实现我的要求的正确方法?

最佳答案

如果将“ enable.auto.commit”设置为false,并且您没有在代码中提交偏移量,则每次使用者启动时,如果auto.offset.reset = earestest,它将从主题中的第一条消息开始消耗消息。

auto.offset.reset的默认值为“ latest”,这意味着由于缺少有效的偏移量,使用者将开始从最新记录(使用者开始运行后写入的记录)中读取数据。

根据上面的问题,看来auto.offset.reset = latest应该可以解决您的问题。

但是,如果您需要基于实时的偏移量,则需要在使用者中应用时间过滤器。这意味着从主题获取消息,将偏移时间与消息有效负载中的某些自定义字段或消息的meta属性(ConsumerRecord.timestamp())进行比较,并进行相应的进一步处理。

另请参阅此答案Retrieve Timestamp based data from Kafka

关于c++ - 卡夫卡消费者调查最新消息,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50388690/

10-10 21:36
查看更多