本文介绍了KafkaConsumer 连接到错误的代理,因此无法消费的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我根据 Kafka 文档中的示例编写了一个小脚本:

I've written a small script based on an example in the Kafka documentation:

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory

import collection.JavaConverters._
import java.util.Properties

object ConsumerDemo extends App {
  val logger = LoggerFactory.getLogger(getClass.getSimpleName)

  def consumerFromKafka() = {
    val props = new Properties()
    props.put("bootstrap.servers", broker_address)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    logger.info(s"Properties: ${props}")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    val topics: List[String]                    = topic :: Nil
    consumer.subscribe(topics.asJava)
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }

  consumerFromKafka()

}

当我运行脚本时,我得到以下输出:

When I run the script I get the following output:

110    INFO  ConsumerDemo$ {35} - Properties: {key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, group.id=consumer-group, bootstrap.servers=172.16.146.89:9092, auto.offset.reset=latest}
302    INFO  ConsumerDemo$ {35} - Subscribing to topics List(Greetings)
2507   WARN  org.apache.kafka.clients.NetworkClient {35} - [Consumer clientId=consumer-consumer-group-1, groupId=consumer-group] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
4656   WARN  org.apache.kafka.clients.NetworkClient {35} - [Consumer clientId=consumer-consumer-group-1, groupId=consumer-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

第一行确认用于初始化此消费者的属性.

The first line confirms the properties that are being used to initialize this Consumer.

我没有在本地运行 Kafka,我在附近的服务器 (172.16.146.89:9092) 上运行.

I'm not running Kafka locally, I am running on a nearby server (172.16.146.89:9092).

看起来我已经为 bootstrap.servers 提供了一个有效的输入,但该值似乎被忽略了.它正在尝试连接到不存在的本地主机上的代理.

It looks like I've given it a valid input for bootstrap.servers, but that value seems to be ignored. It's trying to connect to a broker on localhost which does not exist.

我在这里做错了什么?

推荐答案

检查代理配置.具体来说,有一个属性 advertised.listeners,我怀疑它被设置为本地主机.

Check the broker configuration. Specifically, there is a property advertised.listeners, that I suspect is set localhost.

这篇关于KafkaConsumer 连接到错误的代理,因此无法消费的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-15 19:44