Problem description
There are many posts about this issue, but none have answered my question.
I'm running into OutOfMemoryErrors in PySpark while attempting to join many different dataframes together.
My local machine has 16GB of memory, and I've set my Spark configurations as such:
from pyspark import SparkContext

class SparkRawConsumer:

    def __init__(self, filename, reference_date, FILM_DATA):
        self.sparkContext = SparkContext(master='local[*]', appName='my_app')
        SparkContext.setSystemProperty('spark.executor.memory', '3g')
        SparkContext.setSystemProperty('spark.driver.memory', '15g')
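For comparison, here is a minimal sketch (not from the original post) of attaching these properties to a SparkConf before the context is created; properties set after a SparkContext already exists are generally ignored, and in local mode the driver memory is usually best supplied through spark-submit rather than in code:

from pyspark import SparkConf, SparkContext

# Hypothetical standalone setup; the app name and values mirror the question.
conf = (SparkConf()
        .setMaster('local[*]')
        .setAppName('my_app')
        .set('spark.executor.memory', '3g')
        .set('spark.driver.memory', '15g'))  # only effective before the JVM is launched
sc = SparkContext(conf=conf)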
There are clearly many, many SO posts about OOM errors in Spark, but basically most of them say to increase your memory properties.
I am essentially performing joins from 50-60 smaller dataframes, which have two columns, uid and data_in_the_form_of_lists (usually, it's a list of Python strings). My master dataframe that I am joining on has about 10 columns, but also contains a uid column (that I am joining on).
I'm only attempting to join 1,500 rows of data. However, I encounter frequent OutOfMemory errors, even though all of this data can clearly fit into memory. I confirmed this by looking at the Storage tab in my Spark UI:
In code, my joins look like this:
# lots of computations to read in my dataframe and produce metric1, metric2, metric3, ..., metric50
metrics_df = metrics_df.join(
    self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]),
    on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")

metrics_df = metrics_df.join(
    self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
    on="gid_value")
metrics_df.repartition("gid_value")

metrics_df = metrics_df.join(
    self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
    on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")
Where metric1, metric2, and metric3 are RDDs that I convert into dataframes prior to the join (keep in mind there are actually 50 of these smaller metric dfs I am joining).
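For illustration, here is a minimal sketch (with made-up sample data) of the RDD-to-DataFrame conversion described above, assuming each metric RDD holds (uid, list-of-strings) pairs and that sc, sqlContext, and metrics_df already exist as in the question:

# Hypothetical data standing in for one of the ~50 smaller metric RDDs.
metric1 = sc.parallelize([("user_1", ["a", "b"]), ("user_2", ["c"])])

# Same conversion-and-join pattern as in the question.
metric1_df = sqlContext.createDataFrame(metric1, schema=["uid", "metric1"])
metrics_df = metrics_df.join(metric1_df, on="uid")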
I call metric.count() to force evaluation, since it seemed to help prevent the memory errors (I would get many more driver errors when attempting the final collect otherwise).
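A minimal sketch of that forced-evaluation step; the cache() call is an illustrative addition (not in the original post) so the counted result is kept around instead of being recomputed by later joins:

metrics_df = metrics_df.cache()   # illustrative: persist the intermediate join result
metrics_df.count()                # count() is an action, so the pending joins execute here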
The errors are non-deterministic. I don't see them occurring at any particular spot in my joins consistently; they sometimes appear during my final metrics_df.collect() call, and sometimes during the smaller joins.
I really suspect there are some issues with task serialization/deserialization. For instance, when I look at the Event Timeline for a typical stage, I see that the bulk of it is taken up by task deserialization:
I also notice that the garbage collection time is huge:
Is garbage collection the cause of the memory errors, or is it task serialization?
I've been running the Spark job as part of a larger PyCharm project (hence why the Spark context was wrapped inside a class). I refactored the code to run it as a script, using the following spark-submit:
spark-submit spark_consumer.py \
--driver-memory=10G \
--executor-memory=5G \
--conf spark.executor.extraJavaOptions='-XX:+UseParallelGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'
Recommended answer
I faced a similar issue, and it worked with the following:
Spark Submit:
spark-submit --driver-memory 3g \
             --executor-memory 14g \
             *.py
Code:
sc = SparkContext().getOrCreate()
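Putting the answer together, here is a minimal sketch of what the script side might look like when memory is supplied via spark-submit (the file and variable names are assumptions). Note that in the answer's command the memory flags come before the .py file; spark-submit treats anything after the application file as arguments to the application itself, not as submit options:

# spark_consumer.py (hypothetical skeleton)
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()   # memory comes from --driver-memory / --executor-memory
sqlContext = SQLContext(sc)       # used for the createDataFrame calls in the question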