https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1的报价
这句话对我来说还不清楚。我假设使用者将提交请求发送给代理,并且如果代理在某个超时时间内未响应,则意味着提交失败。我错了吗?
您能否详细说明commitSync
和commitAsync
的区别?
另外,请提供我应该首选哪种提交类型的用例。
最佳答案
正如API文档所述:
这意味着
commitSync
是一种阻止方法。调用它将阻塞您的线程,直到成功或失败为止。例如,
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
对于for循环中的每个迭代,只有在
consumer.commitSync()
成功返回或因抛出异常而中断后,您的代码才会移至下一个迭代。这意味着
commitAsync
是一种非阻塞方法。调用它不会阻塞您的线程。相反,无论最终成功还是失败,它都会继续处理以下指令。例如,类似于前面的示例,但是在这里我们使用
commitAsync
:while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitAsync(callback);
}
}
对于for循环中的每个迭代,无论最终
consumer.commitAsync()
会发生什么,您的代码都将移至下一个迭代。而且,提交的结果将由您定义的回调函数处理。权衡:延迟与数据一致性
commitSync()
,因为它可以确保在执行任何其他操作之前,您将知道偏移提交是成功还是失败。但是由于它是同步和阻塞的,因此您将花费更多时间等待提交完成,这会导致高延迟。 commitAsync()
,因为它不会等待完成。相反,它只会发出提交请求并稍后处理来自Kafka的响应(成功或失败),同时,您的代码将继续执行。 通常来说,实际行为将取决于您的实际代码以及调用方法的位置。