我正在使用Kafka 0.8 beta,我只是想弄乱发送不同的对象,使用我自己的编码器序列化它们,然后将它们发送到现有的代理配置。现在,我正在尝试使DefaultEncoder正常工作。
我具有代理程序,并且一切都可以为StringEncoder设置和工作,但是我无法获取任何其他数据类型,包括纯字节[],由代理程序发送和接收。
我对生产者的代码是:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
public class ProducerTest {
public static void main(String[] args) {
long events = 5;
Random rnd = new Random();
rnd.setSeed(new Date().getTime());
Properties props = new Properties();
props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094");
props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
props.setProperty("partitioner.class", "example.producer.SimplePartitioner");
props.setProperty("request.required.acks", "1");
props.setProperty("producer.type", "async");
props.setProperty("batch.num.messages", "4");
ProducerConfig config = new ProducerConfig(props);
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
byte[] a = "Hello".getBytes();
byte[] b = "There".getBytes();
KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b);
producer.send(data);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.close();
}
}
我使用了与示例here中相同的SimplePartitioner,并用Strings替换了所有字节数组,并将序列化器更改为kafka.serializer.StringEncoder可以正常工作。
作为参考,SimplePartitioner:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner<String> {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(String key, int a_numPartitions) {
int partition = 0;
int offset = key.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
我究竟做错了什么?
最佳答案
答案是分区类SimplePartitioner
仅适用于字符串。当我尝试异步运行Producer时,它将创建一个单独的线程,该线程在发送给代理之前处理编码和分区。当意识到SimplePartitioner仅适用于Strings时,该线程遇到了障碍,但是由于它是一个单独的线程,因此不会引发任何异常,因此该线程仅退出而没有任何不当行为的迹象。
例如,如果我们将SimplePartitioner更改为接受byte [],则:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner<byte[]> {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(byte[] key, int a_numPartitions) {
int partition = 0;
return partition;
}
}
现在可以完美运行了。