This article walks through how to convert a table into a given format in Spark. It should be a useful reference for anyone tackling the same problem.

Problem Description

I am using Spark v2.4.1 and have a scenario where I need to convert a table structured as below.

import org.apache.spark.sql.functions.{col, first, to_date}
import org.apache.spark.sql.types.{DateType, DoubleType}
import spark.implicits._  // provides the $"..." column syntax (already in scope in spark-shell)

val df = Seq(
  ("A", "2016-01-01", "2016-12-01", "0.044999408"),
  ("A", "2016-01-01", "2016-12-01", "0.0449999426"),
  ("A", "2016-01-01", "2016-12-01", "0.045999415"),
  ("B", "2016-01-01", "2016-12-01", "0.0787888909"),
  ("B", "2016-01-01", "2016-12-01", "0.079779426"),
  ("B", "2016-01-01", "2016-12-01", "0.999989415"),
  ("C", "2016-01-01", "2016-12-01", "0.0011999408"),
  ("C", "2016-01-01", "2016-12-01", "0.0087999426"),
  ("C", "2016-01-01", "2016-12-01", "0.0089899941")
).toDF("class_type", "start_date", "end_date", "ratio")
  .withColumn("start_date", to_date($"start_date", "yyyy-MM-dd").cast(DateType))
  .withColumn("end_date", to_date($"end_date", "yyyy-MM-dd").cast(DateType))
  .withColumn("ratio", col("ratio").cast(DoubleType))

df.show(200)

Given table:

+----------+----------+----------+------------+
|class_type|start_date|  end_date|       ratio|
+----------+----------+----------+------------+
|         A|2016-01-01|2016-12-01| 0.044999408|
|         A|2016-01-01|2016-12-01|0.0449999426|
|         A|2016-01-01|2016-12-01| 0.045999415|
|         B|2016-01-01|2016-12-01|0.0787888909|
|         B|2016-01-01|2016-12-01| 0.079779426|
|         B|2016-01-01|2016-12-01| 0.999989415|
|         C|2016-01-01|2016-12-01|0.0011999408|
|         C|2016-01-01|2016-12-01|0.0087999426|
|         C|2016-01-01|2016-12-01|0.0089899941|
+----------+----------+----------+------------+

Desired table format:

+----------+----------+------------+------------+------------+
|start_date|  end_date|           A|           B|           C|
+----------+----------+------------+------------+------------+
|2016-01-01|2016-12-01| 0.044999408|0.0787888909|0.0011999408|
|2016-01-01|2016-12-01|0.0449999426| 0.079779426|0.0087999426|
|2016-01-01|2016-12-01| 0.045999415| 0.999989415|0.0089899941|
+----------+----------+------------+------------+------------+

How can this be done?

What I tried

val pivotDf = df.groupBy("start_date", "end_date", "class_type")
  .pivot(col("class_type"))
  .agg(first(col("ratio")))


+----------+----------+----------+-----------+------------+------------+
|start_date|  end_date|class_type|          A|           B|           C|
+----------+----------+----------+-----------+------------+------------+
|2016-01-01|2016-12-01|         A|0.044999408|        null|        null|
|2016-01-01|2016-12-01|         B|       null|0.0787888909|        null|
|2016-01-01|2016-12-01|         C|       null|        null|0.0011999408|
+----------+----------+----------+-----------+------------+------------+
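The nulls show up because class_type is still part of the groupBy, so every group holds rows of a single class only. Simply dropping it from the groupBy does not fix things either: with nothing to line the rows up, first collapses each class to one value. A quick sketch of that dead end, reusing the df from above (collapsed is an illustrative name):

// Pivot without class_type in the groupBy: there is only one
// (start_date, end_date) group, and first() keeps a single ratio per
// class, so the result is one row instead of the desired three.
val collapsed = df.groupBy("start_date", "end_date")
  .pivot("class_type")
  .agg(first(col("ratio")))

collapsed.show(false)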

Recommended Answer

Based on the sample data, nothing in the table relates a given ratio to a ratio of another class_type on a subsequent row, other than row order.

If the rows are already ordered, you can assign a rank within each class and then pivot on that rank.

Here is an example using rank.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, rank, to_date}
import org.apache.spark.sql.types.{DateType, DoubleType}

// Window over each (start_date, end_date, class_type) group, ordered by ratio
val byRatio = Window
  .partitionBy(col("start_date"), col("end_date"), col("class_type"))
  .orderBy(col("ratio"))

var df = Seq(
  ("A", "2016-01-01", "2016-12-01", "0.044999408"),
  ("A", "2016-01-01", "2016-12-01", "0.0449999426"),
  ("A", "2016-01-01", "2016-12-01", "0.045999415"),
  ("B", "2016-01-01", "2016-12-01", "0.0787888909"),
  ("B", "2016-01-01", "2016-12-01", "0.079779426"),
  ("B", "2016-01-01", "2016-12-01", "0.999989415"),
  ("C", "2016-01-01", "2016-12-01", "0.0011999408"),
  ("C", "2016-01-01", "2016-12-01", "0.0087999426"),
  ("C", "2016-01-01", "2016-12-01", "0.0089899941")
).toDF("class_type", "start_date", "end_date", "ratio")
  .withColumn("start_date", to_date($"start_date", "yyyy-MM-dd").cast(DateType))
  .withColumn("end_date", to_date($"end_date", "yyyy-MM-dd").cast(DateType))
  .withColumn("ratio", col("ratio").cast(DoubleType))

// Rank each class's ratios; rows that share a rank across classes
// land on the same output row after the pivot
df = df.withColumn("class_rank", rank().over(byRatio))

var pivotDf = df
  .groupBy("start_date", "end_date", "class_rank")
  .pivot("class_type")
  .agg(max(col("ratio")))
pivotDf = pivotDf.drop(col("class_rank"))
pivotDf.show(10, false)

Based on your data, you will get output like below:

+----------+----------+------------+------------+------------+
|start_date|end_date  |A           |B           |C           |
+----------+----------+------------+------------+------------+
|2016-01-01|2016-12-01|0.044999408 |0.0787888909|0.0011999408|
|2016-01-01|2016-12-01|0.0449999426|0.079779426 |0.0087999426|
|2016-01-01|2016-12-01|0.045999415 |0.999989415 |0.0089899941|
+----------+----------+------------+------------+------------+
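One caveat about rank: tied ratios within a class receive the same rank, and the max aggregation would then merge those rows into one. If exact duplicates can occur, row_number is a safer choice because it always assigns unique, consecutive numbers. A minimal sketch of that variant, reusing the df from above (pivotByRowNum is an illustrative name):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, row_number}

val byRatio = Window
  .partitionBy(col("start_date"), col("end_date"), col("class_type"))
  .orderBy(col("ratio"))

// row_number never produces ties, so no rows are merged by the aggregation
val pivotByRowNum = df
  .withColumn("class_rank", row_number().over(byRatio))
  .groupBy("start_date", "end_date", "class_rank")
  .pivot("class_type")
  .agg(max(col("ratio")))
  .drop("class_rank")

pivotByRowNum.show(10, false)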

This concludes the article on converting a table into a given format in Spark. Hopefully the recommended answer above is helpful.
