我有一个路径列表,其中包含多个具有相同形状的CSV,称为routes

将此读入单个Spark Data Frame中:

df = spark.read.option("header","true").format('csv').load(routes)


路由是DBFS路径的列表。运行以上命令时,可以在路径中添加一列,以便我知道数据来自何处?

在大熊猫中,我会做类似pd.concat(files, keys=routes) # assume files is a list of pandas csv dataframes.的事情

有没有一种方法来添加路线的列

自然地,我认为您可以使用.withColumn('path',routes),但这会引发以下错误:

AttributeError: 'DataFrameReader' object has no attribute 'withColumn'

我的路线看起来像

routes = ['dbfs:/mnt/Foo/Day_1`,'dbfs:/mnt/Foo/Day_2`,'dbfs:/mnt/Foo/Day_3`]


当前df

df.show()
col_1 | col_2 | col_3
A     |  1    | 1
B     |  1    | 1
A     |  2    | 2
...


预期产量

col_1 | col_2 | col_3 | path
A     |  1    | 1     | 'dbfs:/mnt/Foo/Day_1`
B     |  1    | 1     | 'dbfs:/mnt/Foo/Day_2`
A     |  2    | 2     | 'dbfs:/mnt/Foo/Day_3`
...

最佳答案

使用input_file_name()函数:

from pyspark.sql.functions import input_file_name

df.withColumn('path', input_file_name())

10-07 12:59