我需要使用 Spark (PySpark) 对表进行增量加载
这是示例:
第一天
id | value
-----------
1 | abc
2 | def
第 2 天
id | value
-----------
2 | cde
3 | xyz
预期结果
id | value
-----------
1 | abc
2 | cde
3 | xyz
这可以在关系数据库中轻松完成,
想知道这是否可以在 Spark 或其他转换工具中完成,例如急速?
最佳答案
干得好!
第一个数据框:
>>> list1 = [(1, 'abc'),(2,'def')]
>>> olddf = spark.createDataFrame(list1, ['id', 'value'])
>>> olddf.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
第二个数据框:
>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
| 2| cde|
| 3| xyz|
+---+-----+
现在使用完全外连接连接和合并这两个数据名,并在选择时使用合并函数,并且可以用用户定义的值替换空值。
from pyspark.sql.functions import *
>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 3| xyz|
| 2| cde|
+---+-----+
我希望这可以解决您的问题。 :-)
关于apache-spark - Spark增量加载覆盖旧记录,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53587175/