Problem description
I have a Spark application with several points where I would like to persist the current state. This is usually after a large step, or when caching a state that I would like to use multiple times. It appears that when I call cache on my dataframe a second time, a new copy is cached to memory. In my application, this leads to memory issues when scaling up. Even though a given dataframe is at most about 100 MB in my current tests, the cumulative size of the intermediate results grows beyond the allotted memory on the executor. See below for a small example that shows this behavior.
cache_test.py:
from pyspark import SparkContext
from pyspark.sql import HiveContext
spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)
df = (hive_context.read
.format('com.databricks.spark.csv')
.load('simple_data.csv')
)
df.cache()
df.show()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()
spark_context.stop()
simple_data.csv:
1,2,3
4,5,6
7,8,9
Looking at the application UI, there is a copy of the original dataframe in addition to the one with the new column. I can remove the original copy by calling df.unpersist() before the withColumn line. Is this the recommended way to remove cached intermediate results (i.e. call unpersist() before every cache())?
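For reference, this is the variant I am describing, a minimal sketch of cache_test.py where the unpersist() call is the only change:

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
      )
df.cache()
df.show()

df.unpersist()  # release the original cached copy before caching the derived frame
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()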
Also, is it possible to purge all cached objects? In my application, there are natural breakpoints where I can simply purge all memory and move on to the next file. I would like to do this without creating a new Spark application for each input file.
Thanks in advance!
Recommended answer
Spark 2.x
You can use Catalog.clearCache:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
...
spark.catalog.clearCache()
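For the per-file workflow described in the question, one natural place to call it is at each breakpoint between input files. A minimal sketch (the file names here are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

for path in ['input_1.csv', 'input_2.csv']:  # hypothetical input files
    df = spark.read.csv(path)
    df.cache()
    # ... transformations that may cache further intermediate results ...
    spark.catalog.clearCache()  # drop every cached DataFrame/table before the next file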
Spark 1.x
You can use the SQLContext.clearCache method, which removes all cached tables from the in-memory cache.
from pyspark.sql import SQLContext
from pyspark import SparkContext
sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
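One caveat worth keeping in mind: clearCache only drops data cached through the DataFrame/SQL layer. An RDD persisted directly with rdd.cache() is not affected and still has to be released explicitly, for example:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext.getOrCreate(sc)

rdd = sc.parallelize(range(10)).cache()
rdd.count()              # materializes the cached RDD

sqlContext.clearCache()  # clears cached DataFrames/tables only
rdd.unpersist()          # an RDD cached directly must be unpersisted by hand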