由于1.0.0存在OOM问题,因此我已更新至1.0.1版本。

我设置了具有四个代理的集群。

大约有150个主题,大约有4000个分区,ReplicationFactor是2。
连接器用于向代理写入数据或从代理读取数据。
连接器版本为0.10.1。
平均消息大小为500B,每秒约60000条消息。
经纪人之一保留报告OOM,并且无法处理以下请求:

    [2018-03-24 12:37:17,449]错误[KafkaApi-1001]处理请求{replica_id = -1,max_wait_time = 500,min_bytes = 1,topics = [{topic = voltetraffica.data,partitions = [
    {partition = 16,fetch_offset = 51198,max_bytes = 60728640},{partition = 12,fetch_offset = 50984,max_bytes = 60728640}]}}}(kafka.server.KafkaApis)
    java.lang.OutOfMemoryError:Java堆空间
    在java.nio.HeapByteBuffer。(HeapByteBuffer.java:57)
    在java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    在org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
    在org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
    在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ convertedPartitionData $ 1 $ 1 $ anonfun $ apply $ 4.apply(KafkaApis.scala:525)
    在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ convertedPartitionData $ 1 $ 1 $ anonfun $ apply $ 4.apply(KafkaApis.scala:523)
    在scala.Option.map(Option.scala:146)
    在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ convertedPartitionData $ 1 $ 1.apply(KafkaApis.scala:523)
    在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ convertedPartitionData $ 1 $ 1.apply(KafkaApis.scala:513)
    在scala.Option.flatMap(Option.scala:171)
    在kafka.server.KafkaApis.kafka $ server $ KafkaApis $$ convertedPartitionData $ 1(KafkaApis.scala:513)
    在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ createResponse $ 2 $ 1.apply(KafkaApis.scala:561)
    在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ createResponse $ 2 $ 1.apply(KafkaApis.scala:560)
    在scala.collection.Iterator $ class.foreach(Iterator.scala:891)
    在scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    在scala.collection.IterableLike $ class.foreach(IterableLike.scala:72)
    在scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    在kafka.server.KafkaApis.kafka $ server $ KafkaApis $$ createResponse $ 2(KafkaApis.scala:560)
    在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ fetchResponseCallback $ 1 $ 1.apply(KafkaApis.scala:574)
    在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ fetchResponseCallback $ 1 $ 1.apply(KafkaApis.scala:574)
    在kafka.server.KafkaApis $$ anonfun $ sendResponseMaybeThrottle $ 1.apply $ mcVI $ sp(KafkaApis.scala:2041)
    在kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
    在kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040)
    在kafka.server.KafkaApis.kafka $ server $ KafkaApis $$ fetchResponseCallback $ 1(KafkaApis.scala:574)
    在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ processResponseCallback $ 1 $ 1.apply $ mcVI $ sp(KafkaApis.scala:593)
    在kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
    在kafka.server.KafkaApis.kafka $ server $ KafkaApis $$ processResponseCallback $ 1(KafkaApis.scala:592)
    在kafka.server.KafkaApis $$ anonfun $ handleFetchRequest $ 4.apply(KafkaApis.scala:609)
    在kafka.server.KafkaApis $$ anonfun $ handleFetchRequest $ 4.apply(KafkaApis.scala:609)
    在kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
    在kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:601)
    在kafka.server.KafkaApis.handle(KafkaApis.scala:99)
  

然后是大量收缩ISR(此经纪商为1001)
        018-03-24 13:43:00,285]信息[分区gnup.source.offset.storage.topic-5 broker = 1001] ISR从1001,1002缩减到1001(kafka.cluster.Partition)
    018-03-24 13:43:00,286]信息[分区s1mme.data-72 broker = 1001]将ISR从1001,1002缩小到1001(kafka.cluster.Partition)
    018-03-24 13:43:00,286]信息[分区gnup.sink.status.storage.topic-17 broker = 1001] ISR从1001,1002缩减到1001(kafka.cluster.Partition)
    018-03-24 13:43:00,287]信息[分区probessgsniups.sink.offset.storage.topic-4 broker = 1001] ISR从1001,1002缩减到1001(kafka.cluster.Partition)
    018-03-24 13:43:01,447]信息[GroupCoordinator 1001]:稳定的组connect-VOICE_1_SINK_CONN生成26(__consumer_offsets-18)(kafka.coordinator.group.GroupCoordinator)

自从每次运行以来,我都无法转储堆:
[root @ sslave1 kafka]#jcmd 55409 GC.heap_dump /home/ngdb/heap_dump175.hprof
55409:

    com.sun.tools.attach.AttachNotSupportedException:无法打开套接字文件:目标进程未响应或未加载HotSpot VM
    在sun.tools.attach.LinuxVirtualMachine。(LinuxVirtualMachine.java:106)
    在sun.tools.attach.LinuxAttachProvider.attachVirtualMachine(LinuxAttachProvider.java:63)
    在com.sun.tools.attach.VirtualMachine.attach(VirtualMachine.java:208)
    在sun.tools.jcmd.JCmd.executeCommandForPid(JCmd.java:147)
    在sun.tools.jcmd.JCmd.main(JCmd.java:131)
   

JVM参数是:

    -XX:+ ExplicitGCInvokesConcurrent -XX:GCLogFileSize = 104857600 -XX:InitialHeapSize = 2147483648 -XX:InitiatingHeapOccupancyPercent = 35 -XX:+ ManagementServer -XX:MaxGCPauseMillis = 20 -XX:MaxHeapSize = 4294967296 -XX:NumberOfGCLogFiles = 10- PrintGC -XX:+ PrintGCDateStamps -XX:+ PrintGCDetails -XX:+ PrintGCTimeStamps -XX:+ UseCompressedClassPointers -XX:+ UseCompressedOops -XX:+ UseG1GC -XX:+ UseGCLogFileRotation


当我使用-XX:mx = 2G时,四个经纪人报告了OOM,
在我将其创建为4G之后,只有一个经纪人报告了OOM。
股票代号在https://issues.apache.org/jira/browse/KAFKA-6709中也被提高。

最佳答案

在0.10.X和> = 0.11.X Kafka版本之间,消息格式已更改。

因此,当使用较旧的客户端( = 0.11)时,代理必须先下转换消息,然后再将其发送回客户端。升级说明中记录了这一点:http://kafka.apache.org/documentation/#upgrade_11_message_format

您可以在堆栈跟踪中看到确实发生了这种情况:

at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)


这带来了性能上的下降,并且还增加了所需的内存量,因为代理需要分配新的缓冲区来创建向下转换的消息。

您应该尝试将客户端升级到与代理相同的版本。另外还要考虑您当前的堆有多小(4GB),增加它可能会有所帮助。

另一个选择是强制较新的代理使用较旧的消息格式(使用log.message.format.version),但这会阻止您使用某些较新的功能。

10-02 05:41