最好的解释方法是通过示例进行说明。在这种情况下,我们将采用以下两行:

原版的:

ID  val
1   2
1   3
1   1
1   9
2   1
2   6
2   8
2   1


更新后的版本:

ID  sum_val
1   4
1   10
1   9
1   0
2   14
2   9
2   1
2   0


我正在PySpark工作,因为我的数据集很大。我是PySpark的新手,所以尝试进行这项工作时遇到麻烦。

任何帮助将非常感激。

最佳答案

使用窗口功能:

from pyspark.sql.functions import col, sum, monotonically_increasing_id
from pyspark.sql.window import Window

df = spark.createDataFrame(
    [(1, 2), (1, 3), (1, 1), (1, 9), (2, 1), (2, 6), (2, 8), (2, 1)],
    ("id", "val")
)


您将需要Window像这样:

w = (Window.partitionBy("id")
           .orderBy("_id")
           .rowsBetween(1, 2))


添加_id

(df
   .withColumn("_id", monotonically_increasing_id())
   .withColumn("sum_val", sum("val").over(w))
   .na.fill(0)
   .show())

# +---+---+-----------+-------+
# | id|val|        _id|sum_val|
# +---+---+-----------+-------+
# |  1|  2|          0|      4|
# |  1|  3|          1|     10|
# |  1|  1| 8589934592|      9|
# |  1|  9| 8589934593|      0|
# |  2|  1|17179869184|     14|
# |  2|  6|17179869185|      9|
# |  2|  8|25769803776|      1|
# |  2|  1|25769803777|      0|
# +---+---+-----------+-------+


请注意,这样的monotonically_increasing_id不是一个好习惯-在生产中,应该始终在数据本身中嵌入订购信息,并且永远不要依赖DataFrame的内部顺序。

关于python - Pyspark-不确定如何将以下X行的总和分配给现有行值,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49930657/

10-14 19:06