最好的解释方法是通过示例进行说明。在这种情况下,我们将采用以下两行:
原版的:
ID val
1 2
1 3
1 1
1 9
2 1
2 6
2 8
2 1
更新后的版本:
ID sum_val
1 4
1 10
1 9
1 0
2 14
2 9
2 1
2 0
我正在PySpark工作,因为我的数据集很大。我是PySpark的新手,所以尝试进行这项工作时遇到麻烦。
任何帮助将非常感激。
最佳答案
使用窗口功能:
from pyspark.sql.functions import col, sum, monotonically_increasing_id
from pyspark.sql.window import Window
df = spark.createDataFrame(
[(1, 2), (1, 3), (1, 1), (1, 9), (2, 1), (2, 6), (2, 8), (2, 1)],
("id", "val")
)
您将需要
Window
像这样:w = (Window.partitionBy("id")
.orderBy("_id")
.rowsBetween(1, 2))
添加
_id
:(df
.withColumn("_id", monotonically_increasing_id())
.withColumn("sum_val", sum("val").over(w))
.na.fill(0)
.show())
# +---+---+-----------+-------+
# | id|val| _id|sum_val|
# +---+---+-----------+-------+
# | 1| 2| 0| 4|
# | 1| 3| 1| 10|
# | 1| 1| 8589934592| 9|
# | 1| 9| 8589934593| 0|
# | 2| 1|17179869184| 14|
# | 2| 6|17179869185| 9|
# | 2| 8|25769803776| 1|
# | 2| 1|25769803777| 0|
# +---+---+-----------+-------+
请注意,这样的
monotonically_increasing_id
不是一个好习惯-在生产中,应该始终在数据本身中嵌入订购信息,并且永远不要依赖DataFrame
的内部顺序。关于python - Pyspark-不确定如何将以下X行的总和分配给现有行值,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49930657/