Problem description
When using PySpark to load multiple JSON files from S3, the Spark job fails with an error if any of the files is missing.
This is how I add the paths for the last 5 days to my job with PySpark:
from datetime import date, timedelta

days = 5
x = 0
files = []
while x < days:
    # Build one S3 glob path per day for the last `days` days.
    filedate = (date.today() - timedelta(x)).isoformat()
    path = "s3n://example/example/" + filedate + "/*.json"
    files.append(path)
    x += 1

rdd = sc.textFile(",".join(files))
df = sql_context.read.json(rdd, schema)
How can I get PySpark to ignore the missing files and continue with the job?
Recommended answer
Use a function that tries to load the path; if the file is missing, the load fails and the function returns False.
from py4j.protocol import Py4JJavaError

def path_exist(sc, path):
    """Return True if Spark can read at least one record from the path."""
    try:
        rdd = sc.textFile(path)
        # take(1) forces an actual read, so a missing path raises an error here.
        rdd.take(1)
        return True
    except Py4JJavaError:
        return False
This lets you check whether files are available before adding them to your list, without having to use the AWS CLI or S3 commands.
days = 5
x = 0
files = []
while x < days:
    filedate = (date.today() - timedelta(x)).isoformat()
    path = "s3n://example/example/" + filedate + "/*.json"
    if path_exist(sc, path):
        files.append(path)
    else:
        print('Path does not exist, skipping: ' + path)
    x += 1

rdd = sc.textFile(",".join(files))
df = sql_context.read.json(rdd, schema)
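If you would rather not trigger a Spark read just to probe a path, a lighter-weight alternative is to ask Hadoop's FileSystem API whether the glob matches anything. The following is only a minimal sketch, not part of the original answer: it assumes the S3 connector is configured, it relies on PySpark's internal _jsc/_jvm gateways (not public API), and path_exist_fs is a hypothetical helper name.

def path_exist_fs(sc, path):
    """Sketch: return True if the glob pattern matches at least one object."""
    # Reach the Hadoop FileSystem through the JVM gateway; _jsc and _jvm are
    # PySpark internals, so treat this as a best-effort convenience.
    jvm = sc._jvm
    hadoop_conf = sc._jsc.hadoopConfiguration()
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(
        jvm.java.net.URI.create(path), hadoop_conf)
    # globStatus returns the matching FileStatus entries, or null/empty when
    # nothing matches the pattern, so no data is actually read.
    matches = fs.globStatus(jvm.org.apache.hadoop.fs.Path(path))
    return matches is not None and len(matches) > 0

Unlike path_exist above, this variant does not start a Spark job, which can matter when you probe many paths per run.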
I found this at http://www.learn4master.com/big-data/pyspark/pyspark-check-if-file-exists