Problem description
I have a set of files. The paths to the files are saved in a file, say all_files.txt. Using Apache Spark, I need to do an operation on all the files and combine the results.
The steps that I want to follow are:

- Create an RDD by reading all_files.txt
- For each line in all_files.txt (each line is a path to some file), read the contents of each of the files into a single RDD
- Then do an operation on all the contents
This is the code I have written for it:
from pyspark.sql import SparkSession

def return_contents_from_file(file_name):
    return spark.read.text(file_name).rdd.map(lambda r: r[0])

def run_spark():
    file_name = 'path_to_file'
    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()

    # The map is supposed to return the path to each file,
    # the first flatMap is expected to club together the contents of all the files,
    # and the second flatMap is expected to do an operation on each line of all the files.
    counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \
        .flatMap(return_contents_from_file) \
        .flatMap(do_operation_on_each_line_of_all_files)
This is throwing the error:

py4j.protocol.Py4JError: Method __getnewargs__([]) does not exist
Can someone please tell me what I am doing wrong and how I should proceed further? Thanks in advance.
Recommended answer
Using spark inside flatMap, or inside any transformation that runs on the executors, is not allowed (the spark session is available on the driver only). It is also not possible to create an RDD of RDDs (see: Is it possible to create nested RDDs in Apache Spark?).
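The "Method __getnewargs__([]) does not exist" message is py4j's way of reporting that the closure shipped to the executors captured a JVM-backed object (here the SparkSession), which cannot be pickled. A minimal sketch of the failing pattern, with hypothetical file paths and assuming an active session named spark:

>>> paths = spark.sparkContext.parallelize(['data/a.txt', 'data/b.txt'])  # hypothetical paths, for illustration only
>>> bad = paths.flatMap(lambda p: spark.read.text(p).rdd.map(lambda r: r[0]).collect())  # the lambda closes over `spark`
>>> bad.count()  # pickling the closure fails with py4j.protocol.Py4JError: ... __getnewargs__([]) does not exist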
But you can achieve this transformation in another way - read all the content of all_files.txt into a dataframe, use a local map to turn each path into a dataframe, and a local reduce to union them all; see the example:
>>> from functools import reduce  # reduce is not a builtin in Python 3
>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
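From here, the asker's remaining step (do an operation on each line) is an ordinary RDD transformation on the unioned dataframe. A minimal sketch, assuming do_operation_on_each_line_of_all_files is the asker's own per-line function from the question:

>>> all_lines_rdd = all_lines_df.rdd.map(lambda r: r[0])                     # Row objects -> plain strings
>>> results = all_lines_rdd.flatMap(do_operation_on_each_line_of_all_files)  # the per-line operation from the question
>>> results.take(5)                                                          # inspect a few results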