我正在使用spark 2.2.0和pyspark2。
我创建了一个DataFrame df
,现在尝试添加一个新列"rowhash"
,它是DataFrame中特定列的sha2哈希。
例如,假设df
具有以下列:(column1, column2, ..., column10)
我需要在新列sha2((column2||column3||column4||...... column8), 256)
中添加"rowhash"
。
现在,我尝试使用以下方法:
1)使用了hash()
函数,但是由于它提供了整数输出,因此没有太大用处
2)尝试使用sha2()
函数,但失败。
说columnarray
有我需要的列数组。
def concat(columnarray):
concat_str = ''
for val in columnarray:
concat_str = concat_str + '||' + str(val)
concat_str = concat_str[2:]
return concat_str
然后
df1 = df1.withColumn("row_sha2", sha2(concat(columnarray),256))
这失败,并显示“无法解决”错误。
谢谢gaw的回答。由于我仅需要哈希特定的列,因此我创建了这些列名称的列表(在hash_col中),并将您的函数更改为:
def sha_concat(row, columnarray):
row_dict = row.asDict() #transform row to a dict
concat_str = ''
for v in columnarray:
concat_str = concat_str + '||' + str(row_dict.get(v))
concat_str = concat_str[2:]
#preserve concatenated value for testing (this can be removed later)
row_dict["sha_values"] = concat_str
row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest()
return Row(**row_dict)
然后传递为:
df1.rdd.map(lambda row: sha_concat(row,hash_col)).toDF().show(truncate=False)
现在,它因错误而失败:
UnicodeEncodeError: 'ascii' codec can't encode character u'\ufffd' in position 8: ordinal not in range(128)
我可以在其中一列中看到\ufffd的值,所以我不确定是否有办法解决这个问题?
最佳答案
您可以使用 pyspark.sql.functions.concat_ws()
连接您的列,并使用 pyspark.sql.functions.sha2()
获取SHA256哈希。
使用@gaw中的数据:
from pyspark.sql.functions import sha2, concat_ws
df = spark.createDataFrame(
[(1,"2",5,1),(3,"4",7,8)],
("col1","col2","col3","col4")
)
df.withColumn("row_sha2", sha2(concat_ws("||", *df.columns), 256)).show(truncate=False)
#+----+----+----+----+----------------------------------------------------------------+
#|col1|col2|col3|col4|row_sha2 |
#+----+----+----+----+----------------------------------------------------------------+
#|1 |2 |5 |1 |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|
#|3 |4 |7 |8 |57f057bdc4178b69b1b6ab9d78eabee47133790cba8cf503ac1658fa7a496db1|
#+----+----+----+----+----------------------------------------------------------------+
您可以根据文档将
0
或256
传递为sha2()
的第二个参数:函数
concat_ws
带有分隔符和要连接的列的列表。我将||
作为分隔符,并将df.columns
作为列列表传递。我在这里使用所有列,但是您可以指定想要的列子集-在您的情况下为
columnarray
。 (您需要使用*
解压缩列表。)关于pyspark生成特定列的行哈希并将其添加为新列,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/52292180/