这是我之前发送的有关Kafka Streams中的高延迟的问题的后续解答; (Kafka Streams rebalancing latency spikes on high throughput kafka-streams services)。

快速提醒一下,我们的无状态服务具有非常严格的延迟要求,并且我们面临着过高的延迟问题(某些消息在产生后消耗了10秒钟以上),特别是当消费者优雅地离开该组时。

经过进一步调查,我们发现至少对于小型消费群体而言,重新平衡的时间少于500毫秒。因此,我们认为,从中删除一个消费者(> 10s)时,巨大的延迟在哪里?

我们意识到这是从消费者优雅退出到重新平衡开始的时间。

先前的测试是在Kafka和Kafka Streams应用程序中使用全默认配置执行的。
我们将配置更改为:

properties.put("max.poll.records", 50); // defaults to 1000 in kafkastreams
properties.put("auto.offset.reset", "latest"); // defaults to latest
properties.put("heartbeat.interval.ms", 1000);
properties.put("session.timeout.ms", 6000);
properties.put("group.initial.rebalance.delay.ms", 0);
properties.put("max.poll.interval.ms", 6000);


结果是重新平衡开始的时间减少到5秒多一点。

我们还测试了通过“ kill -9”非优雅地杀死消费者;结果是触发重新平衡的时间完全相同。

所以我们有一些问题:
-我们希望当消费者正常停止时,立即触发重新平衡,这应该是预期的行为吗?为什么在我们的测试中没有发生?
-如何减少消费者正常退出与触发重新平衡之间的时间?权衡是什么?更多不必要的平衡?

对于更多上下文,我们的Kafka版本是1.1.0,在查看了例如kafka / kafka_2.11-1.1.0-cp1.jar的库之后,我们安装了Confluent平台4.1.0。在消费者方面,我们正在使用Kafka-streams 2.1.0。

谢谢!

最佳答案

正常关闭实例时,Kafka Streams不会发送“离开组请求”-这是有意的。目的是避免实例反弹(例如,如果一个实例升级了一个应用程序;或者一个实例在Kubernetes环境中运行并且POD快速自动重启),则避免了昂贵的重新平衡。

为此,使用了非公共配置。您可以通过覆盖配置

props.put("internal.leave.group.on.close", true); // Streams' default is `false`

10-07 19:24
查看更多