我正在尝试使用DataFrame
将DataFrameWriter
保存为Parquet格式的HDFS,按三个列值进行分区,如下所示:
dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
如this question中所述,
partitionBy
将删除path
上分区的全部现有层次结构,并将其替换为dataFrame
中的分区。由于特定日期的新增量数据将定期出现,因此我想要的是仅替换dataFrame
拥有数据的层次结构中的那些分区,而其他分区则保持不变。为此,我似乎需要使用其完整路径分别保存每个分区,如下所示:
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
但是,我在理解将数据组织为单分区
DataFrame
的最佳方法方面遇到困难,因此我无法使用它们的完整路径将其写出。一个想法是这样的:dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
但是
foreachPartition
在Iterator[Row]
上运行,这对于写出Parquet格式并不理想。我还考虑过使用
select...distinct eventdate, hour, processtime
获取分区列表,然后按每个分区过滤原始数据帧并将结果保存到其完整分区路径。但是针对每个分区的独特查询和过滤器似乎效率不高,因为它将执行大量过滤/写入操作。我希望有一种更干净的方法来保留
dataFrame
没有数据的现有分区?谢谢阅读。
Spark版本:2.1
最佳答案
模式选项Append
很有用!
df.write.partitionBy("y","m","d")
.mode(SaveMode.Append)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName)
我已经测试过,发现它将保留现有的分区文件。但是,这次的问题如下:如果您两次运行相同的代码(使用相同的数据),则它将创建新的 Parquet 文件,而不是用相同的数据替换现有的 Parquet 文件(Spark 1.6)。因此,我们仍然可以使用
Append
来解决此问题,而不是使用Overwrite
。与其在表级别覆盖,不如在分区级别覆盖。df.write.mode(SaveMode.Overwrite)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day)
有关更多信息,请参见以下链接:
Overwrite specific partitions in spark dataframe write method
(在suriyanto发表评论后,我更新了我的回复。Thnx。)
关于apache-spark - 如何在Spark中分区和写入DataFrame而不删除没有新数据的分区?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42317738/