问题描述
我根据 Kafka 文档中的示例编写了一个小脚本:
I've written a small script based on an example in the Kafka documentation:
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import collection.JavaConverters._
import java.util.Properties
object ConsumerDemo extends App {
val logger = LoggerFactory.getLogger(getClass.getSimpleName)
def consumerFromKafka() = {
val props = new Properties()
props.put("bootstrap.servers", broker_address)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
props.put("group.id", "consumer-group")
logger.info(s"Properties: ${props}")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
val topics: List[String] = topic :: Nil
consumer.subscribe(topics.asJava)
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator)
println(data.value())
}
}
consumerFromKafka()
}
当我运行脚本时,我得到以下输出:
When I run the script I get the following output:
110 INFO ConsumerDemo$ {35} - Properties: {key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, group.id=consumer-group, bootstrap.servers=172.16.146.89:9092, auto.offset.reset=latest}
302 INFO ConsumerDemo$ {35} - Subscribing to topics List(Greetings)
2507 WARN org.apache.kafka.clients.NetworkClient {35} - [Consumer clientId=consumer-consumer-group-1, groupId=consumer-group] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
4656 WARN org.apache.kafka.clients.NetworkClient {35} - [Consumer clientId=consumer-consumer-group-1, groupId=consumer-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
第一行确认用于初始化此消费者的属性.
The first line confirms the properties that are being used to initialize this Consumer.
我没有在本地运行 Kafka,我在附近的服务器 (172.16.146.89:9092) 上运行.
I'm not running Kafka locally, I am running on a nearby server (172.16.146.89:9092).
看起来我已经为 bootstrap.servers 提供了一个有效的输入,但该值似乎被忽略了.它正在尝试连接到不存在的本地主机上的代理.
It looks like I've given it a valid input for bootstrap.servers, but that value seems to be ignored. It's trying to connect to a broker on localhost which does not exist.
我在这里做错了什么?
推荐答案
检查代理配置.具体来说,有一个属性 advertised.listeners,我怀疑它被设置为本地主机.
Check the broker configuration. Specifically, there is a property advertised.listeners, that I suspect is set localhost.
这篇关于KafkaConsumer 连接到错误的代理,因此无法消费的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!