https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1的报价



这句话对我来说还不清楚。我假设使用者将提交请求发送给代理,并且如果代理在某个超时时间内未响应,则意味着提交失败。我错了吗?

您能否详细说明commitSynccommitAsync的区别?
另外,请提供我应该首选哪种提交类型的用例。

最佳答案

正如API文档所述:

  • commitSync



  • 这意味着commitSync是一种阻止方法。调用它将阻塞您的线程,直到成功或失败为止。

    例如,
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
            consumer.commitSync();
        }
    }
    

    对于for循环中的每个迭代,只有在consumer.commitSync()成功返回或因抛出异常而中断后,您的代码才会移至下一个迭代。

  • commitAsync



  • 这意味着commitAsync是一种非阻塞方法。调用它不会阻塞您的线程。相反,无论最终成功还是失败,它都会继续处理以下指令。

    例如,类似于前面的示例,但是在这里我们使用commitAsync:
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
            consumer.commitAsync(callback);
        }
    }
    

    对于for循环中的每个迭代,无论最终consumer.commitAsync()会发生什么,您的代码都将移至下一个迭代。而且,提交的结果将由您定义的回调函数处理。

    权衡:延迟与数据一致性
  • 如果必须确保数据一致性,请选择commitSync(),因为它可以确保在执行任何其他操作之前,您将知道偏移提交是成功还是失败。但是由于它是同步和阻塞的,因此您将花费更多时间等待提交完成,这会导致高延迟。
  • 如果您可以确定某些数据不一致并且希望具有低延迟,请选择commitAsync(),因为它不会等待完成。相反,它只会发出提交请求并稍后处理来自Kafka的响应(成功或失败),同时,您的代码将继续执行。

  • 通常来说,实际行为将取决于您的实际代码以及调用方法的位置。

    07-26 09:28