我有一个非常简单的 Spark DataFrame,当运行 DataFrame groupby 时,性能很糟糕 - 比(在我脑海中)等效的 RDD reduceByKey 慢约 8 倍......
我缓存的 DF 只有两列,customer 和 name,只有 50k 行:
== Physical Plan ==
InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelation [customer#2454,name#2456], true, 10000, StorageLevel(true, true, false, true, 1), Scan ParquetRelation[customer#2454,name#2456] InputPaths: hdfs://nameservice1/tmp/v2_selected_parquet/test_parquet2, None
当我运行以下两个代码段时,我希望有类似的性能,而不是 rdd 版本在 10 秒内运行,而 DF 版本在 85 秒内运行......
rawtempDF2.rdd.map(lambda x: (x['name'], 1)).reduceByKey(lambda x,y: x+y).collect()
rawtempDF2.groupby('name').count().collect()
我在这里错过了一些真正重要的东西吗? FWIW,RDD版本运行54个stage,DF版本运行227个:/
编辑:我使用的是 Spark 1.6.1 和 Python 3.4.2。
编辑 2:此外,源 Parquet 已分区客户/天/名称 - 目前 27 个客户,1 天,c。 45个名字。
最佳答案
这两个数字似乎都相对较高,并且不清楚如何创建 DataFrame
或测量时间,但总的来说,与分区数量相比,记录数量较少可以解释这种差异。spark.sql.shuffle.partitions
的默认值为 200,即您获得的任务数。对于 50K 记录,启动任务的开销将高于您从并行执行中获得的加速。让我们用一个简单的例子来说明。首先让我们创建一个示例数据:
import string
import random
random.seed(323)
def random_string():
n = random.randint(3, 6)
return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)), )
df = (sc
.parallelize([random_string() for _ in range(50000)], 8).toDF(["name"])
.cache())
并根据
shuffle.partitions
的数量测量时间:sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 504 ms per loop
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 451 ms per loop
sqlContext.setConf("spark.sql.shuffle.partitions", "100")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 624 ms per loop
sqlContext.setConf("spark.sql.shuffle.partitions", "200")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 778 ms per loop
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
%timeit -n 10 df.groupby('name').count().collect()
## 10 loops, best of 3: 1.75 s per loop
尽管这些值与您声称的值不具有可比性,并且这些数据是在本地模式下收集的,但您可以看到相对清晰的模式。这同样适用于 RDD:
from operator import add
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect()
## 10 loops, best of 3: 414 ms per loop
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect()
## 10 loops, best of 3: 439 ms per loop
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect()
## 10 loops, best of 3: 1.3 s per loop
%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect()
## 10 loops, best of 3: 8.41 s per loop
在适当的分布式环境中,由于网络 IO 的成本,这将更高。
只是为了比较,让我们检查在没有 Spark 的情况下在本地执行此任务需要多长时间
from collections import Counter
data = df.rdd.flatMap(lambda x: x).collect()
%timeit -n 10 Counter(data)
## 10 loops, best of 3: 9.9 ms per loop
您还应该查看数据位置。根据您使用的存储和配置,即使像这样的小输入,这也会为您的作业增加额外的延迟。
关于python - 为什么我的 Spark DataFrame 比 RDD 慢得多?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38050140/