本文介绍了Pyspark - 具有重置条件的累积和的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有这个数据框

+---+----+---+
|  A|   B|  C|
+---+----+---+
|  0|null|  1|
|  1| 3.0|  0|
|  2| 7.0|  0|
|  3|null|  1|
|  4| 4.0|  0|
|  5| 3.0|  0|
|  6|null|  1|
|  7|null|  1|
|  8|null|  1|
|  9| 5.0|  0|
| 10| 2.0|  0|
| 11|null|  1|
+---+----+---+

我需要做的是从 C 列到下一个值为零的累积值.

What I need do is a cumulative sum of values from column C until the next value is zero.

预期输出:

+---+----+---+----+
|  A|   B|  C|   D|
+---+----+---+----+
|  0|null|  1|   1|
|  1| 3.0|  0|   0|
|  2| 7.0|  0|   0|
|  3|null|  1|   1|
|  4| 4.0|  0|   0|
|  5| 3.0|  0|   0|
|  6|null|  1|   1|
|  7|null|  1|   2|
|  8|null|  1|   3|
|  9| 5.0|  0|   0|
| 10| 2.0|  0|   0|
| 11|null|  1|   1|
+---+----+---+----+

重现数据帧:

from pyspark.shell import sc
from pyspark.sql import Window
from pyspark.sql.functions import lag, when, sum

x = sc.parallelize([
    [0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
    [5, 3.], [6, None], [7, None], [8, None], [9, 5.], [10, 2.], [11, None]])
x = x.toDF(['A', 'B'])

# Transform null values into "1"
x = x.withColumn('C', when(x.B.isNull(), 1).otherwise(0))

推荐答案

创建一个临时列 (grp),每当 C 列等于 0(重置条件)并将其用作累积总和的分区列.

Create a temporary column (grp) that increments a counter each time column C is equal to 0 (the reset condition) and use this as a partitioning column for your cumulative sum.

import pyspark.sql.functions as f
from pyspark.sql import Window

x.withColumn(
    "grp",
    f.sum((f.col("C") == 0).cast("int")).over(Window.orderBy("A"))
).withColumn(
    "D",
    f.sum(f.col("C")).over(Window.partitionBy("grp").orderBy("A"))
).drop("grp").show()
#+---+----+---+---+
#|  A|   B|  C|  D|
#+---+----+---+---+
#|  0|null|  1|  1|
#|  1| 3.0|  0|  0|
#|  2| 7.0|  0|  0|
#|  3|null|  1|  1|
#|  4| 4.0|  0|  0|
#|  5| 3.0|  0|  0|
#|  6|null|  1|  1|
#|  7|null|  1|  2|
#|  8|null|  1|  3|
#|  9| 5.0|  0|  0|
#| 10| 2.0|  0|  0|
#| 11|null|  1|  1|
#+---+----+---+---+

这篇关于Pyspark - 具有重置条件的累积和的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 08:28