本文介绍了关闭Kafka连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个应将有限数量的消息发送到Kafka然后退出的应用程序.由于某些原因,即使我关闭了制作人,Kafka连接也会保持连接状态.我的实现(在Scala中)或多或少

I have an application that should send a finite number of messages to Kafka and then quit. For some reason, the Kafka connection stays up even if I close the producer. My implementation (in Scala) is more or less

object Kafka {
  private val props = new Properties()

  props.put("compression.codec", DefaultCompressionCodec.codec.toString)
  props.put("producer.type", "sync")
  props.put("metadata.broker.list", "localhost:9092")
  props.put("batch.num.messages", "200")
  props.put("message.send.max.retries", "3")
  props.put("request.required.acks", "-1")
  props.put("client.id", "myclient")

  private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
  private def encode(msg: Message) = new KeyedMessage("topic", msg.id.getBytes, write(msg).getBytes)

  def send(msg: Message) = Try(producer.send(encode(msg)))
  def close() = producer.close()
}

Message是一个简单的案例类,如何将其转换为字节数组并不重要.

Here Message is a simple case class, and how I convert it to byte array is not really relevant.

消息确实到达了,但是当我最终调用Kafka.close()时,应用程序没有退出,并且连接似乎也没有释放.

The messages do arrive, but when I eventually call Kafka.close(), the application does not exit, and the connection does not seem to be released.

有没有办法明确要求Kafka终止连接?

Is there a way to explicitly ask Kafka to terminate the connection?

推荐答案

这将创建一个名为"close"的函数,该函数将调用producer.close()我看不到任何证据表明您的代码实际上关闭了生产者.

This creates a function called "close" that calls producer.close()I see no evidence that your code actually closes the producer.

您只需要致电:producer.close

You need to just call:producer.close

这篇关于关闭Kafka连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-05 14:33