我已经在我的 Windows PC 上构建了一个小型测试环境并写下以下代码来测试 kafka(使用来自 org.apache.kafka 的 kafka_2.10:0.9.0.1)。

package iii.functiontesting;

import java.text.ParseException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;


/**
 * Hello world!
 *
 */
public class test4
{
    public static void main( String[] args ) throws ParseException
    {
        Properties producerProps=new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("serializer.class",org.apache.kafka.common.serialization.StringSerializer.class.getName());
        producerProps.put("key.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
        producerProps.put("value.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
        producerProps.put("request.required.acks","1");
        KafkaProducer<String,String> kafkawriter= new KafkaProducer<String,String>(producerProps);
        ProducerRecord<String,String> msg=new ProducerRecord<>("TEST3","ImKey","teststring1");
        kafkawriter.send(msg);
    }
}

我使用以下命令检查消息是否正确写入队列



但是,我发现 kafka-console-consumer 没有显示任何内容。

我怀疑我的kafka服务器无法正常运行,因此我使用console-producer进行测试。



这次我可以看到控制台消费者下清楚地显示了 aaaaa。
我无法弄清楚会发生什么。
谁能帮我?

最佳答案

在终止程序之前,您必须调用 KafkaProducer#flush [或] KafkaProducer#close 方法。

实际上,生产者在将记录发送给代理之前会对其进行缓冲。参见 Kafka Producer configuration 中的 buffer.memorybatch.size

kafkawriter.send(msg);
kafkawriter.close();

10-08 20:14