问题描述
我有一个 DataFrame 生成如下:
I have a DataFrame generated as follows:
df.groupBy($"Hour", $"Category")
.agg(sum($"value").alias("TotalValue"))
.sort($"Hour".asc,$"TotalValue".desc))
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
我想根据 col("Hour")
的每个唯一值创建新的数据帧,即
I would like to make new dataframes based on every unique value of col("Hour")
, i.e.
- Hour==0 组
- Hour==1 组
- 为Hour==2组等等...
所以期望的输出是:
df0 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
+----+--------+----------+
df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
+----+--------+----------+
同样,
df2 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
+----+--------+----------+
非常感谢任何帮助.
编辑 1:
我尝试过的:
df.foreach(
row => splitHour(row)
)
def splitHour(row: Row) ={
val Hour=row.getAs[Long]("Hour")
val HourDF= sparkSession.createDataFrame(List((s"$Hour",1)))
val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2")
val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique"))
mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
}
这个策略的问题:
在具有超过 100 万行的数据帧 df
上运行它需要 8 小时,并且在单个节点上为 Spark 作业提供了大约 10 GB 的 RAM.因此,join
变得非常低效.
It took 8 hours when it was run on a dataframe df
which had over 1 million rows and spark job was given around 10 GB RAM on single node. So, join
is turning out to be highly in-efficient.
警告:我必须将每个数据帧 mydf
编写为镶木地板,其中包含需要维护(未展平)的嵌套架构.
Caveat: I have to write each dataframe mydf
as parquet which has nested schema that is required to be maintained (not flattened).
推荐答案
正如我在评论中所指出的,解决这个问题的一种可能简单的方法是使用:
As noted in my comments, one potentially easy approach to this problem would be to use:
df.write.partitionBy("hour").saveAsTable("myparquet")
如前所述,文件夹结构将是 myparquet/hour=1
、myparquet/hour=2
、...、myparquet/hour=24
与 myparquet/1
、myparquet/2
、...、myparquet/24
相对.
As noted, the folder structure would be myparquet/hour=1
, myparquet/hour=2
, ..., myparquet/hour=24
as opposed to myparquet/1
, myparquet/2
, ..., myparquet/24
.
要更改文件夹结构,您可以
To change the folder structure, you could
- 可能在显式 HiveContext 中使用 Hive 配置设置
hcat.dynamic.partitioning.custom.pattern
;更多信息请参见 HCatalog DynamicPartitions. - 另一种方法是在您执行了
df.write.partitionBy.saveAsTable(...)
命令后直接更改文件系统,例如for f in *;做 mv $f ${f/${f:0:5}/} ;done
将从文件夹名称中删除Hour=
文本.
- Potentially use the Hive configuration setting
hcat.dynamic.partitioning.custom.pattern
within an explicit HiveContext; more information at HCatalog DynamicPartitions. - Another approach would be to change the file system directly after you have executed the
df.write.partitionBy.saveAsTable(...)
command with something likefor f in *; do mv $f ${f/${f:0:5}/} ; done
which would remove theHour=
text from the folder name.
需要注意的是,通过更改文件夹的命名模式,当您在该文件夹中运行 spark.read.parquet(...)
时,Spark 不会自动理解动态分区,因为它缺少 partitionKey(即 Hour
)信息.
It is important to note that by changing the naming pattern for the folders, when you are running spark.read.parquet(...)
in that folder, Spark will not automatically understand the dynamic partitions since its missing the partitionKey (i.e. Hour
) information.
这篇关于SPARK DataFrame:如何根据相同的列值有效地拆分每个组的数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!