我是Python和PySpark的新手。我在PySpark中有一个数据框,如下所示:
## +---+---+------+
## | x1| x2| x3 |
## +---+---+------+
## | 0| a | 13.0|
## | 2| B | -33.0|
## | 1| B | -63.0|
## +---+---+------+
我有一个数组:
arr = [10,12,13]
我想在数据框中创建一列x4,这样它就应该基于x1的值作为索引从列表中获得相应的值。最终数据集应如下所示:
## +---+---+------+-----+
## | x1| x2| x3 | x4 |
## +---+---+------+-----+
## | 0| a | 13.0| 10 |
## | 2| B | -33.0| 13 |
## | 1| B | -63.0| 12 |
## +---+---+------+-----+
我尝试使用以下代码来实现:
df.withColumn("x4", lit(arr[col('x1')])).show()
但是,我得到一个错误:
IndexError: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices
有什么办法可以有效地实现这一目标?
最佳答案
当您在数组索引和原始DataFrame之间进行联接时,一种方法是将数组转换为DataFrame,生成rownumber()-1
(将成为索引),然后将两个DataFrame联接在一起。
from pyspark.sql import Row
# Create original DataFrame `df`
df = sqlContext.createDataFrame(
[(0, "a", 13.0), (2, "B", -33.0), (1, "B", -63.0)], ("x1", "x2", "x3"))
df.createOrReplaceTempView("df")
# Create column "x4"
row = Row("x4")
# Take the array
arr = [10, 12, 13]
# Convert Array to RDD, and then create DataFrame
rdd = sc.parallelize(arr)
df2 = rdd.map(row).toDF()
df2.createOrReplaceTempView("df2")
# Create indices via row number
df3 = spark.sql("SELECT (row_number() OVER (ORDER by x4))-1 as indices, * FROM df2")
df3.createOrReplaceTempView("df3")
现在您有了两个数据框:
df
和df3
,您可以运行下面的SQL查询将两个数据框连接在一起。select a.x1, a.x2, a.x3, b.x4 from df a join df3 b on b.indices = a.x1
注意,这也是adding columns to DataFrames的良好参考答案。
关于python - 使用列表在PySpark数据框中创建一列,该列表的索引位于数据框的一列中,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40609845/