本文介绍了Kafka 传递重复消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们使用 kafka(0.9.0.0) 来编排不同微服务之间的命令消息.我们发现了一个间歇性问题,即重复的消息被传递到特定主题.下面给出了发生此问题时发生的日志.有人可以帮助理解这个问题

We are using kafka(0.9.0.0) for orchestrating command messages between different micro services. We are finding an intermittent issue where duplicate messages are getting delivered to a particular topic. The logs that occur when this issue happens is given below. Can some one help to understand this issue

Wed, 21-Sep-2016 09:19:07 - WARNING Coordinator unknown during heartbeat -- will retry
Wed, 21-Sep-2016 09:19:07 - WARNING Heartbeat failed; retrying
Wed, 21-Sep-2016 09:19:07 - WARNING <BrokerConnection host=AZSG-D-BOT-DEV4 port=9092> timed out after 40000 ms. Closing connection.
Wed, 21-Sep-2016 09:19:07 - ERROR Fetch to node 1 failed: RequestTimedOutError - 7 - This error is thrown if the request exceeds the user-specified time limit in the request.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092)
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery.
Wed, 21-Sep-2016 09:19:07 - ERROR LeaveGroup request failed: UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092)
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery.
Wed, 21-Sep-2016 09:19:10 - INFO Joined group 'kafka-python-default-group' (generation 5) with member_id kafka-python-1.0.2-8585f310-cb4f-493a-a98d-12ec9810419b
Wed, 21-Sep-2016 09:19:10 - INFO Updated partition assignment: [TopicPartition(topic=u'ilinaTestPlatformReq', partition=0)]

推荐答案

来自 关于消费者配置的 Kafka 文档:

session.timeout.ms (default 30000) - 用于检测的超时时间使用 Kafka 的组管理工具时失败.当一个在会话超时内没有收到消费者的心跳,经纪人将消费者标记为失败并重新平衡组.自从只有在调用 poll() 时才会发送心跳,更高的会话timeout 允许有更多时间在消费者的轮询中进行消息处理以更长的时间来检测硬故障为代价进行循环.也可以看看max.poll.records 用于控制处理时间的另一个选项轮询循环.请注意,该值必须在允许范围内,因为通过 group.min.session.timeout.ms 在代理配置中配置和 group.max.session.timeout.ms.

貌似消息处理时间大于30000ms会触发consumer rebalancing,可能会导致消息重复发送.

Seems that if the message processing time is greater than 30000 ms, consumer rebalancing is triggered which may cause duplicate message delivery.

您可以尝试增加session.timeout.ms.

另一种选择是在使用 pause() 在处理消息和 resume() 处理消息后.在这种情况下,即使处理时间长于 session.timeout.ms,消费者也会调用 poll()(并发送心跳).因此,broker 不会将您的消费者标记为失败,也不会启动重新平衡.

Another option is to process messages asynchronously while using pause() before processing a message and resume() after a message processed. In this case the consumer will call poll() (and send heartbeats) even if processing time takes longer than session.timeout.ms. Thus broker won't mark your consumer as failed and no rebalancing will be initiated.

这篇关于Kafka 传递重复消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-28 02:39