我正在尝试创建一个简单的KafkaProducer和KafkaConsumer,以便可以将数据发送到代理上的主题,然后验证是否已接收到数据。下面是我用来定义使用者和生产者的两种方法,以及如何发送消息。 send方法最多需要20秒才能完成,据我所知,Consumer.poll方法实际上并未完成,但是我离开的最长时间是10分钟。

有人对我做错了什么建议吗?生产者/消费者是否有一些属性设置不正确?这些属性是直接从文档复制的,所以我不明白为什么它们不起作用。

KafkaProducer docs
KafkaConsumer docs

"verify we can send to producer" in {
    val consumer = createKafkaConsumer("address:9002")
    val producer = createKafkaProducer("address:9002")

    val message = "I am a message"
    val record = new ProducerRecord[String, String]("myTopic", message)

    producer.send(record)
    TimeUnit.SECONDS.sleep(5)
    val records = consumer.poll(5000)
    println("records: "+records)
    consumer1.close()
}

def createKafkaProducer(kafka: String): KafkaProducer[String,String] = {
    val props = new Properties()
    props.put("bootstrap.servers", kafka)
    props.put("acks", "all")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    new KafkaProducer[String,String](props)
}

def createKafkaConsumer(kafka: String): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", kafka)
    props.put("group.id", "test")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("session.timeout.ms", "30000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("myTopic"))
    consumer
}


编辑:我已经更新了代码,以便现在可以从send方法获得响应,而且看来org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.超时了

最佳答案

原来我有一个DNS问题,这意味着我实际上没有连接到代理。修复此问题可使消息通过,配置没有任何问题。

09-28 09:43