本文介绍了重新启动Kafka Connect S3 Sink任务丢失位置,完全重写所有内容的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

重新启动Kafka Connect S3接收器任务后,它从主题开始一直重新开始写入,并写入了较旧记录的重复副本。换句话说,Kafka Connect似乎失去了位置。



因此,我想像一下,Kafka Connect将当前的偏移位置信息存储在内部的 connect-中,补偿主题。该主题为空,我认为是问题的一部分。



其他两个内部主题 connect-statuses connect-configs 不为空。 connect-statuses 有52个条目。 connect-configs 有6个条目;对于已配置的两个接收器连接器,每个三个: connector-< name> task-< name> -0 commit-< name>



我手动创建了内部Kafka Connect主题,如运行此文件之前的文档:

  / usr / bin / kafka-topics --create --zookeeper localhost:2181 --topic connect -configs --replication-factor 3 --partitions 1 --config cleanup.policy = compact 
/ usr / bin / kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication -factor 3 --partitions 50 --config cleanup.policy = compact
/ usr / bin / kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3-分区10 --config cleanup.policy = compact

我可以验证 connect-offsets 主题似乎已正确创建:

  / usr / bin / kafka-topics- -zookeeper本地主机:2181-描述--topic连接- offsets 
主题:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy = compact
主题:connect-offsets分区:0 Leader:1复制品:1,2,3 Isr:1,2 ,3
主题:connect-offsets分区:1 Leader:2复制副本:2,3,1 Isr:2,3,1
主题:connect-offsets分区:2 Leader:3复制副本:3, 1,2 Isr:3,1,2
< snip>

这是一个三服务器集群,运行Confluent Platform v3.2.1,运行Kafka 10.2.1。 p>

连接偏移量应该为空吗?重新启动任务时,为什么Kafka Connect在主题开头会重新启动?



更新:响应Randall Hauch的回答。




  • 有关源连接器偏移量与接收器连接器偏移量的说明解释了连接偏移量。谢谢您的解释!

  • 我绝对不会更改连接器名称。

  • 如果连接器关闭了约5天,然后又重新启动,是否在那里连接器偏移位置将到期并重置的任何原因是什么?我看到 __ consumer_offsets 具有 cleanup.policy = compact

  • auto.offset.reset 仅在 __ consumer_offsets 中没有位置时才生效,对吗?



我主要使用系统默认值。我的接收器配置JSON如下。我正在使用一个非常简单的自定义分区程序在Avro日期时间字段而不是墙上时钟时间进行分区。该功能似乎已在Confluent v3.2.2中添加,因此我不需要为此功能定制的插件。我希望跳过Confluent v3.2.2,并在可用时直接转到v3.3.0。

  {
name: my-s3-sink,

tasks.max:1,
topics: my-topic,
flush。 size:10000,

connector.class: io.confluent.connect.s3.S3SinkConnector,
storage.class: io.confluent.connect.s3。 storage.S3Storage,
format.class: io.confluent.connect.s3.format.avro.AvroFormat,
schema.generator.class: io.confluent.connect。 storage.hive.schema.TimeBasedSchemaGenerator,
partitioner.class: mycompany.partitioner.TimeFieldPartitioner,

s3.bucket.name: my-bucket,
s3.region:我们西部2,

partition.field.name:时间戳,

locale:我们,
时区: UTC,
path.format:'year'= YYYY /'month'= MM /'day'= dd /'hour'= HH ,

schema.compatibility:无,

key.converter: io.confluent.connect.avro.AvroConverter,
键。 converter.schema.registry.url: http:// localhost:8081,
value.converter: io.confluent.connect.avro.AvroConverter,
value.converter。 schema.registry.url: http:// localhost:8081
}


解决方案

Kafka使用者的默认偏移保留期为24小时(1440分钟)。如果停止连接器,因此在24小时内未进行任何新提交,则偏移量将过期,并且在重新启动时将以新使用者的身份重新开始。您可以使用 offsets.retention.minutes 参数

$ b在 __ consumer_offsets 主题上修改保留期限。 $ b

After restarting a Kafka Connect S3 sink task, it restarted writing all the way from the beginning of the topic and wrote duplicate copies of older records. In other words, Kafka Connect seemed to lose its place.

So, I imagine that Kafka Connect stores current offset position information in the internal connect-offsets topic. That topic is empty which I presume is part of the problem.

The other two internal topics connect-statuses and connect-configs are not empty. connect-statuses has 52 entries. connect-configs has 6 entries; three for each of two sink connectors I have configured: connector-<name>, task-<name>-0, commit-<name>.

I manually created the internal Kafka Connect topics as specified in the docs before running this:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

I can verify that the connect-offsets topic seems to be created correctly:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

This is with a three server cluster running Confluent Platform v3.2.1 running Kafka 10.2.1.

Is connect-offsets supposed to be empty? Why else would Kafka Connect restart at the beginning of the topic when restarting a task?

UPDATE: Response to Randall Hauch's answer.

  • Explanation regarding source connector offsets vs sink connector offsets explains empty connect-offsets. Thanks for explanation!
  • I'm definitely not changing connector name.
  • If the connector is down for ~five days and restarted afterwards, is there any reason that the connector offset position would expire and reset? I see __consumer_offsets has cleanup.policy=compact
  • auto.offset.reset should only take affect if there is no position in __consumer_offsets, right?

I'm using mostly system defaults. My Sink config JSON is as follows. I'm using a very simple custom partitioner to partition on an Avro datetime field rather than wallclock time. That feature seems to have been added in Confluent v3.2.2 so that I won't need a custom plugin for that functionality. I'm hoping to skip Confluent v3.2.2 and go straight to v3.3.0 when it is available.

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
解决方案

The default offset retention period for Kafka consumers is 24 hours (1440 minutes). If you stop a connector and therefore make no new commits for longer than 24 hours your offsets will expire and you will start over as a new consumer when you restart. You can modify the retention period on the __consumer_offsets topic using the offsets.retention.minutes parameter

这篇关于重新启动Kafka Connect S3 Sink任务丢失位置,完全重写所有内容的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-29 07:02