This article describes how to handle a PySpark job that fails when loading multiple files and one of them is missing. It should be a useful reference for anyone hitting the same problem.

Problem Description

When using PySpark to load multiple JSON files from S3, I get an error and the Spark job fails if any file is missing.

This is how I add the last 5 days to my job with PySpark:

from datetime import date, timedelta

days = 5
x = 0
files = []

# Build one wildcard path per day for the last 5 days
while x < days:
    filedate = (date.today() - timedelta(x)).isoformat()
    path = "s3n://example/example/" + filedate + "/*.json"
    files.append(path)
    x += 1

# sc, sql_context and schema are defined elsewhere in the job
rdd = sc.textFile(",".join(files))
df = sql_context.read.json(rdd, schema)

How can I get PySpark to ignore the missing files and continue with the job?

Recommended Answer

Use a function that tries to load the file; if the file is missing, it fails and returns False.

from py4j.protocol import Py4JJavaError

def path_exist(sc, path):
    try:
        # take(1) forces evaluation and raises if the path does not exist
        rdd = sc.textFile(path)
        rdd.take(1)
        return True
    except Py4JJavaError:
        return False
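
For example, a quick standalone check on a single day's prefix might look like this (a minimal sketch; the date and bucket path are hypothetical, and an active SparkContext named sc is assumed):

# Hypothetical date and bucket layout, matching the pattern used above
test_path = "s3n://example/example/2016-01-01/*.json"
if path_exist(sc, test_path):
    print('Found: ' + test_path)
else:
    print('Missing: ' + test_path)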

This lets you check whether files are available before adding them to your list, without having to use the AWS CLI or S3 commands.

from datetime import date, timedelta

days = 5
x = 0
files = []

# Only keep paths that actually exist
while x < days:
    filedate = (date.today() - timedelta(x)).isoformat()
    path = "s3n://example/example/" + filedate + "/*.json"
    if path_exist(sc, path):
        files.append(path)
    else:
        print('Path does not exist, skipping: ' + path)
    x += 1

rdd = sc.textFile(",".join(files))
df = sql_context.read.json(rdd, schema)
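
One edge case to keep in mind: if every day's path is missing, files ends up empty and sc.textFile(",".join(files)) is called with an empty string, which will also fail. A minimal guard for the last two lines, assuming the same sc, sql_context and schema as above:

if files:
    rdd = sc.textFile(",".join(files))
    df = sql_context.read.json(rdd, schema)
else:
    # No input paths found: build an empty DataFrame with the same schema
    df = sql_context.createDataFrame(sc.emptyRDD(), schema)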

I found this approach at http://www.learn4master.com/big-data/pyspark/pyspark-check-if-file-exists

That concludes this article on PySpark jobs failing when loading multiple files and one file is missing; hopefully the recommended answer is helpful.
