本文介绍了kafka-streams 间歇性 isDisconnected的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的日志中有一个间歇性问题.

I have an intermittent issue in my logs.

似乎心线一直在挣扎和获取

It seems the heart-thread is constantly struggling and get

Error sending fetch request
org.apache.kafka.common.errors.DisconnectException

Group coordinator xxx is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.

我已经把 heartbeat.interval.ms 变大了一点,但这种情况仍在发生.

i already made the heartbeat.interval.ms a bit larger, but this it is still happening.

  1. 我想了解在为静态成员资格配置的 kafka-streams 应用程序中,这可以级联成什么.特别是它会导致重新平衡吗?

  1. I would like to understand what this can cascade into, in a kafka-streams application configured for static membership. In particular can it lead to re-balance ?

我也增加了request.timeout.ms 是否还要增加delivery.timeout.ms,如果是为什么?

I have also increased request.timeout.ms shall I also increase delivery.timeout.ms, if so why?

我试图了解事情是如何级联的.也就是说,如果心跳线程不断尝试然后到达 delivery.timeout.ms 会发生什么?是什么原因导致 isavailable 或 invalid 由于以下原因:coordinatoravailable.isDisconnected:true?之后的重试由什么控制.事实上消费者并没有失败,它最终会发现协调者?

I am trying to understand how things cascade. That is, What happen if the heartbeat thread keep trying and then reach delivery.timeout.ms ? is that what causes is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true ? What controls the subsequent retry after that. Indeed the consumer isnot failing, itrecoves asitultimately discover the coordinator ?

我对这如何级联感到有点困惑,因此正在寻找一种方法来解释它并解决它.

I am a little and confused about how this can cascade, hence looking for a way to explain it, and get a handle on it.

知道如何帮助解决这个问题吗?

Any idea on how to help with this ?

09:48:12.357 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 70000 total records, ran 0 punctuators, and committed 3 total tasks since the last update
09:48:21.125 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1490129196, epoch=INITIAL) to node 0:
org.apache.kafka.common.errors.DisconnectException: null
09:48:21.125 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1666554310, epoch=INITIAL) to node 4:
org.apache.kafka.common.errors.DisconnectException: null
09:48:21.225 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=522549112, epoch=INITIAL) to node 5:
org.apache.kafka.common.errors.DisconnectException: null
09:48:42.711 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-2, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=810037183, epoch=INITIAL) to node 2:
org.apache.kafka.common.errors.DisconnectException: null
09:49:23.172 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-2, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=810037183, epoch=INITIAL) to node 2:
org.apache.kafka.common.errors.DisconnectException: null
09:50:17.295 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 42290 total records, ran 0 punctuators, and committed 3 total tasks since the last update
09:52:28.299 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 60000 total records, ran 0 punctuators, and committed 1 total tasks since the last update
09:54:35.743 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 60000 total records, ran 0 punctuators, and committed 1 total tasks since the last update
09:55:07.269 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Group coordinator sdc-oxygen-dev-cp-kafka-4.sdc-oxygen-dev-cp-kafka-headless.sdc-oxygen-dev:9092 (id: 2147483643 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
09:55:07.371 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Discovered group coordinator sdc-oxygen-dev-cp-kafka-4.sdc-oxygen-dev-cp-kafka-headless.sdc-oxygen-dev:9092 (id: 2147483643 rack: null)
09:55:07.473 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Discovered group coordinator sdc-oxygen-dev-cp-kafka-4.sdc-oxygen-dev-cp-kafka-headless.sdc-oxygen-dev:9092 (id: 2147483643 rack: null)
09:55:50.644 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1] Processed 30000 total records, ran 0 punctuators, and committed 1 total tasks since the last update
09:56:20.888 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1490129196, epoch=INITIAL) to node 0:
org.apache.kafka.common.errors.DisconnectException: null
09:56:20.888 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1666554310, epoch=INITIAL) to node 4:
org.apache.kafka.common.errors.DisconnectException: null
09:56:20.989 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=522549112, epoch=INITIAL) to node 5:
org.apache.kafka.common.errors.DisconnectException: null
09:56:47.135 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 70000 total records, ran 0 punctuators, and committed 1 total tasks since the last update
09:57:04.873 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1490129196, epoch=INITIAL) to node 0:
org.apache.kafka.common.errors.DisconnectException: null
09:57:04.873 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1666554310, epoch=INITIAL) to node 4:
org.apache.kafka.common.errors.DisconnectException: null
09:57:04.973 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=522549112, epoch=INITIAL) to node 5:
org.apache.kafka.common.errors.DisconnectException: null

推荐答案

对于避免异常是无效的,实际上是一种故障检测机制;用于检测异常情况.如果设置大的间隔,则稍后会检测到异常情况,反之亦然.总而言之,无论你把它做大还是小都没有区别.

It's ineffective for avoiding the exception, As a matter of fact, It's a mechanism for fault-detection; For detecting abnormal situations. If you set a large interval, abnormal condition is detected later and vice versa. In conclusion, if you made it either large or small no difference.

我也增加了 request.timeout.ms 我也要增加Delivery.timeout.ms,如果是,为什么?

我相信它们都是相互独立的.例如,如果请求发送到端点但没有得到任何反馈,这并不意味着端点处于非活动状态.确实,它在现实中发生在端点被负载淹没并且无法尽快反馈时.因此,request.timeout.ms 应设置为合理值.但是,对于 delivery.timeout.ms,它应该设置为一个较大的数字.正如官方 doc 关于 delivery.timeout.ms 所述:

I believe they are both independent of each other. for example, If the request is sent to the end-point but it doesn't get any feedback, it doesn't mean the end-point is inactive. Indeed, It happens in reality when end-point is overwhelmed by load and can't feedback as soon as possible. As a result, request.timeout.ms should set to a rational value. However, for delivery.timeout.ms it should be set to a large number. As stated in official doc about delivery.timeout.ms:

设置发送记录之间总时间的上限并收到经纪人的确认.默认情况下,传递超时设置为 2 分钟.

正如我所提到的.

我试图了解事情是如何级联的.也就是说,如果心跳线程不断尝试,然后到达 delivery.timeout.ms ?是什么原因不可用或无效的原因是:协调器不可用.isDisconnected:true

是的,在任何类型的请求或交付超时后,心跳线程都会向主节点发出信号,表明存在故障组的嫌疑人.

Yes, heartbeat thread signals to a master node that there is a suspect to failure group after timeout of any type either request or delivery.

之后的重试由什么控制

管理员,管理和检查主题、代理、acls 和其他 Kafka 对象

Admin, managing and inspecting topics, brokers, acls, and other Kafka objects

这篇关于kafka-streams 间歇性 isDisconnected的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 03:27