问题描述
我需要通过更改日期列来拆分一行并创建一个新行,并使 amt 列为零,如下例所示:
I need to split a row and create a new row by changing the date columns and make the amt columns to zero as in the below example:
Input:
+---+-----------------------+-----------------------+-----+
|KEY|START_DATE |END_DATE |Amt |
+---+-----------------------+-----------------------+-----+
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|100.0|
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|200.0|
|0 |2017-10-30T00:00:00.000|2017-11-02T23:59:59.000|67.5 |->Split row based on start & date end date is between "2017-10-31T23:59:59" condition
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|55.3 |
|1 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|22.2 |
|1 |2017-10-30T00:00:00.000|2017-11-01T23:59:59.000|11.0 |->Split row based on start & date end date is between "2017-10-31T23:59:59" condition
|1 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|87.33|
+---+-----------------------+-----------------------+-----+
如果2017-10-31T23:59:59"在行 start_date 和 end_date 之间,则通过更改一行的 end_date 和另一行的 start_date 将行拆分为两行.并将新行的 amt 设为零,如下所示:
If "2017-10-31T23:59:59" is in between row start_date and end_date , then split the row into two rows by changing the end_date for one row and start_date for another row. And make the amt to zero for the new row as below:
期望输出:
+---+-----------------------+-----------------------+-----+---+
|KEY|START_DATE |END_DATE |Amt |Ind|
+---+-----------------------+-----------------------+-----+---+
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|100.0|N |
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|200.0|N |
|0 |2017-10-30T00:00:00.000|2017-10-30T23:59:59.998|67.5 |N |->parent row (changed the END_DATE)
|0 |2017-10-30T23:59:59.999|2017-11-02T23:59:59.000|0.0 |Y |->splitted new row(changed the START_DATE and Amt=0.0)
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|55.3 |N |
|1 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|22.2 |N |
|1 |2017-10-30T00:00:00.000|2017-10-30T23:59:59.998|11.0 |N |->parent row (changed the END_DATE)
|1 |2017-10-30T23:59:59.999|2017-11-01T23:59:59.000|0.0 |Y |->splitted new row(changed the START_DATE and Amt=0.0)
|1 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|87.33|N |
+---+-----------------------+-----------------------+-----+---+
我尝试了以下代码并能够复制行,但无法即时更新行.
I have tried the below code and able to copy the row , but unable to update the rows on the fly.
val df1Columns = Seq("KEY", "START_DATE", "END_DATE", "Amt")
val df1Schema = new StructType(df1Columns.map(c => StructField(c, StringType, nullable = false)).toArray)
val input1: Array[String] = Seq("0", "2016-12-14T23:59:59.000", "2017-10-29T23:59:58.000", "100.0").toArray;
val row1: Row = Row.fromSeq(input1)
val input2: Array[String] = Seq("0", "2016-12-14T23:59:59.000", "2017-10-29T23:59:58.000", "200.0").toArray;
val row2: Row = Row.fromSeq(input2)
val input3: Array[String] = Seq("0", "2017-10-30T00:00:00.000", "2017-11-0123:59:59.000", "67.5").toArray;
val row3: Row = Row.fromSeq(input3)
val input4: Array[String] = Seq("0", "2016-12-14T23:59:59.000", "2017-10-29T23:59:58.000", "55.3").toArray;
val row4: Row = Row.fromSeq(input4)
val input5: Array[String] = Seq("1", "2016-12-14T23:59:59.000", "2017-10-29T23:59:58.000", "22.2").toArray;
val row5: Row = Row.fromSeq(input5)
val input6: Array[String] = Seq("1", "2017-10-30T00:00:00.000", "2017-11-0123:59:59.000", "11.0").toArray;
val row6: Row = Row.fromSeq(input6)
val input7: Array[String] = Seq("1", "2016-12-14T23:59:59.000", "2017-10-29T23:59:58.000", "87.33").toArray;
val row7: Row = Row.fromSeq(input7)
val rdd: RDD[Row] = spark.sparkContext.parallelize(Seq(row1, row2, row3, row4, row5, row6, row7))
val df: DataFrame = spark.createDataFrame(rdd, df1Schema)
//----------------------------------------------------------------
def encoder(columns: Seq[String]): Encoder[Row] = RowEncoder(StructType(columns.map(StructField(_, StringType, nullable = true))))
val outputColumns = Seq("KEY", "START_DATE", "END_DATE", "Amt","Ind")
val result = df.groupByKey(r => r.getAs[String]("KEY"))
.flatMapGroups((_, rowsForAkey) => {
var result: List[Row] = List()
for (row <- rowsForAkey) {
val qrDate = "2017-10-31T23:59:59"
val currRowStartDate = row.getAs[String]("START_DATE")
val rowEndDate = row.getAs[String]("END_DATE")
if (currRowStartDate <= qrDate && qrDate <= rowEndDate) //Quota
{
val rLayer = row
result = result :+ rLayer
}
val originalRow = row
result = result :+ originalRow
}
result
})(encoder(df1Columns)).toDF
df.show(false)
result.show(false)
这是我的代码输出:
+---+-----------------------+-----------------------+-----+
|KEY|START_DATE |END_DATE |Amt |
+---+-----------------------+-----------------------+-----+
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|100.0|
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|200.0|
|0 |2017-10-30T00:00:00.000|2017-11-0123:59:59.000 |67.5 |
|0 |2017-10-30T00:00:00.000|2017-11-0123:59:59.000 |67.5 |
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|55.3 |
|1 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|22.2 |
|1 |2017-10-30T00:00:00.000|2017-11-0123:59:59.000 |11.0 |
|1 |2017-10-30T00:00:00.000|2017-11-0123:59:59.000 |11.0 |
|1 |2016-12-14T23:59:59.000|2017-10-29T23:59:58.000|87.33|
+---+-----------------------+-----------------------+-----+
推荐答案
我建议你使用内置函数,而不是通过这样复杂的rdd方式.
I would suggest you to you to go with inbuilt functions rather than going through such complex rdd way.
我使用了内置函数,例如 lit
来填充常量和 udf
函数来更改日期列中的时间
I have used inbuilt functions such as lit
to populate constants and udf
function to change the time in date columns
主题是将dataframes
一分为二,最后union
(为了代码的清晰性,我已经进行了评论)
Main theme is to separate the dataframes
into two and finally union
them(I have commented for the clarity of the codes)
import org.apache.spark.sql.functions._
//udf function to change the time
def changeTimeInDate = udf((toCopy: String, withCopied: String)=> withCopied.split("T")(0)+"T"+toCopy.split("T")(1))
//creating Ind column with N populated and saving in temporaty dataframe
val indDF = df.withColumn("Ind", lit("N"))
//filtering out the rows that match the condition mentioned in the question and then changing the Amt column and Ind column and START_DATE
val duplicatedDF = indDF.filter($"START_DATE" <= "2017-10-31T23:59:59" && $"END_DATE" >= "2017-10-31T23:59:59")
.withColumn("Amt", lit("0.0"))
.withColumn("Ind", lit("Y"))
.withColumn("START_DATE", changeTimeInDate($"END_DATE", $"START_DATE"))
//Changing the END_DATE and finally merging both
val result = indDF.withColumn("END_DATE", changeTimeInDate($"START_DATE", $"END_DATE"))
.union(duplicatedDF)
你应该有想要的输出
+---+-----------------------+-----------------------+-----+---+
|KEY|START_DATE |END_DATE |Amt |Ind|
+---+-----------------------+-----------------------+-----+---+
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:59.000|100.0|N |
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:59.000|55.3 |N |
|0 |2016-12-14T23:59:59.000|2017-10-29T23:59:59.000|200.0|N |
|0 |2017-10-30T00:00:00.000|2017-11-01T00:00:00.000|67.5 |N |
|0 |2017-10-30T23:59:59.000|2017-11-01T23:59:59.000|0.0 |Y |
|1 |2016-12-14T23:59:59.000|2017-10-29T23:59:59.000|22.2 |N |
|1 |2016-12-14T23:59:59.000|2017-10-29T23:59:59.000|87.33|N |
|1 |2017-10-30T00:00:00.000|2017-11-01T00:00:00.000|11.0 |N |
|1 |2017-10-30T23:59:59.000|2017-11-01T23:59:59.000|0.0 |Y |
+---+-----------------------+-----------------------+-----+---+
这篇关于将一行分成两部分并虚拟一些列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!