问题描述
我们假定有一个使用者发送请求以提交偏移量10.如果存在通信问题,并且经纪人没有收到请求,当然也没有响应.之后,我们有另一个使用者流程另一个批次,并成功提交了偏移量20.
We assume that we have a consumer that sends a request to commit offset 10.If there is a communication problem and the broker didn't get the request and of course didn't respond. After that we have another consumer process another batch and successfully committed offset 20.
问:我想知道是否有一种方法或属性可以处理,因此我们可以在提交偏移量20之前检查是否提交了日志中的上一个偏移量? /p>
Q: I want to know if there is a way or property to handle so we can check if the previous offset in the log are committed or not before committing in our case the offset 20?
推荐答案
您所描述的场景只能在使用异步提交时发生.
The scenario you are describing can only happen when using asynchronous commits.
请记住,一个特定的TopicPartition只能由同一ConsumerGroup中的单个使用者使用.如果您有两个使用者正在读取相同的TopicPartition,则只能这样做
Keep in mind that one particular TopicPartition can only be consumed by a single consumer within the same ConsumerGroup. If you have two consumers reading the same TopicPartition it is only possible
- 如果他们有不同的ConsumerGroup,或者
- 如果他们具有相同的ConsumerGroup并发生重新平衡.但是,仍然只有一个使用者一次会读取该TopicPartition,而不会同时读取.
案例1很清楚:如果它们具有不同的ConsumerGroup,则它们将并行且独立地使用该分区.另外,它们的committed偏移量是单独管理的.
Case #1 is pretty clear: If they have different ConsumerGroups they consume the partition in parallel and independently. Also their comitted offsets are managed separately.
情况#2:如果第一个使用者由于失败/死亡而无法恢复,则第一个使用者未能提交偏移量10,那么将发生重新平衡,而另一个活动的使用者将选择该分区.由于未提交偏移量10,因此新用户将在跳至下一批并可能提交偏移量20之前再次开始读取偏移量10.这导致至少一次"返回偏移量10.语义,并可能导致重复.
Case #2: If the first consumer fails to commit offset 10 because the consumer failed/died and is not recovering a consumer Rebalance will happen and another active consumer will pick up that partition. As the offset 10 was not committed, the new consumer will start reading again offset 10 before jumping to the next batch and possibly commit offset 20. This leads to "at-least-once" semantics and could lead to duplicates.
现在,进入唯一的情况,您可以在提交较高的偏移量之后提交较小的偏移量.如开头所述,如果您异步提交偏移量(使用commitAsync
),确实可能会发生这种情况.想象以下场景,按时间排序:
Now, coming to the only scenario where you could commit a smaller offset after committing a higher offset. As said in the beginning, this could indeed happen if you asynchronously commit the offsets (using commitAsync
). Imagine the following scenario, ordered by time:
- 消费者读取偏移量0(后台线程尝试提交偏移量0)
- 提交偏移量0成功
- 消费者读取偏移量1(后台线程尝试提交偏移量1)
- 提交偏移量1失败,请稍后再试
- 消费者读取偏移量2(后台线程尝试提交偏移量2)
- 提交偏移量2成功
- 现在,要做什么(重试提交偏移量1?)
- Consumer reads offset 0 (background thread tries to commit offset 0)
- committing offset 0 succeeded
- Consumer reads offset 1 (background thread tries to commit offset 1)
- committing offset 1 failed, try again later
- Consumer reads offset 2 (background thread tries to commit offset 2)
- committing offset 2 succeeded
- Now, what to to with (re-trying committing offset 1?)
如果您让重试机制再次提交偏移量1,则您的使用者似乎只提交了直到偏移量1.这是因为最新的偏移量TopicPartition上每个使用者组的信息都存储在内部 compacted Kafka主题__consumer_offsets,旨在仅存储我们的消费者组的最新值(在我们的示例中为offset 1).
If you let the retrying mechanism to commit offset 1 again, it looks like your consumer has only committed up until the offset 1. This is because the information for each Consumer group on the latest offset par TopicPartition is stored in the internal compacted Kafka topic __consumer_offsets which is meant to store only the latest value (in our case: offset 1) for our Consumer Group.
在"Kafka-权威指南"一书中,有关于如何缓解此问题的提示:
In the book "Kafka - The Definitive Guide", there is a hint on how to mitigate this problem:
作为示例,您可以在下面的Scala中看到此想法的实现:
As an example, you can see an implementation of this idea in Scala below:
import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._
object AsyncCommitWithCallback extends App {
// define topic
val topic = "myOutputTopic"
// set properties
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// [set more properties...]
// create KafkaConsumer and subscribe
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJavaCollection)
// initialize global counter
val atomicLong = new AtomicLong(0)
// consume message
try {
while(true) {
val records = consumer.poll(Duration.ofMillis(1)).asScala
if(records.nonEmpty) {
for (data <- records) {
// do something with the records
}
consumer.commitAsync(new KeepOrderAsyncCommit)
}
}
} catch {
case ex: KafkaException => ex.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
class KeepOrderAsyncCommit extends OffsetCommitCallback {
// keeping position of this callback instance
val position = atomicLong.incrementAndGet()
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
// retrying only if no other commit incremented the global counter
if(exception != null){
if(position == atomicLong.get) {
consumer.commitAsync(this)
}
}
}
}
}
这篇关于Kafka Consumer偏移量提交检查以避免提交较小的偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!