我有一个路径列表,其中包含多个具有相同形状的CSV,称为routes
将此读入单个Spark Data Frame中:
df = spark.read.option("header","true").format('csv').load(routes)
路由是DBFS路径的列表。运行以上命令时,可以在路径中添加一列,以便我知道数据来自何处?
在大熊猫中,我会做类似
pd.concat(files, keys=routes) # assume files is a list of pandas csv dataframes.
的事情有没有一种方法来添加路线的列
自然地,我认为您可以使用
.withColumn('path',routes)
,但这会引发以下错误:AttributeError: 'DataFrameReader' object has no attribute 'withColumn'
我的路线看起来像
routes = ['dbfs:/mnt/Foo/Day_1`,'dbfs:/mnt/Foo/Day_2`,'dbfs:/mnt/Foo/Day_3`]
当前df
df.show()
col_1 | col_2 | col_3
A | 1 | 1
B | 1 | 1
A | 2 | 2
...
预期产量
col_1 | col_2 | col_3 | path
A | 1 | 1 | 'dbfs:/mnt/Foo/Day_1`
B | 1 | 1 | 'dbfs:/mnt/Foo/Day_2`
A | 2 | 2 | 'dbfs:/mnt/Foo/Day_3`
...
最佳答案
使用input_file_name()
函数:
from pyspark.sql.functions import input_file_name
df.withColumn('path', input_file_name())