Kafka生产者TimeoutException

Kafka生产者TimeoutException

本文介绍了Kafka生产者TimeoutException:即将到期1条记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在 Spring-boot:

Kafka Producer类:

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        });
    }
}

Kafka配置:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093

比方说,我在主题my-test-topic中已发送10次10​​00条消息.

Let's say I have sent 10 times 1000 messages in topic my-test-topic.

在10次中,有8次我成功地在我的使用者中获取了所有消息,但是有时我在错误下得到此消息:

8 out of 10 times I am successfully getting all messages in my consumer but sometimes I am getting this below error:

2017-10-05 07:24:11, [ERROR] [my-service - LoggingProducerListener - onError:76] Exception thrown when sending a message with key='null' and payload='{"deviceType":"X","deviceKeys":[{"apiKey":"X-X-o"}],"devices...' to topic my-test-topic

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-test-topic-4 due to 30024 ms has passed since batch creation plus linger time

推荐答案

有3种可能性:

  1. 增加request.timeout.ms-这是Kafka等待缓冲区中的整个批处理准备就绪的时间.因此,在您的情况下,如果缓冲区中的消息少于10万条,则会发生超时.此处更多信息: https://stackoverflow.com/a/34794261/2707179
  2. 减少batch-size-与上一点有关,它将更频繁地发送批处理,但是它们将包含较少的消息.
  3. 取决于消息的大小,也许您的网络无法赶上高负载?检查您的吞吐量是否不是瓶颈.
  1. Increase request.timeout.ms - this is the time that Kafka will wait for whole batch to be ready in buffer. So in your case if there are less than 100 000 messages in buffer, timeout will occur. More info here: https://stackoverflow.com/a/34794261/2707179
  2. Decrease batch-size - related to previous point, it will send batches more often but they will include fewer messages.
  3. Depending on message size, maybe your network cannot catch up with high load? Check if your throughput is not a bottleneck.

这篇关于Kafka生产者TimeoutException:即将到期1条记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-13 09:09