本文介绍了将文件名附加到RDD的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个包含我的数据文件的文件夹.每个文件的大小约为1 GB.
I have a folder containing my data files. Each file has a size of about 1 GB.
我需要的是RDD中的文件名.
What I need is the filename inside the RDD.
以下操作无法按预期进行:
Following does not work as expected:
import glob
rdds = []
for filename in glob.iglob('/data/*'):
rdd = sc.textFile(filename).map(lambda row: (filename, row))
rdds.append(rdd)
allData = sc.union(rdds)
使用此代码,文件名
始终是最后读取文件的文件名
using this, filename
is always the filename of the last read file
我也尝试过:
import glob
rdds = []
for filename in glob.iglob('/data/*'):
def f(name=filename):
return name
rdd = sc.textFile(filename).map(lambda row: (f(), row))
rdds.append(rdd)
allData = sc.union(rdds)
但这会产生错误:广播只能在驱动程序中序列化
sc.wholeTextFile()
,因为单个文件太大.
sc.wholeTextFile()
is not an option because the single files are to big.
推荐答案
我想您想要这样的东西:
I guess you want something like this:
import functools
def proc(f):
return sc.textFile(f).map(lambda x: (f, x))
rdd = functools.reduce(
lambda rdd1, rdd2: rdd1.union(rdd2),
(proc(f) for f in glob.glob("/data/*")))
或使用 sc.union
:
sc.union([proc(f) for f in glob.glob("/data/*")])
这篇关于将文件名附加到RDD的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!