无法从kafka主题轮询

无法从kafka主题轮询

本文介绍了无法从kafka主题轮询/获取所有记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从特定主题中轮询数据,例如kafka正在接收100条记录/秒但大多数情况下,它不会提取所有记录.我正在使用5000ms的超时时间,并且每个100ms都调用此方法注意:我也订阅了特定主题

I am trying to poll data from a specific topic like kafka is receiving 100 records/sbut most of the time it does not fetch all records.I am using timeout as 5000ms and I am calling this method every 100msNote : I am subscribing to the specific topic too

@Scheduled(fixedDelayString ="100")

    public void pollRecords() {
        ConsumerRecords<String, String> records =
        leadConsumer.poll("5000");

如何从kafka获取所有数据?

How can I fetch all the data from kafka ?

推荐答案

从poll()返回的最大记录数由max.poll.records消费者配置参数指定. (默认值为500).此外,还有另一个使用者配置参数,该参数限制了从服务器返回的最大数据量. (fetch.max.bytesmax.partition.fetch.bytes)

Maximum number of records returned from poll() is specified with max.poll.records consumer config parameter. (default is 500) Also, there are another consumer config parameters which limits the maximum amount of data returned from server. (fetch.max.bytes and max.partition.fetch.bytes)

另一方面,在经纪人方面,还有另一个大小限制,称为message.max.bytes.

On the other hand, on broker side there is another size limit which is called message.max.bytes.

因此,您应该正确设置这些参数以获取更多消息.

So you should set these parameters properly to get more messages.

来自Kafka文档(链接):

From Kafka docs (link):

fetch.max.bytes :服务器应为获取请求返回的最大数据量.记录是由计算机批量提取的. 消费者,并且如果第一个非空记录中的第一个记录 提取的分区大于此值,记录批处理 仍将退回以确保消费者可以取得进步. 因此,这不是绝对最大值.最大记录批量大小 代理接受的消息是通过message.max.bytes(经纪人)定义的 配置)或max.message.bytes(主题配置).注意消费者 并行执行多个提取. (默认:52428800)

fetch.max.bytes: The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. (default:52428800)

最大邮件字节数:Kafka允许的最大记录批处理大小.如果增加此值,并且有消费者的年龄大于0.10.2,则 消费者的获取量也必须增加,以便他们可以 提取这么大的记录批次.在最新的消息格式版本中, 记录总是按批次分组以提高效率.在上一个 消息格式版本,未压缩的记录未分组为 批处理,此限制仅适用于该记录中的单个记录 大小写.可以按主题级别max.message.bytes设置此主题 配置. (默认:1000012)

message.max.bytes: The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that the they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.This can be set per topic with the topic level max.message.bytes config. (default: 1000012)

max.partition.fetch.bytes::服务器将为每个分区返回的最大数据量.记录是分批提取的 由消费者.如果第一个记录批中的第一个非空 提取的分区大于此限制,该批次仍将 返回以确保消费者可以取得进步.最大值 经纪人接受的记录批量大小是通过以下方式定义的 message.max.bytes(代理配置)或max.message.bytes(主题配置). 有关限制使用者请求大小的信息,请参见fetch.max.bytes. (默认:1048576)

max.partition.fetch.bytes: The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size. (default: 1048576)

这篇关于无法从kafka主题轮询/获取所有记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 17:06