Problem Description
I have a Spark Streaming process that reads data from Kafka into a DStream.
In my pipeline I run foreachRDD on that DStream twice, one after the other (each time with different processing, inserting the data into a different destination).
I was wondering how DStream.cache would behave if applied right after reading the data from Kafka. Is it possible to do that?
Does the process currently read the data from Kafka twice?
Please keep in mind that it is not possible to merge the two foreachRDDs into one (the two paths are quite different, and there are stateful transformations that need to be applied on the DStream...).
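Roughly, the shape of the pipeline is the following (all transformation and sink details below are just placeholders, not my actual code):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Placeholder sketch of the pipeline shape only:
val kafkaDStream: DStream[(String, String)] = ???   // created from Kafka, e.g. via KafkaUtils

// Path 1: stateful transformation, then insert into destination A.
val updateA: (Seq[String], Option[Long]) => Option[Long] = ???
kafkaDStream.updateStateByKey(updateA).foreachRDD { rdd =>
  // insert the results into destination A (placeholder)
}

// Path 2: different processing, then insert into destination B.
kafkaDStream.map(_._2).foreachRDD { rdd =>
  // insert the results into destination B (placeholder)
}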
Thanks for your help.
Recommended Answer
There are two options:
Use Dstream.cache() to mark the underlying RDDs as cached. Spark Streaming will take care of unpersisting the RDDs after a timeout, controlled by the spark.cleaner.ttl configuration.
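A minimal sketch of this first option, assuming an older Spark release where spark.cleaner.ttl is still available (the app name, batch interval, and TTL value are purely illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

// Illustrative values only:
val conf = new SparkConf()
  .setAppName("kafka-two-destinations")
  .set("spark.cleaner.ttl", "3600")            // example: clean up persisted data after one hour

val ssc = new StreamingContext(conf, Seconds(10))
val kafkaDStream: DStream[(String, String)] = ???  // placeholder: created from Kafka

// Mark the RDDs generated for every batch as cached;
// unpersisting is then left to Spark Streaming and the TTL.
kafkaDStream.cache()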
Use an additional foreachRDD to apply cache() and unpersist(false) side-effecting operations to the RDDs in the DStream:
For example:
val kafkaDStream = ???            // DStream created from Kafka
val targetRDD = kafkaDStream      // the DStream defined above
  .transformation(...)
  .transformation(...)
  ...

// Right before the lineage fork, mark the underlying RDD as cacheable:
targetRDD.foreachRDD{rdd => rdd.cache()}

targetRDD.foreachRDD{do stuff 1}
targetRDD.foreachRDD{do stuff 2}

// Once both paths have run, release the cached RDD:
targetRDD.foreachRDD{rdd => rdd.unpersist(false)}
Note that you could incorporate the cache as the first statement of do stuff 1 if that's an option.
I prefer this option because it gives me fine-grained control over the cache lifecycle and lets me clean things up as soon as I need to, instead of depending on a TTL.
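For completeness, here is a slightly more concrete sketch of the second option; the transformations and the writeToA/writeToB sinks are hypothetical stand-ins for "do stuff 1" and "do stuff 2":

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical sinks for the two destinations:
def writeToA(rdd: RDD[String]): Unit = ???
def writeToB(rdd: RDD[Int]): Unit = ???

val kafkaDStream: DStream[(String, String)] = ???   // placeholder: created from Kafka
val targetRDD: DStream[String] = kafkaDStream.map(_._2)

// Output operations run once per batch, in the order they were registered:
targetRDD.foreachRDD(rdd => rdd.cache())                      // 1. cache the batch's RDD
targetRDD.foreachRDD(rdd => writeToA(rdd.filter(_.nonEmpty))) // 2. "do stuff 1"
targetRDD.foreachRDD(rdd => writeToB(rdd.map(_.length)))      //    "do stuff 2"
targetRDD.foreachRDD(rdd => rdd.unpersist(false))             // 3. release the cached RDD

With the default configuration (one concurrent streaming job), the foreachRDD actions are executed in registration order for each batch, so the cache() runs before either path and the unpersist(false) runs after both have finished.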