Problem description
I am using Apache Camel Kafka as the client for producing messages. I observed that the Kafka producer takes about 1 ms to push a message, and if I merge messages into a batch using Camel aggregation, it takes about 100 ms to push a single message.
Brief description of the installation: 3-node Kafka cluster, 16 cores, 32 GB RAM.
Sample code
String endpoint="kafka:test?topic=test&brokers=nodekfa:9092,nodekfb:9092,nodekfc:9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536";
Message message = new Message();
String payload = new ObjectMapper().writeValueAsString(message);
StopWatch stopWatch = new StopWatch();
stopWatch.watch();
for (int i = 0; i < size; i++) {
    producerTemplate.sendBody(endpoint, ExchangePattern.InOnly, payload);
}
logger.info("Time taken to push {} message is {}",size,stopWatch.getElasedTime());
Camel producer endpoint
kafka:[topic]?topic=[topic]&brokers=[brokers]&maxInFlightRequest=1
I am getting a throughput of about 1,000 messages/s, although the Kafka documentation boasts a producer throughput of around 100,000 messages/s.
Let me know if there is a bug in camel-kafka or in Kafka itself.
Producer configuration
acks = 1
batch.size = 65536
bootstrap.servers = [nodekfa:9092, nodekfb:9092, nodekfc:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Test log
DEBUG [2019-06-02 17:30:46,781] c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,781] c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,783] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,783] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,784] c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,785] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,786] c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,786] c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,788] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,788] c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
INFO [2019-06-02 17:30:46,788] c.g.p.f.a.MessageApiController: Time taken to push 5 message is 10ms
Each message clearly takes at least 1 ms. The default worker pool max size is 20. Setting the compression codec to snappy makes performance even worse.
Let me know what I am missing!
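The numbers in the test log can be turned into a rough throughput ceiling. The sketch below is only arithmetic, under the assumption that the loop sends one message at a time from a single thread and that each `sendBody` call returns only after the send has completed; the class and method names are hypothetical and are not part of Camel or Kafka.

```java
// Back-of-the-envelope estimate (hypothetical helper, not a Camel or Kafka API):
// a thread that waits for each send to finish before starting the next one
// cannot exceed 1000 / latencyMs messages per second.
final class SequentialSendEstimate {

    /** Upper bound on messages per second for one thread sending one message at a time. */
    static double maxMessagesPerSecond(double latencyMsPerMessage) {
        if (latencyMsPerMessage <= 0) {
            throw new IllegalArgumentException("latency must be positive");
        }
        return 1000.0 / latencyMsPerMessage;
    }

    public static void main(String[] args) {
        // The test log reports 5 messages in 10 ms, i.e. 2 ms per message on average.
        double observedLatencyMs = 10.0 / 5;
        System.out.println(maxMessagesPerSecond(observedLatencyMs)); // prints 500.0

        // At the best observed latency of 1 ms per message the ceiling is 1000 messages/s.
        System.out.println(maxMessagesPerSecond(1.0)); // prints 1000.0
    }
}
```

Under these assumptions the observed 1,000 messages/s is what a sequential, one-message-per-request loop would be expected to deliver, so getting closer to the advertised figures would mean sending in batches or from several threads.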
Recommended answer
I faced the same issue. Following this email thread, https://camel.465427.n5.nabble.com/Kafka-Producer-Performance-tp5785767p5785860.html, I used the Aggregate EIP (https://camel.apache.org/manual/latest/aggregate-eip.html) to create batches and got better performance:
from("direct:dp.events")
    .aggregate(constant(true), new ArrayListAggregationStrategy())
    .completionSize(3)
    .to(kafkaUri)
    .to("log:out?groupInterval=1000&groupDelay=500")
    .end();
I get:
INFO Received: 1670 new messages, with total 13949 so far. Last group took: 998 millis which is: 1,673.347 messages per second. average: 1,262.696
This is with one Azure Event Hub, using the Kafka protocol, with one partition. Strangely, when I use another Event Hub with 5 partitions, performance is worse than in the one-partition example.
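To make the batching idea in the route above concrete, here is a minimal plain-Java sketch of size-based batching, the behaviour that `completionSize(3)` asks for. It is not Camel's implementation, and `SizeBatcher` is a hypothetical class written only for illustration: items are collected into a list and handed to a sink once the list reaches the configured size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of size-based batching; not part of Camel.
final class SizeBatcher<T> {
    private final int completionSize;
    private final Consumer<List<T>> sink;
    private List<T> current = new ArrayList<>();

    SizeBatcher(int completionSize, Consumer<List<T>> sink) {
        if (completionSize < 1) {
            throw new IllegalArgumentException("completionSize must be at least 1");
        }
        this.completionSize = completionSize;
        this.sink = sink;
    }

    /** Adds one item and emits the batch as soon as it reaches completionSize. */
    void add(T item) {
        current.add(item);
        if (current.size() >= completionSize) {
            flush();
        }
    }

    /** Emits whatever has been collected so far, if anything. */
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        List<T> batch = current;
        current = new ArrayList<>();
        sink.accept(batch);
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        SizeBatcher<String> batcher = new SizeBatcher<>(3, sent::add);
        for (int i = 1; i <= 7; i++) {
            batcher.add("event-" + i);
        }
        batcher.flush(); // push the incomplete last batch
        System.out.println(sent.size() + " batches: " + sent);
    }
}
```

With seven events and a batch size of 3, the sink is called three times, so the downstream endpoint sees three sends instead of seven. The explicit `flush()` is a choice of this sketch; a size-only completion rule by itself would leave the last, incomplete batch waiting.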
I was able to reach 3K messages per second by increasing workerPoolCoreSize and workerPoolMaxSize, in addition to adding partition keys to the messages and aggregating before sending to the Kafka endpoint.
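As a rough sketch of the worker pool change, the helper below appends `workerPoolCoreSize` and `workerPoolMaxSize` to an endpoint URI like the one in the question. The helper class is hypothetical, and the values 20 and 40 are assumptions chosen for illustration; the answer does not say which sizes were used.

```java
// Hypothetical helper that appends worker pool options to a camel-kafka endpoint URI.
final class KafkaEndpointUri {

    static String withWorkerPool(String baseUri, int coreSize, int maxSize) {
        if (coreSize < 1 || maxSize < coreSize) {
            throw new IllegalArgumentException("need 1 <= coreSize <= maxSize");
        }
        // Use '&' when the URI already carries query parameters, '?' otherwise.
        String separator = baseUri.contains("?") ? "&" : "?";
        return baseUri + separator
                + "workerPoolCoreSize=" + coreSize
                + "&workerPoolMaxSize=" + maxSize;
    }

    public static void main(String[] args) {
        String base = "kafka:test?topic=test&brokers=nodekfa:9092,nodekfb:9092,nodekfc:9092";
        // 20 and 40 are illustrative values, not the ones used in the answer.
        System.out.println(withWorkerPool(base, 20, 40));
    }
}
```

The resulting string can be used wherever the route refers to the Kafka endpoint. Suitable pool sizes depend on the workload, so they are best found by measuring.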