我们常见的Spark处理流程如下所示:

正在加载:

rdd = sqlContext.parquetFile("mydata/")
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPatitions())


通过id处理(这就是我们执行上面的partitionBy的原因!)

rdd.reduceByKey(....)
rdd.join(...)


但是,Spark 1.3将sqlContext.parquetFile更改为返回DataFrame而不是RDD,并且它不再具有partitionBygetNumPartitionsreduceByKey方法。

现在我们用partitionBy做什么?

我们可以将加载代码替换为

rdd = sqlContext.parquetFile("mydata/").rdd
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPatitions())
df = rdd.map(lambda ...: Row(...)).toDF(???)


并使用groupBy代替reduceByKey

这是正确的方法吗?

PS。是的,我了解partitionBy对于groupBy等而言不是必需的。但是,如果没有先前的partitionBy,则每个joingroupBy和c可能都必须执行跨节点操作。我正在寻找一种方法来确保所有需要按我的密钥分组的操作都将在本地运行。

最佳答案

从版本1.6开始,repartition(self, numPartitions, *cols)似乎可以满足我的需求:

.. versionchanged:: 1.6



添加了可选参数以指定分区列。
如果指定了分区列,还将numPartitions设为可选。

08-25 06:10