Restarting a Kafka Connect S3 sink task loses its position and rewrites everything
Problem description
After restarting a Kafka Connect S3 sink task, it restarted writing all the way from the beginning of the topic and wrote duplicate copies of older records. In other words, Kafka Connect seemed to lose its place.
So, I imagine that Kafka Connect stores current offset position information in the internal connect-offsets topic. That topic is empty, which I presume is part of the problem.
The other two internal topics, connect-statuses and connect-configs, are not empty. connect-statuses has 52 entries. connect-configs has 6 entries; three for each of the two sink connectors I have configured: connector-<name>, task-<name>-0, commit-<name>.
I manually created the internal Kafka Connect topics as specified in the docs before running this:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
I can verify that the connect-offsets topic seems to have been created correctly:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
This is on a three-server cluster running Confluent Platform v3.2.1 with Kafka 0.10.2.1.
Is connect-offsets supposed to be empty? Why else would Kafka Connect restart at the beginning of the topic when restarting a task?
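For what it's worth, one way to double-check whether anything is actually stored in connect-offsets is to dump it with the console consumer. This is just a sketch, assuming the brokers listen on localhost:9092. (As the update below notes, sink connectors keep their positions in __consumer_offsets rather than here, so for my setup this is expected to print nothing.)
# Dump whatever is in connect-offsets, printing keys as well; exits after 10s of no data.
# Only source connectors write to this topic, so a sink-only deployment prints nothing.
/usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-offsets --from-beginning --property print.key=true --timeout-ms 10000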
UPDATE: Response to Randall Hauch's answer.
- The explanation regarding source connector offsets vs. sink connector offsets explains the empty connect-offsets topic. Thanks for the explanation!
- I'm definitely not changing the connector name.
- If the connector is down for ~5 days and then restarted, is there any reason the connector's offset position would expire and reset? I see that __consumer_offsets has cleanup.policy=compact.
- auto.offset.reset should only take effect if there is no position in __consumer_offsets, right? (One way I could check the committed position is sketched below.)
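A sketch of that check, assuming Kafka Connect's default behavior of committing sink offsets under a consumer group named connect-<connector name> (so connect-my-s3-sink for the config below) and brokers on localhost:9092:
# Show the committed offsets and lag for the sink connector's consumer group;
# if the offsets have expired, no current offsets are listed for the group.
/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group connect-my-s3-sink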
I'm using mostly system defaults. My sink config JSON is as follows. I'm using a very simple custom partitioner to partition on an Avro datetime field rather than wall-clock time. That feature seems to have been added in Confluent v3.2.2, so I won't need a custom plugin for that functionality (a sketch of the built-in alternative follows the config below). I'm hoping to skip Confluent v3.2.2 and go straight to v3.3.0 when it is available.
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
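For reference, and purely as an untested sketch of that built-in alternative: as far as I can tell from the Confluent storage-common docs, newer versions of the S3 connector let the stock TimeBasedPartitioner take its timestamp from a record field via the timestamp.extractor and timestamp.field properties (property names and the exact version they appeared in may differ). Roughly, the custom partitioner settings above would be replaced by something like:
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": 3600000,
"timestamp.extractor": "RecordField",
"timestamp.field": "timestamp"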
Answer
The default offset retention period for Kafka consumers is 24 hours (1440 minutes). If you stop a connector and therefore make no new commits for longer than 24 hours, your offsets will expire and you will start over as a new consumer when you restart. You can modify how long offsets are retained in the __consumer_offsets topic using the broker-level offsets.retention.minutes parameter.
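A minimal sketch of that change, assuming a Confluent Platform layout where the broker config lives at /etc/kafka/server.properties (path is an assumption); the setting is broker-wide rather than a per-topic config, and each broker needs a restart to pick it up:
# server.properties on each broker: keep committed consumer offsets for 7 days
# instead of the 24-hour default, so a connector can stay stopped longer
# without losing its position.
offsets.retention.minutes=10080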