我们常见的Spark处理流程如下所示:
正在加载:
rdd = sqlContext.parquetFile("mydata/")
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPatitions())
通过
id
处理(这就是我们执行上面的partitionBy
的原因!)rdd.reduceByKey(....)
rdd.join(...)
但是,Spark 1.3将
sqlContext.parquetFile
更改为返回DataFrame
而不是RDD
,并且它不再具有partitionBy
,getNumPartitions
和reduceByKey
方法。现在我们用
partitionBy
做什么?我们可以将加载代码替换为
rdd = sqlContext.parquetFile("mydata/").rdd
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPatitions())
df = rdd.map(lambda ...: Row(...)).toDF(???)
并使用
groupBy
代替reduceByKey
。这是正确的方法吗?
PS。是的,我了解
partitionBy
对于groupBy
等而言不是必需的。但是,如果没有先前的partitionBy
,则每个join
,groupBy
和c可能都必须执行跨节点操作。我正在寻找一种方法来确保所有需要按我的密钥分组的操作都将在本地运行。 最佳答案
从版本1.6开始,repartition(self, numPartitions, *cols)
似乎可以满足我的需求:
.. versionchanged:: 1.6
添加了可选参数以指定分区列。
如果指定了分区列,还将
numPartitions
设为可选。