我正在尝试使用DataFrameDataFrameWriter保存为Parquet格式的HDFS,按三个列值进行分区,如下所示:

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)

this question中所述,partitionBy将删除path上分区的全部现有层次结构,并将其替换为dataFrame中的分区。由于特定日期的新增量数据将定期出现,因此我想要的是仅替换dataFrame拥有数据的层次结构中的那些分区,而其他分区则保持不变。

为此,我似乎需要使用其完整路径分别保存每个分区,如下所示:
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")

但是,我在理解将数据组织为单分区DataFrame的最佳方法方面遇到困难,因此我无法使用它们的完整路径将其写出。一个想法是这样的:
dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...

但是foreachPartitionIterator[Row]上运行,这对于写出Parquet格式并不理想。

我还考虑过使用select...distinct eventdate, hour, processtime获取分区列表,然后按每个分区过滤原始数据帧并将结果保存到其完整分区路径。但是针对每个分区的独特查询和过滤器似乎效率不高,因为它将执行大量过滤/写入操作。

我希望有一种更干净的方法来保留dataFrame没有数据的现有分区?

谢谢阅读。

Spark版本:2.1

最佳答案

模式选项Append很有用!

df.write.partitionBy("y","m","d")
.mode(SaveMode.Append)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName)

我已经测试过,发现它将保留现有的分区文件。但是,这次的问题如下:如果您两次运行相同的代码(使用相同的数据),则它将创建新的 Parquet 文件,而不是用相同的数据替换现有的 Parquet 文件(Spark 1.6)。因此,我们仍然可以使用Append来解决此问题,而不是使用Overwrite。与其在表级别覆盖,不如在分区级别覆盖。
df.write.mode(SaveMode.Overwrite)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day)

有关更多信息,请参见以下链接:

Overwrite specific partitions in spark dataframe write method

(在suriyanto发表评论后,我更新了我的回复。Thnx。)

关于apache-spark - 如何在Spark中分区和写入DataFrame而不删除没有新数据的分区?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42317738/

10-16 03:36