执行许多数据框连接时出现PySpark

执行许多数据框连接时出现PySpark

本文介绍了执行许多数据框连接时出现PySpark OutOfMemoryErrors的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

关于此问题的帖子很多,但没有一个回答我的问题.

There's many posts about this issue, but none have answered my question.

我在尝试将许多不同的数据帧连接在一起时遇到了PySpark中的 OutOfMemoryError s.

I'm running into OutOfMemoryErrors in PySpark while attempting to join many different dataframes together.

我的本​​地计算机具有16GB的内存,并且我已经将Spark配置设置如下:

My local machine has 16GB of memory, and I've set my Spark configurations as such:

class SparkRawConsumer:

    def __init__(self, filename, reference_date, FILM_DATA):
        self.sparkContext = SparkContext(master='local[*]', appName='my_app')
        SparkContext.setSystemProperty('spark.executor.memory', '3g')
        SparkContext.setSystemProperty('spark.driver.memory', '15g')

很明显,有很多关于Spark中OOM错误的SO帖子,但是基本上,大多数帖子都说是为了增加您的内存属性.

There are clearly many, many SO posts about OOM errors in Spark, but basically most of them say to increase your memory properties.

我基本上是从50-60个较小的数据帧执行联接的,这些数据帧有两列 uid data_in_the_form_of_lists (通常是Python字符串列表).我要加入的主数据框大约有10列,但是还包含一个 uid 列(我要加入).

I am essentially performing joins from 50-60 smaller dataframes, which have two columns uid, and data_in_the_form_of_lists (usually, it's a list of Python strings). My master dataframe that I am joining on has about 10 columns, but also contains a uid column (that I am joining on).

我仅尝试合并1,500行数据.但是,当所有这些数据都可以放入内存时,我会经常遇到OutOfMemory错误.我通过在我的存储中的SparkUI中确认这一点:

I'm only attempting to join 1,500 rows of data. However, I'll encounter frequent OutOfMemory errors, when clearly all this data can fit into memory. I confirm this by looking in my SparkUI at my Storage:

在代码中,我的联接如下所示:

In code, my joins look like this:

# lots of computations to read in my dataframe and produce metric1, metric2, metric3, .... metric 50
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]), on="uid")

metrics_df.count()
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
                on="gid_value")

metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
                on="uid")

metrics_df.count()
metrics_df.repartition("gid_value")

其中 metric1 metric2 metric3 是RDD,我在连接之前将其转换为数据帧(请记住,实际上其中有50个我要加入的较小的 metric dfs.

Where metric1, metric2 and metric3 are RDDs that I convert into dataframes prior to the join (keep in mind there's actually 50 of these smaller metric dfs I am joining).

我调用 metric.count()强制执行评估,因为这似乎有助于防止内存错误(否则,在尝试最终收集时,我会得到更多的驱动程序错误).

I call metric.count() to force evaluation, since it seemed to help prevent the memory errors (I would get many more driver errors when attempting the final collect otherwise).

错误是不确定的.我看不到它们始终出现在我的联接中的任何特定位置,有时似乎是在我的最后一个 metrics_df.collect()调用中发生,有时是在较小的联接中.

The errors are non-deterministic. I don't see them occurring at any particular spot in my joins consistently, and sometimes appears to be occurring my final metrics_df.collect() call, and sometimes during the smaller joins.

我真的怀疑任务序列化/反序列化存在一些问题.例如,当我查看事件时间轴的一个典型阶段时,我发现其中的大部分是由任务反序列化处理的:

I really suspect there's some issues with task serialization/deserialization. For instance, when I look at my Event Timeline for a typical stage, I see that the bulk of it is taken up by task deserialization:

我还注意到有大量的垃圾收集时间:

I also notice that there's a huge number for garbage collection time:

垃圾回收是否是导致内存错误的问题?还是任务序列化?

Is garbage collection the issue in causing the memory errors? Or is it task serialization?

我一直在作为较大的PyCharm项目的一部分来运行Spark作业(因此,为什么Spark上下文围绕一个类包装).我使用以下火花提交重构了代码以将其作为脚本运行:

I've been running the Spark job as part of a larger PyCharm project (hence why the spark context was wrapped around a class). I refactored the code to run it as a script, using the following spark submit:

spark-submit spark_consumer.py \
  --driver-memory=10G \
  --executor-memory=5G \
  --conf spark.executor.extraJavaOptions='-XX:+UseParallelGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'

推荐答案

我遇到了类似的问题,并且可以解决以下问题:
Spark提交:

I faced similar issue and it worked with:
Spark Submit:

spark-submit --driver-memory 3g\
            --executor-memory 14g\
            *.py

代码:

sc = SparkContext().getOrCreate()

这篇关于执行许多数据框连接时出现PySpark OutOfMemoryErrors的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-15 12:02