我有一张如下表
id week count
A100 201008 2
A100 201009 9
A100 201010 16
A100 201011 23
A100 201012 30
A100 201013 36
A100 201015 43
A100 201017 50
A100 201018 57
A100 201019 63
A100 201023 70
A100 201024 82
A100 201025 88
A100 201026 95
A100 201027 102
在这里,我们可以看到缺少以下几周:
我的要求是每当我们有缺失值时,我们都需要显示上周的计数。
在这种情况下,输出应该是:
id week count
A100 201008 2
A100 201009 9
A100 201010 16
A100 201011 23
A100 201012 30
A100 201013 36
A100 201014 36
A100 201015 43
A100 201016 43
A100 201017 50
A100 201018 57
A100 201019 63
A100 201020 63
A100 201021 63
A100 201022 63
A100 201023 70
A100 201024 82
A100 201025 88
A100 201026 95
A100 201027 102
如何使用 hive/pyspark 实现此要求?
最佳答案
尽管此答案在 Scala
中,但 Python 版本看起来几乎相同并且可以轻松转换。
第 1 步:
查找之前缺少周值的行。
样本输入:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
//sample input
val input = sc.parallelize(List(("A100",201008,2), ("A100",201009,9),("A100",201014,4), ("A100",201016,45))).toDF("id","week","count")
scala> input.show
+----+------+-----+
| id| week|count|
+----+------+-----+
|A100|201008| 2|
|A100|201009| 9|
|A100|201014| 4| //missing 4 rows
|A100|201016| 45| //missing 1 row
+----+------+-----+
要找到它,我们可以在
.lead()
上使用 week
函数。并计算 leadWeek
和 week
之间的差异。差异不应> 1,如果在它之前缺少行。val diffDF = input
.withColumn("leadWeek", lead($"week", 1).over(Window.partitionBy($"id").orderBy($"week"))) // partitioning by id & computing lead()
.withColumn("diff", ($"leadWeek" - $"week") -1) // finding difference between leadWeek & week
scala> diffDF.show
+----+------+-----+--------+----+
| id| week|count|leadWeek|diff|
+----+------+-----+--------+----+
|A100|201008| 2| 201009| 0| // diff -> 0 represents that no rows needs to be added
|A100|201009| 9| 201014| 4| // diff -> 4 represents 4 rows are to be added after this row.
|A100|201014| 4| 201016| 1| // diff -> 1 represents 1 row to be added after this row.
|A100|201016| 45| null|null|
+----+------+-----+--------+----+
第 2 步:
InputWithDiff
,检查下面的案例类),如diff
并相应地增加 week
值。返回新的与原始行一起创建的行。
将
diffDF
转换为 Dataset 以方便计算。case class InputWithDiff(id: Option[String], week: Option[Int], count: Option[Int], leadWeek: Option[Int], diff: Option[Int])
val diffDS = diffDF.as[InputWithDiff]
val output = diffDS.flatMap(x => {
val diff = x.diff.getOrElse(0)
diff match {
case n if n >= 1 => x :: (1 to diff).map(y => InputWithDiff(x.id, Some(x.week.get + y), x.count,x.leadWeek, x.diff)).toList // create and append new Rows
case _ => List(x) // return as it is
}
}).drop("leadWeek", "diff").toDF // drop unnecessary columns & convert to DF
最终输出:
scala> output.show
+----+------+-----+
| id| week|count|
+----+------+-----+
|A100|201008| 2|
|A100|201009| 9|
|A100|201010| 9|
|A100|201011| 9|
|A100|201012| 9|
|A100|201013| 9|
|A100|201014| 4|
|A100|201015| 4|
|A100|201016| 45|
+----+------+-----+
关于apache-spark - Hive 查询以查找中间周的计数,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56071059/