本文介绍了如何在从文件夹读取到 RDD 时忽略坏的 avro 文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在从文件夹中读取一组 avro 文件,程序出错并显示错误消息.//格式不正确.

I'm reading a set of avro files from a folders and the program error out with the error message.//Formatting is not properly done.

df =sqlContext.read.format("com.databricks.spark.avro").load("/data/hadoop20180516/22/abc*.avro").count()
[Stage 2:==================================================>(27818 + 4) / 28318]18/06/14 10:53:44 ERROR Executor: Exception in task 27900.0 in stage 2.0 (TID 27905)

java.io.IOException: 不是 Avro 数据文件

java.io.IOException: Not an Avro data file

文件夹中有 3 万多个文件,其中一个文件可能已损坏.我想忽略坏文件并继续加载文件的其余部分.s

Folder has 30K+ files and one of the file might be corrupt.I would like to ignore the bad file and continue to load rest of the file.s

我尝试使用 .option 命令
.option("badRecordsPath", "/tmp/badRecordsPath") 但它不起作用.

I tried to use .option command
.option("badRecordsPath", "/tmp/badRecordsPath") and it didn't work.

有什么建议吗?

推荐答案

我对 Python 的了解不够,无法给你提供一个好的代码示例,但我在 Scala 中解决了这个问题,所以你可以尝试:

I don't know python enough to give you a good code sample, but I solved this in Scala, so you can try:

使用

val paths = sparkContext.wholeTextFiles(folderPath).collect { case x: (String, String) => x._1 }.collect()

这里我使用偏函数来只获取键(文件路径),并再次收集以遍历字符串数组,而不是字符串的 RDD

Here I use a partial function to get only the keys (file paths), and collect again to iterate through an array of strings, not RDD of strings

将每个文件加载为 DataFrame 并跳过失败的文件

Load each file as a DataFrame and skip the ones that fail

val filteredDFs = files.map { path =>
      Try(sparkSession
        .read
        .format(format)
        .options(options)
        .load(path)).toOption}.filter(_.isDefined).map(_.get)

最后使用 union 创建一个包含所有先前 DFs 的 DataFrame

And finally create a DataFrame containing all previous DFs using union

val finalDF = filteredDfs.reduce((df1, df2) => df1.union(df2))

这篇关于如何在从文件夹读取到 RDD 时忽略坏的 avro 文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-05 11:12