我有一套文件。文件的路径保存在一个文件中,比如“all_files.txt”。使用ApacheSark,我需要对所有文件进行操作,并对结果进行分类。
我要做的步骤是:
通过读取“all_files.txt”创建RDD
对于“all_files.txt”中的每一行(每一行是某个文件的路径)。
将每个文件的内容读取到单个RDD中
然后执行一个操作所有内容
这是我为同一个代码编写的代码:
def return_contents_from_file (file_name):
return spark.read.text(file_name).rdd.map(lambda r: r[0])
def run_spark():
file_name = 'path_to_file'
spark = SparkSession \
.builder \
.appName("PythonWordCount") \
.getOrCreate()
counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
.flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
.flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
这将引发错误:
第323行,在get_返回值py4j.protocol.py4j error中:一个错误
调用O25.GetNewArgs时发生。跟踪:py4j.py4jexception:
方法GetNewArgs([])不存在于
Py4j.Nealth.ErrutsEngult.GETMead(反射引擎.java:318)
在
Py4j.Nealth.ErrutsEngult.GETMead(反射引擎.java:326)
在Py4j.Gateway。调用(网关.java:272)AT
Py4J.Cudith.ActudioCord.VoCoMeMod(ActudioCord.java:132)
在Py4j.Cudith.Calcult.Exc执行(CalpCuth.java:79)AT
Py4J.GATEWAN连接.运行(网关连接. Java:214)AT
Java.Lang.Tr.Run(线程.java:745)
有人能告诉我我做错了什么,我该怎么做。事先谢谢。
最佳答案
不允许在spark
内部使用flatMap
或在执行器上发生的任何转换(spark
会话仅在驱动程序上可用)。也不可能创建RDD的RDD(请参见:Is it possible to create nested RDDs in Apache Spark?)
但您可以用另一种方式实现此转换—将all_files.txt
的所有内容读取到数据帧中,使用localmap
使其成为数据帧,使用localreduce
将所有内容合并,请参见示例:
>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)