我想用语法重新创建一个Kafka主题。
我使用kafka.admin.AdminUtils
相同。
这是我的草稿代码:
AdminUtils.deleteTopic(zkUtils, topicName);
AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties());
上面的代码大多数时候都有效,但是几次失败,但出现以下异常:
Exception in thread "main" kafka.common.TopicExistsException: Topic "new_topic" already exists.
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:253)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237)
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
我了解的是该主题未正确删除。我在这里做错了什么。
最佳答案
您不能在调用deleteTopic之后立即调用createTopic。必须注意两件事:
将'delete.topic.enable'设置为true
由于删除主题是异步操作,因此最好在创建新主题之前确保已从Zookeeper中成功删除所有元数据
关于java - 以编程方式创建Kafka主题,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40676168/