You can load the 2 JSON files into Spark data frames and do a left_join to get updates from the latest JSON data :from pyspark.sql import functions as Ffull_json_df = spark.read.json(full_json_path, multiLine=True)latest_json_df = spark.read.json(latest_json_path, multiLine=True)updated_df = full_json_df.alias("full").join( latest_json_df.alias("latest"), F.col("full.id") == F.col("latest.id"), "left").select( F.col("full.id"), *[ F.when(F.col("latest.id").isNotNull(), F.col(f"latest.{c}")).otherwise(F.col(f"full.{c}")).alias(c) for c in full_json_df.columns if c != 'id' ])updated_df.show(truncate=False)#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+#|id |email |firstName|layer01 |surname |#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+#|6304|[email protected]|name01 |[value1, value2, value3, value4, [value1_changedData, value2], [[inner value01,], [, inner_value02]]]|Optional|#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+更新:如果架构在 full 和 latest JSON 之间发生变化,您可以将这 2 个文件加载到同一个数据帧中(通过这种方式合并架构),然后进行重复数据删除每个 id:If the schema changes between full and latest JSONs, you can load the 2 files into the same data frame (this way the schemas are being merged) and then deduplicate per id:from pyspark.sql import Windowfrom pyspark.sql import functions as Fmerged_json_df = spark.read.json("/path/to/{full_json.json,latest_json.json}", multiLine=True)# order priority: latest file then fullw = Window.partitionBy(F.col("id")).orderBy(F.when(F.input_file_name().like('%latest%'), 0).otherwise(1))updated_df = merged_json_df.withColumn("rn", F.row_number().over(w))\ .filter("rn = 1")\ .drop("rn")updated_df.show(truncate=False) 这篇关于使用 Python 用另一个嵌套的 Json 更新嵌套的 Json的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 10-23 01:59