Problem description
I'm reading a set of Avro files from a folder, and the program errors out with the error message below.
df = sqlContext.read.format("com.databricks.spark.avro").load("/data/hadoop20180516/22/abc*.avro").count()
[Stage 2:==================================================>(27818 + 4) / 28318]18/06/14 10:53:44 ERROR Executor: Exception in task 27900.0 in stage 2.0 (TID 27905)
java.io.IOException: Not an Avro data file
The folder has 30K+ files, and one of them might be corrupt. I would like to ignore the bad file and continue loading the rest of the files.
I tried to use the option
.option("badRecordsPath", "/tmp/badRecordsPath")
but it didn't work.
Any suggestions?
Recommended answer
I don't know Python well enough to give you a good code sample, but I solved this in Scala, so you can try the following.
Use
val paths = sparkContext.wholeTextFiles(folderPath).collect { case x: (String, String) => x._1 }.collect()
Here I use a partial function to get only the keys (the file paths), and collect again so that I iterate over an array of strings rather than an RDD of strings.
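As an aside: since every (path, content) pair matches that case, a plain map gives the same result; this one-liner is an equivalent sketch using the same sparkContext and folderPath:

val paths = sparkContext.wholeTextFiles(folderPath).map(_._1).collect()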
Load each file as a DataFrame and skip the ones that fail:
import scala.util.Try

// `format` and `options` refer to your existing read configuration
val filteredDFs = paths.map { path =>
  Try(sparkSession.read.format(format).options(options).load(path)).toOption
}.filter(_.isDefined).map(_.get)
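The same filtering can be written more compactly with flatMap, which drops the None entries directly; this is just a sketch of the same logic:

val filteredDFs = paths.flatMap { path =>
  Try(sparkSession.read.format(format).options(options).load(path)).toOption
}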
And finally, create a single DataFrame containing all the previous DataFrames using union:
val finalDF = filteredDFs.reduce((df1, df2) => df1.union(df2))
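Putting the three steps together, here is a self-contained sketch. It assumes Spark 2.x with the com.databricks.spark.avro package on the classpath; the object and app names are illustrative, and the folder path is the one from the question:

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try

object SkipBadAvroFiles {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("skip-bad-avro").getOrCreate()

    // Step 1: collect the file paths. wholeTextFiles yields (path, content)
    // pairs; we only keep the paths.
    val folderPath = "/data/hadoop20180516/22"
    val paths = spark.sparkContext.wholeTextFiles(folderPath).map(_._1).collect()

    // Step 2: try to load each file; corrupt files raise an exception
    // inside Try and are dropped by flatMap.
    val dfs: Seq[DataFrame] = paths.toSeq.flatMap { path =>
      Try(spark.read.format("com.databricks.spark.avro").load(path)).toOption
    }

    // Step 3: union the successfully loaded files into a single DataFrame.
    val finalDF = dfs.reduce(_ union _)
    println(finalDF.count())

    spark.stop()
  }
}

Note that union requires all files to share the same schema, and reduce will throw if every file in the folder failed to load.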