问题描述
我有一个应将有限数量的消息发送到Kafka然后退出的应用程序.由于某些原因,即使我关闭了制作人,Kafka连接也会保持连接状态.我的实现(在Scala中)或多或少
I have an application that should send a finite number of messages to Kafka and then quit. For some reason, the Kafka connection stays up even if I close the producer. My implementation (in Scala) is more or less
object Kafka {
private val props = new Properties()
props.put("compression.codec", DefaultCompressionCodec.codec.toString)
props.put("producer.type", "sync")
props.put("metadata.broker.list", "localhost:9092")
props.put("batch.num.messages", "200")
props.put("message.send.max.retries", "3")
props.put("request.required.acks", "-1")
props.put("client.id", "myclient")
private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
private def encode(msg: Message) = new KeyedMessage("topic", msg.id.getBytes, write(msg).getBytes)
def send(msg: Message) = Try(producer.send(encode(msg)))
def close() = producer.close()
}
Message
是一个简单的案例类,如何将其转换为字节数组并不重要.
Here Message
is a simple case class, and how I convert it to byte array is not really relevant.
消息确实到达了,但是当我最终调用Kafka.close()
时,应用程序没有退出,并且连接似乎也没有释放.
The messages do arrive, but when I eventually call Kafka.close()
, the application does not exit, and the connection does not seem to be released.
有没有办法明确要求Kafka终止连接?
Is there a way to explicitly ask Kafka to terminate the connection?
推荐答案
这将创建一个名为"close"的函数,该函数将调用producer.close()我看不到任何证据表明您的代码实际上关闭了生产者.
This creates a function called "close" that calls producer.close()I see no evidence that your code actually closes the producer.
您只需要致电:producer.close
You need to just call:producer.close
这篇关于关闭Kafka连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!