本文介绍了消费者处的Kafka kafka.common.MessageSizeTooLargeException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

对于一些较大的消息,我遇到以下错误:

For some larger messages I encountered the following error:

kafka.common.MessageSizeTooLargeException: Message size is 1185198 bytes which exceeds the maximum configured message size of 1000012.

因此,根据此线程增加了经纪人和使用者的邮件大小:

So as per this thread increased the message size at broker and consumer:

fetch.message.max.bytes=10485760
replica.fetch.max.bytes=10485760
message.max.bytes=10485760


添加到 config/server.properties

但是随后消息通过,但消费者错误消失了:

But then the messages goes through but consumer error out:

[2015-08-26 21:08:08,722] ERROR Error processing message, stopping consumer:  (kafka.tools.ConsoleConsumer$)
kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic xyz partition 0 at fetch offset 29. Increase the fetch size, or decrease the maximum message size the broker will allow.
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at scala.collection.Iterator$class.foreach(Iterator.scala:660)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
    at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:166)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 3 messages

似乎消费者没有拿起 fetch.message.max.bytes = 10485760

kafka_2.9.1-0.8.2.1

kafka_2.9.1-0.8.2.1

有指针吗?

推荐答案

您不应将 fetch.message.max.bytes 放入 config/server.properties 到您的 ConsumerConfig (请参见此处有关详细信息).如果使用控制台使用者,则可以传递-consumer.config Consumer.properties 标志,其中consumer.properties文件将包含此配置值.

You shouldn't put fetch.message.max.bytes to config/server.properties but to your ConsumerConfig (see here for details). If you are using console consumer you may pass a --consumer.config consumer.properties flag where consumer.properties file will contain this config value.

这篇关于消费者处的Kafka kafka.common.MessageSizeTooLargeException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 17:40