本文介绍了当引导服务器关闭时,带有 transactionIdPrefix 的 DefaultKafkaProducerFactory 无限等待的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 spring-kafka 1.3.0.RELEASE 创建一个事务性生产者.当引导服务器关闭时,DefaultKafkaProducerFactory 无休止地等待,直到引导服务器启动.

I'm using spring-kafka 1.3.0.RELEASE to create a transactional producer.When the bootstrap server is down, the DefaultKafkaProducerFactory waits endlessly until the bootstrap server is up.

我做错了什么?我可以设置超时和/或其他类似的属性吗?

What am I doing wrong ? Can I set a timeout and/or other similar properties ?

这是我重现场景的代码示例:

This is an example of my code to reproduce the scenario:

public static void main(String[] args) {

    final DefaultKafkaProducerFactory<Object, Object> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());    

    producerFactory.setTransactionIdPrefix("transactionIdPrefix");

    final Producer<Object, Object> producer = producerFactory.createProducer();       

    System.out.println("Created producer:" + producer);  
}

private static Map<String, Object> producerConfigs() {

    final Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.1:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    props.put(ProducerConfig.RETRIES_CONFIG, 1);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000);    
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);

   return props;
}

推荐答案

是由工厂在创建生产者后调用 initTransactions() 引起的,例如如果没有足够的 broker 来支持事务日志复制因子.

It's caused by the factory calling initTransactions() on the producer after creating it, for example if there are not enough brokers to support the transaction log replication factor.

我不知道为什么超时不适用于该操作.

I don't know why the timeouts don't apply to that operation.

我们可能会更改工厂以将 initTransactions() 推迟到第一个 beginTransaction() - 但这只会将问题推向下游.

We could probably change the factory to defer the initTransactions() until the first beginTransaction() - but that will just push the problem downstream.

我使用 kafka 1.0.0 客户端(可用于 1.3.1 或更高版本 - 目前为 1.3.2)进行测试,但仍然存在问题.我认为它应该遵守 TRANSACTION_TIMEOUT_CONFIG 但它似乎没有.

I tested with the kafka 1.0.0 client (which can be used with 1.3.1 or higher - currently 1.3.2) and it's still a problem there. I think it should honor the TRANSACTION_TIMEOUT_CONFIG but it appears not to.

我建议在 Kafka JIRA 上打开一个问题.

I suggest opening an issue on the Kafka JIRA.

这篇关于当引导服务器关闭时,带有 transactionIdPrefix 的 DefaultKafkaProducerFactory 无限等待的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-23 17:28