I have a table like the following:

id      week    count
A100    201008  2
A100    201009  9
A100    201010  16
A100    201011  23
A100    201012  30
A100    201013  36
A100    201015  43
A100    201017  50
A100    201018  57
A100    201019  63
A100    201023  70
A100    201024  82
A100    201025  88
A100    201026  95
A100    201027  102

Here we can see that the following weeks are missing:
  • First, 201014 is missing
  • Second, 201016 is missing
  • Third, 201020, 201021, and 201022 are missing

  • My requirement is that whenever a week is missing, we need to show the previous week's count.

    In this case, the output should be:
    id      week    count
    A100    201008  2
    A100    201009  9
    A100    201010  16
    A100    201011  23
    A100    201012  30
    A100    201013  36
    A100    201014  36
    A100    201015  43
    A100    201016  43
    A100    201017  50
    A100    201018  57
    A100    201019  63
    A100    201020  63
    A100    201021  63
    A100    201022  63
    A100    201023  70
    A100    201024  82
    A100    201025  88
    A100    201026  95
    A100    201027  102
    

    How can I implement this requirement using Hive/PySpark?

    Best Answer

    Although this answer is in Scala, the Python version looks almost the same and can be converted easily (a PySpark sketch follows the final output below).

    Step 1:

    Find the rows that have missing week values before them.

    Sample input:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    
    //sample input
    val input = sc.parallelize(List(("A100",201008,2), ("A100",201009,9),("A100",201014,4), ("A100",201016,45))).toDF("id","week","count")
    
    scala> input.show
    +----+------+-----+
    |  id|  week|count|
    +----+------+-----+
    |A100|201008|    2|
    |A100|201009|    9|
    |A100|201014|    4| //missing 4 rows
    |A100|201016|   45| //missing 1 row
    +----+------+-----+
    

    To find them, we can use the lead() function on the week column and compute the difference between leadWeek and week. If any rows are missing before the next row, the gap (leadWeek - week - 1) will be greater than 0.
    val diffDF = input
      .withColumn("leadWeek", lead($"week", 1).over(Window.partitionBy($"id").orderBy($"week")))   // partitioning by id & computing lead()
      .withColumn("diff", ($"leadWeek" - $"week") -1)                                 // finding difference between leadWeek & week
    
    scala> diffDF.show
    +----+------+-----+--------+----+
    |  id|  week|count|leadWeek|diff|
    +----+------+-----+--------+----+
    |A100|201008|    2|  201009|   0| // diff -> 0 represents that no rows needs to be added
    |A100|201009|    9|  201014|   4| // diff -> 4 represents 4 rows are to be added after this row.
    |A100|201014|    4|  201016|   1| // diff -> 1 represents 1 row to be added after this row.
    |A100|201016|   45|    null|null|
    +----+------+-----+--------+----+
    

    Step 2:
  • If diff >= 1: create and add n new rows (InputWithDiff, see the case class below), where n equals diff, incrementing the week value accordingly. Return the newly
    created rows along with the original row.
  • If diff is 0, no extra computation is needed; return the original row as-is.

  • Convert diffDF to a Dataset to make the computation easier.
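    // Option fields are needed because lead() yields a null leadWeek (and diff) on the last row of each id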
    case class InputWithDiff(id: Option[String], week: Option[Int], count: Option[Int], leadWeek: Option[Int], diff: Option[Int])
    
    val diffDS = diffDF.as[InputWithDiff]
    
    val output = diffDS.flatMap(x => {
     val diff = x.diff.getOrElse(0)
    
     diff match {
      case n if n >= 1 => x :: (1 to diff).map(y => InputWithDiff(x.id, Some(x.week.get + y), x.count, x.leadWeek, x.diff)).toList  // create and append new Rows
      case _ => List(x)      // return as it is
     }
    }).drop("leadWeek", "diff").toDF   // drop unnecessary columns & convert to DF
    

    Final output:
    scala> output.show
    +----+------+-----+
    |  id|  week|count|
    +----+------+-----+
    |A100|201008|    2|
    |A100|201009|    9|
    |A100|201010|    9|
    |A100|201011|    9|
    |A100|201012|    9|
    |A100|201013|    9|
    |A100|201014|    4|
    |A100|201015|    4|
    |A100|201016|   45|
    +----+------+-----+
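
    As noted above, the Python version looks very similar. Below is a minimal PySpark sketch of the same two-step idea (my translation, not part of the original answer). One deliberate change: PySpark DataFrames have no typed flatMap, so Step 2 is expressed with the built-in sequence() and explode() functions (available since Spark 2.4) instead of the Dataset flatMap.

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()

    # same sample input as above
    input_df = spark.createDataFrame(
        [("A100", 201008, 2), ("A100", 201009, 9),
         ("A100", 201014, 4), ("A100", 201016, 45)],
        ["id", "week", "count"])

    w = Window.partitionBy("id").orderBy("week")

    output = (input_df
        # Step 1: find the next existing week for each row
        .withColumn("leadWeek", F.lead("week", 1).over(w))
        # Step 2: sequence(week, leadWeek - 1) yields the row's own week plus
        # any missing weeks after it, each keeping the current count; the last
        # row per id has a null leadWeek, so fall back to the week itself
        .withColumn("week", F.explode(F.sequence(
            F.col("week"), F.coalesce(F.col("leadWeek") - 1, F.col("week")))))
        .drop("leadWeek")
        .orderBy("id", "week"))

    output.show()

    Note that both versions treat week as a plain integer, so a gap that crosses a year boundary (e.g. 201052 to 201101) would need extra handling.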
    

    Regarding "apache-spark - Hive query to find the count for in-between weeks", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56071059/
