我试图从代码中将分区号设置为2,并且我有单节点设置(1个zookeeper,1kafka)。当我使用该消息时,看到kafka仅使用一个分区来存储数据,是否需要对设置进行任何修改以具有多个分区?
private void setupZookeeper(String[] topicList){
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String[] zookeeperHosts = {"localhost:2181"}; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
//String topicName = "testTopic";
int noOfPartitions = 2;
int noOfReplication = 1;
for(String zookeeper:zookeeperHosts){
zkClient = new ZkClient(zookeeper, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeper), false);
for(String topicName: topicList){
System.out.println("Setting no of partitions ="+noOfPartitions + "for topic" + topicName);
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication,
producerConfig(),RackAwareMode.Disabled$.MODULE$);
}
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
我的producerConfig如下所示:
private Properties producerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
//props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
最佳答案
当我使用该消息时,我看到kafka仅使用一个
分区存储数据
如下所示的默认消息分区策略“仅使用一个分区”可能是由恒定的消息密钥,计算出的相同哈希值以及仅路由到一个分区引起的。
如果记录中指定了分区,请使用它;
如果未指定分区但存在密钥,则根据密钥的哈希值选择一个分区;
如果没有分区或密钥,则以循环方式选择一个分区。
您
关于java - 在Apache Kafka中设置多个分区,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38601184/