我在MySQL中有一个语句,我正试图转换成PySpark:

my_table_name = default_engagements
UPDATE """ + my_table_name + """ SET Engagement = CASE WHEN LinkedAccountId IN ('123456778910', '1098765432123', '254325678912', '429576512356') THEN '808000000298' END WHERE Engagement IS NULL OR Engagement RLIKE '^[a-zA-Z]'; """

我在python spark中找到了这个示例:
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))

但我不知道如何使它适应上面的SQL。有人能帮我学一下语法吗?我想把新的信息转换成一个DF,这样我就可以把新的DF写到S3。

最佳答案

这就是你要找的吗?

import pyspark.sql.functions as f
default_engagements = default_engagements.withColumn('Engagement', f.when((f.col('LinkedAccountId').isin('123456778910', '1098765432123', '254325678912', '429576512356'))&((f.col('Engagement').isNull())|(f.col('Engagement').rlike('^[a-zA-Z]'))), '808000000298').otherwise(f.col('Engagement'))

10-05 23:30