Problem Description
I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.
I've tried the following without any success:
type(randomed_hours) # => list
# Create in Python and transform to RDD
import pandas as pd
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
Using this also results in an error:
# withColumn expects a Column expression, not an RDD
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
So how do I add a new column (based on a Python vector) to an existing DataFrame with PySpark?
Recommended Answer
You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?):
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
transforming an existing column:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
included using join:
from pyspark.sql.functions import col

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))
df_with_x6.show()
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
or generated with a function / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.
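To make that difference concrete, here is a minimal sketch (for comparison only, not a recommendation): the x5 column from above computed with a Python UDF instead of the built-in exp. Every row has to be shipped to a Python worker, and the UDF is opaque to Catalyst, so the built-in version is usually faster.
import math
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

# Python UDF doing the same job as the built-in exp used earlier
exp_udf = udf(lambda x: math.exp(x), DoubleType())
df_with_x4.withColumn("x5_udf", exp_udf(col("x3"))).show()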
If you want to add the content of an arbitrary RDD as a column, you can (a minimal sketch follows the list below):
- add row numbers to the existing DataFrame
- call zipWithIndex on the RDD and convert it to a DataFrame
- join both using the index as a join key
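A minimal sketch of that approach, reusing my_df_spark and randomed_hours from the question (the idx column name and the Row fields here are illustrative assumptions):
from pyspark.sql import Row

# index the existing rows by position
df_indexed = (my_df_spark.rdd
    .zipWithIndex()
    .map(lambda row_idx: Row(idx=row_idx[1], **row_idx[0].asDict()))
    .toDF())

# index the Python list the same way
hours_df = (sc.parallelize(randomed_hours)
    .zipWithIndex()
    .map(lambda v_idx: Row(idx=v_idx[1], hours=v_idx[0]))
    .toDF())

# join on the positional index and drop it again
my_df_with_hours = df_indexed.join(hours_df, "idx").drop("idx")
This only gives the expected pairing if both sides have the same number of elements and the original ordering is what you want to align on.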