本文介绍了带有transactionIdPrefix的DefaultKafkaProducerFactory会在引导服务器关闭时等待的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述 Hy,我正在使用spring-kafka 1.3.0.RELEASE创建一个事务生成器。 当引导服务器关闭时,DefaultKafkaProducerFactory会无休止地等待,直到引导服务器启动。I'm using spring-kafka 1.3.0.RELEASE to create a transactional producer.When the bootstrap server is down, the DefaultKafkaProducerFactory waits endlessly until the bootstrap server is up.我做错了什么? 我可以设置超时和/或其他类似属性吗?What am I doing wrong ? Can I set a timeout and/or other similar properties ?这是我重现场景的代码示例:This is an example of my code to reproduce the scenario:public static void main(String[] args) { final DefaultKafkaProducerFactory<Object, Object> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs()); producerFactory.setTransactionIdPrefix("transactionIdPrefix"); final Producer<Object, Object> producer = producerFactory.createProducer(); System.out.println("Created producer:" + producer); }private static Map<String, Object> producerConfigs() { final Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.1:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); return props;} 推荐答案这是由工厂调用引起的生成器后生成器上的 initTransactions(),例如,如果没有足够的代理支持事务日志复制因子。It's caused by the factory calling initTransactions() on the producer after creating it, for example if there are not enough brokers to support the transaction log replication factor.我不知道为什么超时不适用于该操作。I don't know why the timeouts don't apply to that operation.我们可能会更改工厂以推迟 initTransactions()直到第一个 beginTransaction() - 但这只会将问题推向下游。We could probably change the factory to defer the initTransactions() until the first beginTransaction() - but that will just push the problem downstream.我使用kafka 1.0.0客户端测试(可以与1.3.1或更高版本一起使用 - 目前为1.3.2)并且它仍然存在问题。我认为它应该尊重 TRANSACTION_TIMEOUT_CONFIG 但似乎没有。I tested with the kafka 1.0.0 client (which can be used with 1.3.1 or higher - currently 1.3.2) and it's still a problem there. I think it should honor the TRANSACTION_TIMEOUT_CONFIG but it appears not to.我建议在Kafka上打开一个问题 JIRA 。I suggest opening an issue on the Kafka JIRA. 这篇关于带有transactionIdPrefix的DefaultKafkaProducerFactory会在引导服务器关闭时等待的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!
10-11 01:08