This article covers how to write a Spark Streaming DF to a Kafka topic; the question and answer below may serve as a useful reference for anyone facing the same problem.

Problem description

I am using Spark Streaming to process data between two Kafka queues, but I cannot seem to find a good way to write to Kafka from Spark. I have tried this:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        // A new producer is created for every single record -- this is the problem.
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

It works as intended, but instantiating a new KafkaProducer for every message is clearly unfeasible in a real context, and I am trying to work around it.

I would like to keep a reference to a single instance per process and access it whenever I need to send a message. How can I write to Kafka from Spark Streaming?

Recommended answer

My first advice would be to try creating a new instance in foreachPartition and measure whether that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
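
A minimal sketch of that first suggestion, reusing the input stream, brokers and "output" topic from the question (everything else here is illustrative), lifts the producer creation from one per record up to one per partition:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One producer per partition instead of one per record.
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      partition.foreach { x =>
        producer.send(new ProducerRecord[String, String]("output", null, x))
      }
    } finally {
      producer.close() // flushes pending sends before the task finishes
    }
  }
}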

Another option is to use an object pool, as illustrated in this example:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala
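
As a rough sketch of the object-pool idea, assuming Apache Commons Pool 2 is available (the class names and configuration below are illustrative and not taken from the linked project):

import java.util.Properties

import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class ProducerFactory(props: Properties)
    extends BasePooledObjectFactory[KafkaProducer[String, String]] {

  override def create(): KafkaProducer[String, String] =
    new KafkaProducer[String, String](props)

  override def wrap(p: KafkaProducer[String, String]): PooledObject[KafkaProducer[String, String]] =
    new DefaultPooledObject(p)

  override def destroyObject(p: PooledObject[KafkaProducer[String, String]]): Unit =
    p.getObject.close()
}

// One pool per executor JVM: the object is initialized lazily on each executor.
object ProducerPool {
  private def kafkaProps: Properties = {
    val p = new Properties()
    p.put("bootstrap.servers", "broker1:9092") // placeholder broker list
    p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    p
  }
  lazy val pool = new GenericObjectPool(new ProducerFactory(kafkaProps))
}

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Borrow a producer for the whole partition and hand it back afterwards.
    val producer = ProducerPool.pool.borrowObject()
    try {
      partition.foreach(x => producer.send(new ProducerRecord[String, String]("output", null, x)))
    } finally {
      ProducerPool.pool.returnObject(producer)
    }
  }
}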

However, I found it hard to implement when using checkpointing.

Another version that works well for me is a factory, as described in the following blog post; you just have to check whether it provides enough parallelism for your needs (see the comments section):

http://allegro.tech/2015/08/spark-kafka-integration.html
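
The gist of that factory approach, as a hedged sketch (the KafkaSink name and the helper below are illustrative rather than copied from the post, and ssc and brokers are assumed from the question's context), is a small serializable wrapper whose producer is created lazily, once per executor:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // Created lazily, so the producer is instantiated only once per executor JVM
  // after the wrapper has been deserialized there.
  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, null, value))
}

object KafkaSink {
  def apply(brokers: String): KafkaSink = {
    val createProducer = () => {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      // Flush buffered messages when the executor shuts down.
      sys.addShutdownHook { producer.close() }
      producer
    }
    new KafkaSink(createProducer)
  }
}

// Broadcast the sink once and reuse the same producer on every executor.
val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(brokers))

input.foreachRDD { rdd =>
  rdd.foreach(x => kafkaSink.value.send("output", x))
}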
