I'm trying to understand how the default values of the following Confluent consumer configurations work together.

max.poll.interval.ms - As per the Confluent documentation, the default value is 300,000 ms
session.timeout.ms - As per the Confluent documentation, the default value is 10,000 ms
heartbeat.interval.ms - As per the Confluent documentation, the default value is 3,000 ms

Let's say I'm using these default values in my configuration. Suppose a consumer is sending heartbeats every 3,000 ms, my first poll happens at timestamp t1, and the second poll happens at t1 + 20,000 ms. Would that cause a rebalance because the gap exceeds session.timeout.ms? Or would it work fine because the consumer sent its heartbeats on schedule?

Solution

A previous thread also explains the session timeout and the max poll timeout. Let me add my own understanding.

ConsumerRecords poll(final long timeout): fetches data sequentially from the topic's partitions, starting from the last consumed offset or a manually set offset. It returns immediately if records are available; otherwise it waits up to the given timeout, and if the timeout elapses it returns an empty record set. The poll API is called repeatedly to fetch newly arrived messages, and it also ensures the liveness of the consumer.

Under the covers

session.timeout.ms: The consumer coordinator sends heartbeats to the broker to show that the consumer's session is alive and active. In current clients these heartbeats come from a background thread rather than from poll() itself.
If the broker does not receive a heartbeat within session.timeout.ms, it removes that consumer from the group and triggers a rebalance. You can think of session.timeout.ms as the maximum time the broker waits for a heartbeat from the consumer, whereas heartbeat.interval.ms is how often the consumer is expected to send one. That is why heartbeat.interval.ms must always be lower than session.timeout.ms, ideally no more than 1/3 of it.

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the time the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances in order to reassign its partitions to another consumer instance. If you do long batch processing, it is reasonable to increase max.poll.interval.ms, but note that increasing it may delay a group rebalance, since the consumer only joins the rebalance inside the call to poll(). You can also keep the poll interval low by tuning max.poll.records.

Now let's discuss how they relate to each other.

While the consumer is polling, the heartbeat, the session timeout, and the poll timeout are checked in the background in the following order.

The consumer coordinator first checks whether the consumer is in a rebalancing state. If it is still rebalancing, the check waits for the consumer to finish joining the group and then runs again. Note that a large max.poll.interval.ms means a rebalance can take longer.

Once the rebalance has completed, the coordinator checks the session timeout. If it has expired without a successful heartbeat, the old coordinator is treated as disconnected, so the next poll will try to rebalance. The session timeout is therefore tied directly to coordinator liveness: when it expires, the coordinator is considered dead, and the next poll has to find a new coordinator before rebalancing.
After the session timeout check, the coordinator evaluates heartbeat.pollTimeoutExpired. If the poll timeout has expired, the foreground thread has stalled between calls to poll(), so the member explicitly leaves the group. The next call to poll() rejoins this consumer as a new member; the group coordinator itself is not replaced.

After the session timeout and poll timeout checks, and before sending a heartbeat, the coordinator checks the heartbeat timing. If a heartbeat is not yet due, the thread waits for the retry backoff and then checks again.

If a heartbeat is due, the coordinator sends a sendHeartbeatRequest. On success, the thread resets the heartbeat timer and carries on. On failure, if the group is not rebalancing, it wakes up the coordinator so that the checks run again.

As mentioned in the linked thread, polling is independent of heartbeating, so even when the gap between polls is long, heartbeats are still sent, which shows that the consumer is alive. In other words, the session timeout is not directly linked to poll().

session.timeout.ms: maximum time to receive a heartbeat
max.poll.interval.ms: maximum time for the independent processing thread

So if you set max.poll.interval.ms to 300,000, you have 300,000 ms until the next poll, which means the consumer thread has at most 300,000 ms to complete processing. In the meantime, heartbeat requests keep being sent every heartbeat.interval.ms, i.e. 3,000 ms, to indicate that the thread is still alive. If no heartbeat succeeds within session.timeout.ms, i.e. 10,000 ms, the coordinator is considered dead, and the next poll has to find a new coordinator and rebalance. For the gap between polls described in the question, this means no rebalance is triggered by the gap alone: heartbeats keep arriving well within session.timeout.ms, and the gap is far below max.poll.interval.ms.
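The order of checks described above can be sketched as a small, self-contained toy model. This is not the Kafka client's actual code: the class HeartbeatModel, its methods, the one-second tick, and the assumption that a heartbeat is either always or never acknowledged are all made up for illustration. Only the three timeout values come from the question.

```java
import java.util.Optional;

/** Toy model of the background checks described in the answer; hypothetical, not Kafka source. */
class HeartbeatModel {
    enum Action { MARK_COORDINATOR_UNKNOWN, LEAVE_GROUP, WAIT, SEND_HEARTBEAT }

    // Default values quoted in the question, in milliseconds.
    static final long SESSION_TIMEOUT_MS = 10_000;
    static final long HEARTBEAT_INTERVAL_MS = 3_000;
    static final long MAX_POLL_INTERVAL_MS = 300_000;

    /** Applies the checks in the order the answer lists them. */
    static Action decide(long now, long lastHeartbeatAck, long lastHeartbeatSent, long lastPoll) {
        if (now - lastHeartbeatAck > SESSION_TIMEOUT_MS) {
            return Action.MARK_COORDINATOR_UNKNOWN; // no successful heartbeat within the session timeout
        }
        if (now - lastPoll > MAX_POLL_INTERVAL_MS) {
            return Action.LEAVE_GROUP;              // processing thread stalled between polls
        }
        if (now - lastHeartbeatSent < HEARTBEAT_INTERVAL_MS) {
            return Action.WAIT;                     // next heartbeat is not due yet
        }
        return Action.SEND_HEARTBEAT;
    }

    /**
     * Simulates the time between two polls that are pollGapMs apart.
     * Returns the first action that would disturb group membership, if any.
     */
    static Optional<Action> simulate(long pollGapMs, boolean heartbeatsAcked) {
        long lastPoll = 0;
        long lastSent = 0;
        long lastAck = 0;
        for (long now = 0; now <= pollGapMs; now += 1_000) { // assumed one-second tick
            Action action = decide(now, lastAck, lastSent, lastPoll);
            if (action == Action.SEND_HEARTBEAT) {
                lastSent = now;
                if (heartbeatsAcked) {
                    lastAck = now;
                }
            } else if (action != Action.WAIT) {
                return Optional.of(action);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println("20 s gap, heartbeats acknowledged: " + simulate(20_000, true));
        System.out.println("400 s gap, heartbeats acknowledged: " + simulate(400_000, true));
        System.out.println("20 s gap, heartbeats never acknowledged: " + simulate(20_000, false));
    }
}
```

Under this model, a 20,000 ms gap between polls triggers nothing as long as heartbeats are acknowledged, a gap longer than max.poll.interval.ms makes the member leave the group, and missing acknowledgements for longer than session.timeout.ms mark the coordinator as unknown regardless of how recently poll() was called.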
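To make the relationship between the settings concrete, here is a minimal sketch that puts the values from the question into a java.util.Properties object and checks two rules of thumb mentioned in the answer. The class ConsumerTimeoutConfig and its helper methods are hypothetical, and the max.poll.records value of 500 is an assumed batch size. A real consumer would also need bootstrap.servers, group.id, and deserializer settings, which are left out here.

```java
import java.util.Properties;

/** Hypothetical helper that collects the timeout settings discussed above. */
class ConsumerTimeoutConfig {

    /** Builds properties with the default values quoted in the question. */
    static Properties defaults() {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", "300000");
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("heartbeat.interval.ms", "3000");
        props.setProperty("max.poll.records", "500"); // assumed batch size for illustration
        return props;
    }

    static long getLong(Properties props, String key) {
        return Long.parseLong(props.getProperty(key));
    }

    /** True when heartbeat.interval.ms is at most one third of session.timeout.ms. */
    static boolean heartbeatWithinOneThird(Properties props) {
        return getLong(props, "heartbeat.interval.ms") * 3 <= getLong(props, "session.timeout.ms");
    }

    /** Average processing time available per record before max.poll.interval.ms is exceeded. */
    static long perRecordBudgetMs(Properties props) {
        return getLong(props, "max.poll.interval.ms") / getLong(props, "max.poll.records");
    }

    public static void main(String[] args) {
        Properties props = defaults();
        System.out.println("heartbeat within 1/3 of session timeout: " + heartbeatWithinOneThird(props));
        System.out.println("per-record processing budget (ms): " + perRecordBudgetMs(props));
    }
}
```

The per-record budget shows why lowering max.poll.records is an alternative to raising max.poll.interval.ms: a smaller batch leaves more time for each record within the same poll interval.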