Problem description
如何进一步增加单个消费者读取的消息数?
The Kafka consumer has a configuration, max.poll.records, which controls the maximum number of records returned in a single call to poll(); its default value is 500. I have set it to a very high number so that I can get all the messages in a single poll. However, the poll returns only a few thousand messages (roughly 6000) in a single call, even though the topic has many more.
How can I further increase the number of messages read by a single consumer?
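For reference, raising `max.poll.records` looks roughly like this. This is a minimal sketch using plain `java.util.Properties`: the broker address, group id, and the value 100000 are placeholder assumptions, and in a real application these properties would be passed to a `KafkaConsumer` constructor.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds consumer properties with max.poll.records raised well above
    // its default of 500. Broker address and group id are placeholders.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "my-group");                // placeholder group id
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "100000"); // default is 500
        return props;
    }

    public static void main(String[] args) {
        System.out.println("max.poll.records = "
            + buildProps().getProperty("max.poll.records"));
    }
}
```

As the accepted answer below explains, raising this setting alone is not enough: each poll is still bounded by the byte-based fetch limits.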
Recommended answer
Most probably your payload is limited by max.partition.fetch.bytes, which is 1 MB by default. Refer to the Kafka Consumer configuration.
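Lifting that cap means raising `max.partition.fetch.bytes` on the consumer. A hedged sketch follows: 10 MB is an arbitrary example value, not a recommendation, and on newer client versions the total fetch response is also capped by `fetch.max.bytes` (assumed here to default to 50 MB), which may need raising as well.

```java
import java.util.Properties;

public class FetchSizeSketch {
    // Raises the per-partition fetch cap from its 1 MB default.
    // 10 MB is an arbitrary example; size it to your actual message batches.
    static Properties withLargerFetch(Properties props) {
        props.put("max.partition.fetch.bytes", String.valueOf(10 * 1024 * 1024));
        // Assumption: on newer clients the whole fetch response is also
        // capped by fetch.max.bytes; raise it too if it becomes the limit.
        props.put("fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties p = withLargerFetch(new Properties());
        System.out.println(p.getProperty("max.partition.fetch.bytes")); // prints 10485760
    }
}
```

The trade-off of a larger fetch size is discussed in the quoted explanation below: bigger polls take longer to process and increase the risk of missing the session timeout.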
Here's a good, detailed explanation:
max.partition.fetch.bytes
This property controls the maximum number of bytes the server will return per partition. The default is 1 MB, which means that when KafkaConsumer.poll() returns ConsumerRecords, the record object will use at most max.partition.fetch.bytes per partition assigned to the consumer. So if a topic has 20 partitions, and you have 5 consumers, each consumer will need to have 4 MB of memory available for ConsumerRecords. In practice, you will want to allocate more memory as each consumer will need to handle more partitions if other consumers in the group fail. max.partition.fetch.bytes must be larger than the largest message a broker will accept (determined by the max.message.size property in the broker configuration), or the broker may have messages that the consumer will be unable to consume, in which case the consumer will hang trying to read them. Another important consideration when setting max.partition.fetch.bytes is the amount of time it takes the consumer to process data. As you recall, the consumer must call poll() frequently enough to avoid session timeout and subsequent rebalance. If the amount of data a single poll() returns is very large, it may take the consumer longer to process, which means it will not get to the next iteration of the poll loop in time to avoid a session timeout. If this occurs, the two options are either to lower max.partition.fetch.bytes or to increase the session timeout.
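The memory arithmetic in the quote (20 partitions, 5 consumers, 1 MB per partition, so 4 MB per consumer) can be sketched as a small worst-case estimate. The helper name is hypothetical, introduced only for illustration.

```java
public class ConsumerMemory {
    // Worst-case ConsumerRecords memory per consumer: the per-partition
    // fetch cap times the number of partitions assigned to one consumer.
    static long perConsumerBytes(int partitions, int consumers, long maxPartitionFetchBytes) {
        int partitionsPerConsumer = (int) Math.ceil((double) partitions / consumers);
        return partitionsPerConsumer * maxPartitionFetchBytes;
    }

    public static void main(String[] args) {
        // The quote's example: 20 partitions, 5 consumers, 1 MB default cap.
        System.out.println(perConsumerBytes(20, 5, 1024 * 1024)); // prints 4194304 (4 MB)
    }
}
```

Note that if consumers in the group fail, the survivors pick up their partitions, so `partitionsPerConsumer` can grow toward the full partition count; that is why the quote advises allocating more memory than this minimum.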
Hope this helps!