简单的Kafka使用者未收到消息

简单的Kafka使用者未收到消息

本文介绍了简单的Kafka使用者未收到消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Kafka的新手,并运行 KafkaConsumer KafkaProducer .当我从终端运行使用者时,使用者正在接收消息,但是我无法使用Java代码进行侦听.我也在StackoverFlow上搜索了类似的问题(链接: Link1 Link2 ),并尝试了该解决方案,但似乎没有为我工作.Kafka版本:kafka_2.10-0.10.2.1,并且在pom中使用了相应的maven依赖项.

I am a newbie to Kafka and running a simple kafka consumer/producer example as given on KafkaConsumer and KafkaProducer. When I am running consumer from terminal, consumer is receiving messages but I am not able to listen using Java code.I have searched for similar issues on StackoverFlow also (Links: Link1, Link2) and tried that solutions but nothing seems to be working for me.Kafka Version: kafka_2.10-0.10.2.1 and corresponding maven dependency is used in pom.

生产者和消费者的Java代码:

Java Code for producer and consumer:

public class SimpleProducer {
public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9094");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for(int i = 0; i < 10; i++)
        producer.send(new ProducerRecord<String, String>("topic3", Integer.toString(i), Integer.toString(i)));

    producer.close();

}}

public class SimpleConsumer {

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9094");
    props.put("group.id", "test");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic3", "topic2"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}}

启动kafka:bin/kafka-server-start.sh config/server.properties(我已经在属性文件中设置了端口,brokerid)

Starting kafka:bin/kafka-server-start.sh config/server.properties (I have already set port, brokerid in properties file)

推荐答案

首先使用以下命令检查所有可用的组:

First check what all the groups are available by using :

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

然后使用以下cmd检查主题所属的组:

Then check for which group your topic belongs by using below cmd :

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe

找到主题和相关的组名后(如果该组不属于默认组,只需将 group.id 替换为您的组),然后尝试使用下面的prop并让我知道它是否有效:

Once you find your topic and associated group name (just replace group.id with your group if it not belongs to default group) then try with below prop and let me know if it works :

  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test-consumer-group"); // default topic name
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

  //Kafka Consumer subscribes list of topics here.
  consumer.subscribe(Arrays.asList(topicName));  // replace you topic name

  //print the topic name

  java.util.Map<String,java.util.List<PartitionInfo>> listTopics = consumer.listTopics();
  System.out.println("list of topic size :" + listTopics.size());

  for(String topic : listTopics.keySet()){
      System.out.println("topic name :"+topic);
  }

这篇关于简单的Kafka使用者未收到消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-06 00:30