一些测试数据,具有两列:第一个二进制(在此示例中使用字母数字字节),第二个为整数:

from pyspark.sql.types import *
from pyspark.sql import functions as F

df = spark.createDataFrame([
    (bytearray(b'0001'), 1),
    (bytearray(b'0001'), 1),
    (bytearray(b'0001'), 2),
    (bytearray(b'0002'), 2)
],
schema=StructType([
    StructField("bin", BinaryType()),
    StructField("number", IntegerType())
]))


使用collect_set按整数列分组然后删除重复项是行不通的,因为字节数组不支持哈希。因此:

(
    df
    .groupBy('number')
    .agg(F.collect_set("bin").alias('bin_array'))
    .show()
)

+------+------------+
|number|   bin_array|
+------+------------+
|     1|[0001, 0001]|
|     2|[0001, 0002]|
+------+------------+


一个棘手的选择是将二进制数组嵌入结构中,然后再将它们全部解包,但是我怀疑这将导致大量的分配并且非常昂贵(尽管实际上并未对其进行概要分析):

def unstruct_array(input):
    return [x.bin for x in input]

unstruct_array_udf = F.udf(unstruct_array, ArrayType(BinaryType()))

(
    df
    .withColumn("bin", F.struct("bin"))
    .groupBy('number')
    .agg(F.collect_set("bin").alias('bin_array'))
    .withColumn('bin_array', unstruct_array_udf('bin_array'))
    .show()
)

+------+------------+
|number|   bin_array|
+------+------------+
|     1|      [0001]|
|     2|[0001, 0002]|
+------+------------+


如果我尝试使用很多关于二进制类型和Spark的Google搜索词,那么会有各种答案说如果需要散列,则应该包装数组。建议包括自定义包装或通过调用Scala的toSeq来创建Scala WrappedArray。例如:

ReduceByKey with a byte array as the key

How to use byte array as key in RDD?

因此,选项包括:


映射基础的RDD以使二进制字段成为WrappedArray。不确定如何在Python中执行此操作?
为数组创建Python包装器,然后以某种方式哈希Python中的底层Java数组?虽然不确定是否比使用结构有任何优势?
我可以包装一个结构,然后再不解开,这将在处理方面更加有效,但是可能会使木地板文件更大,并且在所有下游任务中解析起来都比较昂贵。

最佳答案

这是一种hack,可能比包装和展开更有效。您可以简单地事先调用distinct方法。

df.show()
+-------------+------+
|          bin|number|
+-------------+------+
|[30 30 30 31]|     1|
|[30 30 30 31]|     1|
|[30 30 30 31]|     2|
|[30 30 30 32]|     2|
+-------------+------+

df.distinct().show()
+-------------+------+
|          bin|number|
+-------------+------+
|[30 30 30 31]|     1|
|[30 30 30 31]|     2|
|[30 30 30 32]|     2|
+-------------+------+


请注意,由于二进制数组的显示似乎不同,我可能未使用与您使用的相同版本的Spark(我的是​​2.2.1)。

然后,对于collect_set,它简单地归结为:

df.distinct().groupBy("number").agg(F.collect_set("bin"))

10-05 21:14
查看更多