Spring Kafka template producer performance

Problem description

I am using the Spring Kafka template to produce messages, and the rate at which it produces them is too slow: producing 15,000 messages takes around 8 minutes.

This is how I create the Kafka template:

  @Bean
  public ProducerFactory<String, GenericRecord> highSpeedAvroProducerFactory(
      @Qualifier("highSpeedProducerProperties") KafkaProperties properties) {
    final Map<String, Object> kafkaPropertiesMap = properties.getKafkaPropertiesMap();
    System.out.println(kafkaPropertiesMap);
    kafkaPropertiesMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    kafkaPropertiesMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroGenericSerializer.class);
    return new DefaultKafkaProducerFactory<>(kafkaPropertiesMap);
  }

  @Bean
  public KafkaTemplate<String, GenericRecord> highSpeedAvroKafkaTemplate(
      @Qualifier("highSpeedAvroProducerFactory") ProducerFactory<String, GenericRecord> highSpeedAvroProducerFactory) {
    return new KafkaTemplate<>(highSpeedAvroProducerFactory);
  }

Here is how I use the template to send the messages:

  @Async("servicingPlatformUpdateExecutor")
  public void afterWrite(List<? extends Test> items) {
    LOGGER.info("Batch start:{}", items.size());
    for (Test test : items) {
      if (test.isOmega()) {
        ObjectKeyRecord objectKeyRecord = ObjectKeyRecord.newBuilder().setType("test").setId(test.getId()).build();
        LOGGER.info("build start, {}", test.getId());

        // Look up the schema for the topic and build the Avro record.
        GenericRecord message = MessageUtils.buildEventRecord(
            schemaService.findSchema(topicName)
                .orElseThrow(() -> new OmegaException("SchemaNotFoundException", topicName)), objectKeyRecord, test);
        LOGGER.info("build end, {}", test.getId());
        LOGGER.info("send Started , {}", test.getId());
        // send() returns a future; the callback is invoked later on the producer network thread.
        ListenableFuture<SendResult<String, GenericRecord>> future = highSpeedAvroKafkaTemplate.send(topicName, objectKeyRecord.toString(), message);
        LOGGER.info("send Done , {}", test.getId());
        future.addCallback(new KafkaProducerFutureCallback(kafkaSender, topicName, objectKeyRecord.toString(), message));
      }
    }
    LOGGER.info("Batch end");
  }

Producer properties:

metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [***VALID BROKERS****))]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 9223372036854775807
interceptor.classes = null
ssl.truststore.password = null
client.id = producer-1
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = all
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 2147483647
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 800000000
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 10
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2]
batch.size = 40000000
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = SASL_SSL
max.request.size = 1048576
value.serializer = class com.message.serialization.AvroGenericSerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 2
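
A few of these values stand out when compared with each other, so here is a rough sketch that only does the arithmetic. The class and method names (`ProducerConfigCheck`, `ratio`) are made up for illustration, and the snippet reads the dump as plain `java.util.Properties` text rather than using any Kafka API. It shows that `batch.size` is about 38 times `max.request.size`, that `buffer.memory` holds 20 batches of that size, and that `max.block.ms` (the limit on how long `send()` may block) is set to `Long.MAX_VALUE`. Whether any of this explains the slow sends is not established here; it only points at settings worth a second look.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ProducerConfigCheck {

    /** Parses "key = value" lines and returns numerator / denominator for two numeric keys. */
    public static double ratio(String propertiesText, String numeratorKey, String denominatorKey) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        double numerator = Double.parseDouble(props.getProperty(numeratorKey).trim());
        double denominator = Double.parseDouble(props.getProperty(denominatorKey).trim());
        return numerator / denominator;
    }

    public static void main(String[] args) {
        // Values copied from the producer properties listed above.
        String dump = "batch.size = 40000000\n"
                + "buffer.memory = 800000000\n"
                + "max.request.size = 1048576\n"
                + "max.block.ms = 9223372036854775807\n";

        System.out.println("batch.size / max.request.size = "
                + ratio(dump, "batch.size", "max.request.size"));
        System.out.println("buffer.memory / batch.size = "
                + ratio(dump, "buffer.memory", "batch.size"));
        System.out.println("max.block.ms is Long.MAX_VALUE: "
                + "9223372036854775807".equals(String.valueOf(Long.MAX_VALUE)));
    }
}
```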

Here is the log showing how long a call to the KafkaTemplate send method takes (about 87 ms between "send Started" and "send Done"):

2018-04-27 05:29:05.691 INFO  - testservice -  - UpdateExecutor-1 - com.test.testservice.adapter.batch.testsyncjob.UpdateWriteListener:70 - build start, 1
2018-04-27 05:29:05.691 INFO  - testservice -  - UpdateExecutor-1 - com.test.testservice.adapter.batch.testsyncjob.UpdateWriteListener:75 - build end, 1
2018-04-27 05:29:05.691 INFO  - testservice -  - UpdateExecutor-1 - com.test.testservice.adapter.batch.testsyncjob.UpdateWriteListener:76 - send Started , 1
2018-04-27 05:29:05.778 INFO  - testservice -  - UpdateExecutor-1 - com.test.testservice.adapter.batch.testsyncjob.UpdateWriteListener:79 - send Done , 1
2018-04-27 05:29:07.794 INFO  - testservice -  - kafka-producer-network-thread | producer-1 - com.test.testservice.adapter.batch.testsyncjob.KafkaProducerFutureCallback:38
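
To read the gaps off the log without doing the subtraction by hand, a small sketch like the following can help. It uses only `java.time`; the class name `LogLatency` is made up, and the timestamps are copied from the lines above. It reports the time spent inside the `send()` call and the time from `send()` returning to the callback being logged on the producer network thread.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogLatency {

    // Timestamp layout used at the start of each log line above.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Milliseconds elapsed between two log timestamps. */
    public static long millisBetween(String start, String end) {
        LocalDateTime from = LocalDateTime.parse(start, FORMAT);
        LocalDateTime to = LocalDateTime.parse(end, FORMAT);
        return Duration.between(from, to).toMillis();
    }

    public static void main(String[] args) {
        // "send Started" -> "send Done"
        long sendCall = millisBetween("2018-04-27 05:29:05.691", "2018-04-27 05:29:05.778");
        // "send Done" -> callback logged on the producer network thread
        long callback = millisBetween("2018-04-27 05:29:05.778", "2018-04-27 05:29:07.794");

        System.out.println("send() call: " + sendCall + " ms");
        System.out.println("callback after send() returned: " + callback + " ms");
    }
}
```

For this one record the `send()` call accounts for 87 ms and the callback arrives 2016 ms after `send()` returned.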

Any suggestions on how I can improve the sender's performance would be greatly appreciated.

Spring Kafka version: 1.2.3.RELEASE
Kafka client: 0.10.2.1

Update 1:

I changed the serializer to ByteArraySerializer and produced the same messages. I still see each send call on the KafkaTemplate taking 100 to 200 milliseconds.

          ObjectKeyRecord objectKeyRecord = ObjectKeyRecord.newBuilder().setType("test").setId(test.getId()).build();
          GenericRecord message = MessageUtils.buildEventRecord(
              schemaService.findSchema(testConversionTopicName)
                  .orElseThrow(() -> new TestException("SchemaNotFoundException", testTopicName)), objectKeyRecord, test);
          byte[] messageBytes = serializer.serialize(testConversionTopicName, message);
          LOGGER.info("send Started , {}", test.getId());
          ListenableFuture<SendResult<String, byte[]>> future = highSpeedAvroKafkaTemplate.send(testConversionTopicName, objectKeyRecord.toString(), messageBytes);
          LOGGER.info("send Done , {}", test.getId());
          future.addCallback(new KafkaProducerFutureCallback(kafkaSender, testConversionTopicName, objectKeyRecord.toString(), message));
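
The update separates serialization from the send call, so one rough way to put a number on each stage is to wrap it in a small timer. The sketch below is an assumption-laden illustration and not code from the question: `StageTimer` and `Timed` are hypothetical names, and the two lambdas in `main` are stand-ins for `serializer.serialize(...)` and `highSpeedAvroKafkaTemplate.send(...)`. It uses wall-clock time from `System.nanoTime()`, so it is much coarser than a real profiler.

```java
import java.util.function.Supplier;

public class StageTimer {

    /** Holds a stage's result together with its wall-clock duration in nanoseconds. */
    public static final class Timed<T> {
        public final T value;
        public final long nanos;

        public Timed(T value, long nanos) {
            this.value = value;
            this.nanos = nanos;
        }
    }

    /** Runs one stage and records how long it took. */
    public static <T> Timed<T> time(Supplier<T> stage) {
        long start = System.nanoTime();
        T value = stage.get();
        return new Timed<>(value, System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Stand-in for serializer.serialize(topic, message).
        Timed<byte[]> serialized = time(() -> new byte[1000]);
        // Stand-in for template.send(topic, key, bytes); here it only reads the length.
        Timed<Integer> sent = time(() -> serialized.value.length);

        System.out.printf("serialize: %d ns, send: %d ns%n", serialized.nanos, sent.nanos);
    }
}
```

Logging both durations per record would show whether the 100 to 200 ms is spent before the send call or inside it.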

Recommended answer

Have you profiled your application, e.g. with YourKit?

I suspect it's the Avro serializer; I am able to send 15,000 1000-byte messages in 274 ms.

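import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.StopWatch;

@SpringBootApplication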
public class So50060086Application {

    public static void main(String[] args) {
        SpringApplication.run(So50060086Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            Thread.sleep(5_000);
            String payload = new String(new byte[999]);
            StopWatch watch = new StopWatch();
            watch.start();
            for (int i = 0; i < 15_000; i++) {
                template.send("so50060086a", "" + i + payload);
            }
            watch.stop();
            System.out.println(watch.prettyPrint());
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50060086a", 1, (short) 1);
    }
}

StopWatch '': running time (millis) = 274
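
To make the gap between the two measurements concrete, here is a small arithmetic sketch using only the figures quoted in this thread (15,000 messages in about 8 minutes versus 15,000 messages in 274 ms). The class name `Throughput` is made up. The two runs are not like-for-like, since the question sends Avro records with `acks = all` over `SASL_SSL` while the test above sends plain strings, so the numbers only indicate scale.

```java
public class Throughput {

    /** Messages per second for a run of the given size and duration. */
    public static double messagesPerSecond(long messages, long elapsedMillis) {
        return messages * 1000.0 / elapsedMillis;
    }

    /** Average milliseconds spent per message. */
    public static double millisPerMessage(long messages, long elapsedMillis) {
        return (double) elapsedMillis / messages;
    }

    public static void main(String[] args) {
        long messages = 15_000;
        long questionMillis = 8 * 60 * 1000; // "around 8 minutes" from the question
        long answerMillis = 274;             // StopWatch result from the answer

        System.out.printf("question: %.2f msg/s, %.3f ms per message%n",
                messagesPerSecond(messages, questionMillis),
                millisPerMessage(messages, questionMillis));
        System.out.printf("answer:   %.2f msg/s, %.3f ms per message%n",
                messagesPerSecond(messages, answerMillis),
                millisPerMessage(messages, answerMillis));
    }
}
```

With these inputs the question's run works out to 31.25 messages per second (32 ms per message), against roughly 54,745 messages per second in the answer's test.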
