本文介绍了在Spark Streaming中缓存DStream的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Spark流传输过程,可以从kafka读取数据,进入DStream.

I have a Spark streaming process which reads data from kafka,into a DStream.

在我的管道中,我两次(一次又一次):

In my pipeline I do two times (one after another):

(每次我进行不同的处理并将数据插入到不同的目的地).

(each time I do different processing and insert data to different destination).

我想知道从Kafka读取数据后DStream.cache会如何工作吗?有可能做到吗?

I was wondering how would ​DStream.cache​, right after I read data from Kafka work? Is it possible to do it?

该过程现在实际上是从Kafka读取两次数据吗?

Is the process now actually reading data two times from Kafka?

请记住,不可能将两个foreachRDD放在一个中(因为两个路径是完全不同的,那里有全状态转换-需要在DStream上应用...)

Please keep in mind, that it is not possible to put two foreachRDDs into one (because two paths are quite different, there are statefull transformations there - which need to be appliend on DStream...)

感谢您的帮助

推荐答案

有两种选择:

  • 使用Dstream.cache()将基础RDD标记为已缓存.在spark.cleaner.ttl配置控制下,Spark Streaming将在超时后不保留RDD.

  • Use Dstream.cache() to mark the underlying RDDs as cached. Spark Streaming will take care of unpersisting the RDDs after a timeout, controlled by the spark.cleaner.ttl configuration.

使用其他foreachRDDcache()unpersist(false)副作用操作应用于DStream中的RDD:

Use additional foreachRDD to apply cache() and unpersist(false) side-effecting operations to the RDDs in the DStream:

例如:

val kafkaDStream = ???
val targetRDD = kafkaRDD
                       .transformation(...)
                       .transformation(...)
                       ...
// Right before the lineage fork mark the RDD as cacheable:
targetRDD.foreachRDD{rdd => rdd.cache(...)}
targetRDD.foreachRDD{do stuff 1}
targetRDD.foreachRDD{do stuff 2}
targetRDD.foreachRDD{rdd => rdd.unpersist(false)}

请注意,如果可以的话,可以将缓存作为do stuff 1的第一条语句.

Note that you could incorporate the cache as the first statement of do stuff 1 if that's an option.

我更喜欢这个选项,因为它可以让我对缓存的生命周期进行细粒度的控制,并且可以让我在需要时立即清除内容,而不必依赖于ttl.

I prefer this option because it gives me fine-grained control over the cache lifecycle and lets me cleanup stuff as soon as it's needed instead of depending of a ttl.

这篇关于在Spark Streaming中缓存DStream的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

05-26 13:48