本文介绍了如何使用rest api设置kafka连接auto.offset.reset的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我创建了一个将数据转换为其他存储的接收器 kafka 连接;当使用 kafka connect rest api 创建新连接器时,我想将 auto.offset.reset 设置为 latest;我在配置中设置了 consumer.auto.offset.reset: latest;

I have create a sink kafka connect that convert data to other storage; I want to set auto.offset.reset as latest when new connector is created with kafka connect rest api; I have set consumer.auto.offset.reset: latest in configs;

json{"name": "test_v14",配置":{"name": "test_v14","consumer.auto.offset.reset": "最新的","connector.class": "...",...}}

但是当任务开始时,kafka消费者仍然从最早开始轮询记录;将 auto.offset.reset 设置为最新的任何其他方法也是如此;

But when task started, kafka consumer still poll records from earliest; So is any other ways to set auto.offset.reset as latest;

推荐答案

Kafka 2.3 之前

consumer.auto.offset.reset 需要在 connect-distributed.properties 文件(Worker)中设置.

consumer.auto.offset.reset needs to be set in the connect-distributed.properties file (the Worker).

它不能应用于任何特定的连接器,除非该连接器类显式地创建和加载自己的消费者对象,该对象读取该属性.

It cannot be applied to any particular Connector unless that connector class is explicitly creating and loading its own Consumer objects that read in that property.

这篇关于如何使用rest api设置kafka连接auto.offset.reset的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-20 19:23