由于1.0.0存在OOM问题,因此我已更新至1.0.1版本。
我设置了具有四个代理的集群。
大约有150个主题,大约有4000个分区,ReplicationFactor是2。
连接器用于向代理写入数据或从代理读取数据。
连接器版本为0.10.1。
平均消息大小为500B,每秒约60000条消息。
经纪人之一保留报告OOM,并且无法处理以下请求:
[2018-03-24 12:37:17,449]错误[KafkaApi-1001]处理请求{replica_id = -1,max_wait_time = 500,min_bytes = 1,topics = [{topic = voltetraffica.data,partitions = [
{partition = 16,fetch_offset = 51198,max_bytes = 60728640},{partition = 12,fetch_offset = 50984,max_bytes = 60728640}]}}}(kafka.server.KafkaApis)
java.lang.OutOfMemoryError:Java堆空间
在java.nio.HeapByteBuffer。(HeapByteBuffer.java:57)
在java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
在org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
在org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ convertedPartitionData $ 1 $ 1 $ anonfun $ apply $ 4.apply(KafkaApis.scala:525)
在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ convertedPartitionData $ 1 $ 1 $ anonfun $ apply $ 4.apply(KafkaApis.scala:523)
在scala.Option.map(Option.scala:146)
在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ convertedPartitionData $ 1 $ 1.apply(KafkaApis.scala:523)
在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ convertedPartitionData $ 1 $ 1.apply(KafkaApis.scala:513)
在scala.Option.flatMap(Option.scala:171)
在kafka.server.KafkaApis.kafka $ server $ KafkaApis $$ convertedPartitionData $ 1(KafkaApis.scala:513)
在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ createResponse $ 2 $ 1.apply(KafkaApis.scala:561)
在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ createResponse $ 2 $ 1.apply(KafkaApis.scala:560)
在scala.collection.Iterator $ class.foreach(Iterator.scala:891)
在scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
在scala.collection.IterableLike $ class.foreach(IterableLike.scala:72)
在scala.collection.AbstractIterable.foreach(Iterable.scala:54)
在kafka.server.KafkaApis.kafka $ server $ KafkaApis $$ createResponse $ 2(KafkaApis.scala:560)
在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ fetchResponseCallback $ 1 $ 1.apply(KafkaApis.scala:574)
在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ fetchResponseCallback $ 1 $ 1.apply(KafkaApis.scala:574)
在kafka.server.KafkaApis $$ anonfun $ sendResponseMaybeThrottle $ 1.apply $ mcVI $ sp(KafkaApis.scala:2041)
在kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
在kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040)
在kafka.server.KafkaApis.kafka $ server $ KafkaApis $$ fetchResponseCallback $ 1(KafkaApis.scala:574)
在kafka.server.KafkaApis $$ anonfun $ kafka $ server $ KafkaApis $$ processResponseCallback $ 1 $ 1.apply $ mcVI $ sp(KafkaApis.scala:593)
在kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
在kafka.server.KafkaApis.kafka $ server $ KafkaApis $$ processResponseCallback $ 1(KafkaApis.scala:592)
在kafka.server.KafkaApis $$ anonfun $ handleFetchRequest $ 4.apply(KafkaApis.scala:609)
在kafka.server.KafkaApis $$ anonfun $ handleFetchRequest $ 4.apply(KafkaApis.scala:609)
在kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
在kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:601)
在kafka.server.KafkaApis.handle(KafkaApis.scala:99)
然后是大量收缩ISR(此经纪商为1001)
018-03-24 13:43:00,285]信息[分区gnup.source.offset.storage.topic-5 broker = 1001] ISR从1001,1002缩减到1001(kafka.cluster.Partition)
018-03-24 13:43:00,286]信息[分区s1mme.data-72 broker = 1001]将ISR从1001,1002缩小到1001(kafka.cluster.Partition)
018-03-24 13:43:00,286]信息[分区gnup.sink.status.storage.topic-17 broker = 1001] ISR从1001,1002缩减到1001(kafka.cluster.Partition)
018-03-24 13:43:00,287]信息[分区probessgsniups.sink.offset.storage.topic-4 broker = 1001] ISR从1001,1002缩减到1001(kafka.cluster.Partition)
018-03-24 13:43:01,447]信息[GroupCoordinator 1001]:稳定的组connect-VOICE_1_SINK_CONN生成26(__consumer_offsets-18)(kafka.coordinator.group.GroupCoordinator)
自从每次运行以来,我都无法转储堆:
[root @ sslave1 kafka]#jcmd 55409 GC.heap_dump /home/ngdb/heap_dump175.hprof
55409:
com.sun.tools.attach.AttachNotSupportedException:无法打开套接字文件:目标进程未响应或未加载HotSpot VM
在sun.tools.attach.LinuxVirtualMachine。(LinuxVirtualMachine.java:106)
在sun.tools.attach.LinuxAttachProvider.attachVirtualMachine(LinuxAttachProvider.java:63)
在com.sun.tools.attach.VirtualMachine.attach(VirtualMachine.java:208)
在sun.tools.jcmd.JCmd.executeCommandForPid(JCmd.java:147)
在sun.tools.jcmd.JCmd.main(JCmd.java:131)
JVM参数是:
-XX:+ ExplicitGCInvokesConcurrent -XX:GCLogFileSize = 104857600 -XX:InitialHeapSize = 2147483648 -XX:InitiatingHeapOccupancyPercent = 35 -XX:+ ManagementServer -XX:MaxGCPauseMillis = 20 -XX:MaxHeapSize = 4294967296 -XX:NumberOfGCLogFiles = 10- PrintGC -XX:+ PrintGCDateStamps -XX:+ PrintGCDetails -XX:+ PrintGCTimeStamps -XX:+ UseCompressedClassPointers -XX:+ UseCompressedOops -XX:+ UseG1GC -XX:+ UseGCLogFileRotation
当我使用-XX:mx = 2G时,四个经纪人报告了OOM,
在我将其创建为4G之后,只有一个经纪人报告了OOM。
股票代号在https://issues.apache.org/jira/browse/KAFKA-6709中也被提高。
最佳答案
在0.10.X和> = 0.11.X Kafka版本之间,消息格式已更改。
因此,当使用较旧的客户端( = 0.11)时,代理必须先下转换消息,然后再将其发送回客户端。升级说明中记录了这一点:http://kafka.apache.org/documentation/#upgrade_11_message_format。
您可以在堆栈跟踪中看到确实发生了这种情况:
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
这带来了性能上的下降,并且还增加了所需的内存量,因为代理需要分配新的缓冲区来创建向下转换的消息。
您应该尝试将客户端升级到与代理相同的版本。另外还要考虑您当前的堆有多小(4GB),增加它可能会有所帮助。
另一个选择是强制较新的代理使用较旧的消息格式(使用
log.message.format.version
),但这会阻止您使用某些较新的功能。