我有一套文件。文件的路径保存在一个文件中,比如“all_files.txt”。使用ApacheSark,我需要对所有文件进行操作,并对结果进行分类。
我要做的步骤是:
通过读取“all_files.txt”创建RDD
对于“all_files.txt”中的每一行(每一行是某个文件的路径)。
将每个文件的内容读取到单个RDD中
然后执行一个操作所有内容
这是我为同一个代码编写的代码:

def return_contents_from_file (file_name):
    return spark.read.text(file_name).rdd.map(lambda  r: r[0])

def run_spark():
    file_name = 'path_to_file'

    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()

    counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
        .flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
        .flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files

这将引发错误:
第323行,在get_返回值py4j.protocol.py4j error中:一个错误
调用O25.GetNewArgs时发生。跟踪:py4j.py4jexception:
方法GetNewArgs([])不存在于
Py4j.Nealth.ErrutsEngult.GETMead(反射引擎.java:318)

Py4j.Nealth.ErrutsEngult.GETMead(反射引擎.java:326)
在Py4j.Gateway。调用(网关.java:272)AT
Py4J.Cudith.ActudioCord.VoCoMeMod(ActudioCord.java:132)
在Py4j.Cudith.Calcult.Exc执行(CalpCuth.java:79)AT
Py4J.GATEWAN连接.运行(网关连接. Java:214)AT
Java.Lang.Tr.Run(线程.java:745)
有人能告诉我我做错了什么,我该怎么做。事先谢谢。

最佳答案

不允许在spark内部使用flatMap或在执行器上发生的任何转换(spark会话仅在驱动程序上可用)。也不可能创建RDD的RDD(请参见:Is it possible to create nested RDDs in Apache Spark?
但您可以用另一种方式实现此转换—将all_files.txt的所有内容读取到数据帧中,使用localmap使其成为数据帧,使用localreduce将所有内容合并,请参见示例:

>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)

10-08 15:51