问题描述
当护照和国家相同时,我不得不在最后一列应用 Levenshtein 函数.
I had to apply Levenshtein Function on last column when passport and country are same.
matrix = passport_heck.select(\
f.col('name_id').alias('name_id_1'),
f.col('last').alias('last_1'),
f.col('country').alias('country_1'),
f.col('passport').alias('passport_1')) \
.crossJoin(passport_heck.select(\
f.col('name_id').alias('name_id_2'),
f.col('last').alias('last_2'),
f.col('country').alias('country_2'),
f.col('passport').alias('passport_2')))\
.filter((f.col('passport_1') == f.col('passport_2')) & (f.col('country_1') == f.col('country_2')))```
res = matrix.withColumn('distance', levenshtein(f.col('last_1'), f.col('last_2')))
现在我得到以下输出,这完全没问题.
Now I am getting the following output which is totally fine.
现在我需要删除重复对(例如 ID 558635 与 1106562 然后 1106562 与 558635 比较相同的内容).
Now I need to delete duplicates pair (example ID 558635 with 1106562 then 1106562 with 558635 comparing same content).
任何人都可以在 pyspark 中给我一些逻辑以获取下表.
Can anyone please give me some logic in pyspark to get below table.
推荐答案
如果您想解决问题,您的问题可能会变得非常复杂,但这里有一些 pyspark 示例代码,希望可以帮助您入门.
Your problem can become quite complicated if you want to get it right, but here you have some sample code in pyspark that hopefully gets you started.
首先是一个小数据集,
tinydata = sqlContext.createDataFrame(
[
(3527524, 'aamir', 'al malik', '[email protected]'),
(4287983, 'aamir', 'al malik', '[email protected]'),
(200490, 'aamir', 'al malik', '[email protected]'),
(1906639, 'tahir', 'al malik', '[email protected]')
],
['ID', 'first_NAME', 'last_NAME', 'EMAIL']
)
然后通过cross-join
将其转换为差异矩阵.请注意,如果您有 500 万,这将变得巨大.您需要尽可能避免比较,例如遵循对您的问题的一些评论,以及您可能想出的其他想法.请注意,最终过滤器是为了避免将 2 行比较两次.
Then you convert it to a matrix of differences through a cross-join
. Note that if you have 5 million, this will become huge. You need to avoid comparisons as much as possible, such as following some of the comments to your question, and other ideas you may come up with. Note the final filter is to avoid comparing 2 rows twice.
matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
.crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
.filter(F.col('ID1') > F.col('ID2'))
之后,您可以计算距离.
After that, you can calculate distances.
def lev_dist(left, right):
return Levenshtein.distance(left, right)
lev_dist_udf = udf(lev_dist, IntegerType())
res = matrix.withColumn('d', lev_dist_udf(F.col('EMAIL1'), F.col('EMAIL2')))
通过你得到的小例子
res.show()
+-------+--------------------+-------+--------------------+---+
| ID1| EMAIL1| ID2| EMAIL2| d|
+-------+--------------------+-------+--------------------+---+
|3527524|aamir.almalik@gma...| 200490|aamir.almalik@gma...| 1|
|3527524|aamir.almalik@gma...|1906639|tahir.almalik@gma...| 2|
|4287983|aamir.almalik@com...|3527524|aamir.almalik@gma...| 5|
|4287983|aamir.almalik@com...| 200490|aamir.almalik@gma...| 6|
|4287983|aamir.almalik@com...|1906639|tahir.almalik@gma...| 7|
|1906639|tahir.almalik@gma...| 200490|aamir.almalik@gma...| 3|
+-------+--------------------+-------+--------------------+---+
感谢您指出@cronoik
Thanks for pointing out @cronoik
不需要udf,应该是这样的:
No need for udf, should be something like this:
from pyspark.sql.functions import levenshtein
matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
.crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
.filter(F.col('ID1') > F.col('ID2'))
res = matrix.withColumn('d', levenshtein(F.col('EMAIL1'), F.col('EMAIL2')))
这篇关于使用 Soundex 函数或 Levenshtein 距离模糊匹配 pyspark 或 SQL 中的字符串的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!