Question
Assuming I have the following RDD:
rdd = sc.parallelize([('a', (5,1)), ('d', (8,2)), ('c', (6,3)), ('a', (8,2)), ('d', (9,6)), ('b', (3,4)), ('c', (8,3))])
How can I use repartitionAndSortWithinPartitions and sort by x[0] and then by x[1][0]? Using the following I only sort by the key (x[0]):
Npartitions = sc.defaultParallelism
rdd2 = rdd.repartitionAndSortWithinPartitions(Npartitions, lambda x: hash(x) % Npartitions)
One way to do it is the following, but I guess there should be something simpler:
Npartitions = sc.defaultParallelism
partitioned_data = (rdd
    .partitionBy(Npartitions)
    .map(lambda x: (x[0], x[1][0], x[1][1]))
    .toDF(['letter', 'number2', 'number3'])
    .sortWithinPartitions(['letter', 'number2'], ascending=False)
    .map(lambda x: (x.letter, (x.number2, x.number3))))
>>> partitioned_data.glom().collect()
[[],
 [(u'd', (9, 6)), (u'd', (8, 2))],
 [(u'c', (8, 3)), (u'c', (6, 3))],
 [(u'b', (3, 4))],
 [(u'a', (8, 2)), (u'a', (5, 1))]]
As can be seen, I have to convert it to a DataFrame in order to use sortWithinPartitions. Is there another way, using repartitionAndSortWithinPartitions?
(It does not matter that the data is not globally sorted. I only care that it is sorted inside the partitions.)
Answer
It is possible, but you'll have to include all the required information in the composite key:
from pyspark.rdd import portable_hash

n = 2

def partitioner(n):
    """Partition by the first item in the key tuple."""
    def partitioner_(x):
        return portable_hash(x[0]) % n
    return partitioner_

(rdd
    .keyBy(lambda kv: (kv[0], kv[1][0]))  # Create a temporary composite key
    .repartitionAndSortWithinPartitions(
        numPartitions=n, partitionFunc=partitioner(n), ascending=False)
    .map(lambda x: x[1]))  # Drop the composite key (note: no partitioner is set anymore)
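A quick way to inspect the result is glom(), which returns the contents of each partition as a list. A minimal sketch, reusing the rdd, n, and partitioner defined above; which key lands in which partition depends on portable_hash and can vary between environments, but within every partition the records come out sorted by (key, value[0]) in descending order:

result = (rdd
    .keyBy(lambda kv: (kv[0], kv[1][0]))
    .repartitionAndSortWithinPartitions(
        numPartitions=n, partitionFunc=partitioner(n), ascending=False)
    .map(lambda x: x[1]))

# Each inner list holds one partition's records, already sorted within the partition.
result.glom().collect()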
Step by step explanation:
- keyBy(lambda kv: (kv[0], kv[1][0])) creates a substitute key consisting of the original key and the first element of the value. In other words, it transforms:
(0, (5,1))
into
((0, 5), (0, (5, 1)))
In practice it can be slightly more efficient to simply reshape the data to
((0, 5), 1)
- partitioner defines the partitioning function based on a hash of the first element of the key, so:
partitioner(7)((0, 5))
## 0
partitioner(7)((0, 6))
## 0
partitioner(7)((0, 99))
## 0
partitioner(7)((3, 99))
## 3
As you can see, it is consistent and ignores the second element of the key.
- we use the default keyfunc function, which is the identity (lambda x: x), and depend on the lexicographic ordering defined on Python tuples:
(0, 5) < (1, 5)
## True
(0, 5) < (0, 4)
## False
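As a rough sanity check in plain Python, sorting composite keys with reverse=True mirrors what ascending=False does within each partition (keys taken from the example data):

keys = [('a', 5), ('a', 8), ('d', 8), ('d', 9)]
sorted(keys, reverse=True)
## [('d', 9), ('d', 8), ('a', 8), ('a', 5)]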
As mentioned before, you could reshape the data instead:
rdd.map(lambda kv: ((kv[0], kv[1][0]), kv[1][1]))
and drop the final map to improve performance.
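Putting that together, a minimal sketch of the reshaped variant, assuming the same rdd, n, and partitioner defined above; the restored step is hypothetical and only needed if the original shape is required downstream:

reshaped = (rdd
    .map(lambda kv: ((kv[0], kv[1][0]), kv[1][1]))  # ((key, value[0]), value[1])
    .repartitionAndSortWithinPartitions(
        numPartitions=n, partitionFunc=partitioner(n), ascending=False))

# Mapping back to (key, (value0, value1)) discards the partitioner,
# just as the final map in the first version does.
restored = reshaped.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))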