我正在尝试创建一个简单的KafkaProducer和KafkaConsumer,以便可以将数据发送到代理上的主题,然后验证是否已接收到数据。下面是我用来定义使用者和生产者的两种方法,以及如何发送消息。 send方法最多需要20秒才能完成,据我所知,Consumer.poll方法实际上并未完成,但是我离开的最长时间是10分钟。
有人对我做错了什么建议吗?生产者/消费者是否有一些属性设置不正确?这些属性是直接从文档复制的,所以我不明白为什么它们不起作用。
KafkaProducer docs
KafkaConsumer docs
"verify we can send to producer" in {
val consumer = createKafkaConsumer("address:9002")
val producer = createKafkaProducer("address:9002")
val message = "I am a message"
val record = new ProducerRecord[String, String]("myTopic", message)
producer.send(record)
TimeUnit.SECONDS.sleep(5)
val records = consumer.poll(5000)
println("records: "+records)
consumer1.close()
}
def createKafkaProducer(kafka: String): KafkaProducer[String,String] = {
val props = new Properties()
props.put("bootstrap.servers", kafka)
props.put("acks", "all")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
new KafkaProducer[String,String](props)
}
def createKafkaConsumer(kafka: String): KafkaConsumer[String, String] = {
val props = new Properties()
props.put("bootstrap.servers", kafka)
props.put("group.id", "test")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("session.timeout.ms", "30000")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("myTopic"))
consumer
}
编辑:我已经更新了代码,以便现在可以从send方法获得响应,而且看来
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
超时了 最佳答案
原来我有一个DNS问题,这意味着我实际上没有连接到代理。修复此问题可使消息通过,配置没有任何问题。