Problem description
I am using Spark Streaming to process data between two Kafka queues, but I cannot seem to find a good way to write to Kafka from Spark. I have tried this:
    import java.util.HashMap
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    input.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>
        partition.foreach {
          case x: String => {
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            println(x)
            // a brand-new producer is built for every single record
            val producer = new KafkaProducer[String, String](props)
            val message = new ProducerRecord[String, String]("output", null, x)
            producer.send(message)
          }
        }
      )
    )
It works as intended, but instantiating a new KafkaProducer for every message is clearly infeasible in a real context, and I am trying to work around it.
I would like to keep a reference to a single instance per process and access it whenever I need to send a message. How can I write to Kafka from Spark Streaming?
Recommended answer
My first advice would be to try creating a new instance in foreachPartition and measuring whether that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
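A minimal sketch of that approach, reusing `input` and `brokers` from the question (one producer per partition rather than one per record):

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    input.foreachRDD(rdd =>
      rdd.foreachPartition { partition =>
        // one producer per partition instead of one per record
        val props = new java.util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        partition.foreach { x =>
          producer.send(new ProducerRecord[String, String]("output", null, x))
        }
        producer.close() // flush buffered records and release connections
      }
    )

The cost of creating the producer is now amortized over all records in the partition, which is often good enough for moderate batch sizes.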
Another option is to use an object pool (a sketch of the idea is shown below).
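A rough sketch of the pool idea, assuming Apache Commons Pool 2 is on the classpath; the names ProducerFactory and ProducerPool and the broker address are illustrative, not taken from the original example:

    import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
    import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

    // Hypothetical factory that tells the pool how to build producers
    class ProducerFactory(brokers: String)
        extends BasePooledObjectFactory[KafkaProducer[String, String]] {
      override def create(): KafkaProducer[String, String] = {
        val props = new java.util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        new KafkaProducer[String, String](props)
      }
      override def wrap(p: KafkaProducer[String, String])
          : PooledObject[KafkaProducer[String, String]] =
        new DefaultPooledObject(p)
    }

    // One pool per executor JVM, created lazily on first use;
    // the broker list here is an assumption, configure it for your cluster
    object ProducerPool {
      lazy val pool = new GenericObjectPool(new ProducerFactory("localhost:9092"))
    }

Inside foreachPartition you would call ProducerPool.pool.borrowObject() to obtain a producer and ProducerPool.pool.returnObject(producer) when the partition is done, so producers survive across batches.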
However, I found it hard to implement when using checkpointing.
Another version that works well for me is a factory as described in the following blog post; you just have to check whether it provides enough parallelism for your needs (see the comments section):
http://allegro.tech/2015/08/spark-kafka-integration.html
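The pattern there is, roughly, a serializable sink whose producer is created lazily on each executor and shared through a broadcast variable. A sketch under those assumptions, where `ssc` is the StreamingContext and the details are mine rather than copied from the post:

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    // Serializable wrapper: only the factory function is shipped to executors;
    // the producer itself is built lazily, once per executor JVM
    class KafkaSink(createProducer: () => KafkaProducer[String, String])
        extends Serializable {
      lazy val producer = createProducer()
      def send(topic: String, value: String): Unit =
        producer.send(new ProducerRecord[String, String](topic, value))
    }

    object KafkaSink {
      def apply(brokers: String): KafkaSink = new KafkaSink(() => {
        val props = new java.util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        sys.addShutdownHook(producer.close()) // close when the executor exits
        producer
      })
    }

    // Broadcast once, then reuse the per-executor producer for every record
    val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(brokers))
    input.foreachRDD(rdd =>
      rdd.foreach(x => kafkaSink.value.send("output", x))
    )

Because the producer is a lazy val behind the broadcast, each executor creates exactly one connection to Kafka and keeps it for the lifetime of the job.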