Problem Description
This is a question I've already asked on the Spark user mailing list, and I hope to have more success here.
I'm not sure it's directly related to Spark, though Spark has something to do with the fact that I can't easily resolve the problem.
I'm trying to get some files from S3 using various patterns. My problem is that some of those patterns may match nothing, and when they do, I get the following exception:
org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335)
... 2 more
I would like a way to ignore missing files and just do nothing in that case. The problem here, IMO, is that I don't know whether a pattern will match anything until it's actually executed, and Spark starts processing data only when an action occurs (here, the reduceByKey part). So I can't just catch an error somewhere and let things continue.
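To illustrate, here is a minimal sketch of how the laziness bites (the pattern and variable names are hypothetical, not my actual code):

// Nothing fails at this point: building the RDD does not touch S3 yet.
JavaRDD<String> lines = sc.textFile("s3n://bucket/someEmptyPattern/*");

// The InvalidInputException ("matches 0 files") only surfaces here,
// when the action forces Spark to compute the input splits.
long count = lines.count();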
One solution would be to force Spark to process each path individually, but that would probably cost a lot in terms of speed and/or memory, so I'm looking for another, more efficient option.
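One thing I could imagine, sketched below under the assumption that the candidate patterns are known up front (the bucket and variable names are hypothetical), is to pre-check each pattern with Hadoop's FileSystem.globStatus and only build RDDs for patterns that actually match something:

// Sketch: skip patterns that match nothing before handing them to Spark.
// Uses org.apache.hadoop.fs.{FileSystem, FileStatus, Path}.
Path pattern = new Path("s3n://bucket/mypattern");
FileSystem fs = pattern.getFileSystem(sc.hadoopConfiguration());
FileStatus[] matches = fs.globStatus(pattern);
if (matches != null && matches.length > 0) {
    // Only patterns that matched at least one file feed the rest of the job.
    JavaRDD<String> lines = sc.textFile(pattern.toString());
    // ... rest of the pipeline
}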
I'm using Spark 0.9.1. Thanks.
Recommended Answer
OK, after digging a bit into Spark and thanks to someone guiding me on the Spark user list, I think I've got it:
// Build the RDD with the lenient input format; count() now returns 0
// for a pattern that matches nothing instead of throwing.
sc.newAPIHadoopFile("s3n://missingPattern/*", EmptiableTextInputFormat.class,
        LongWritable.class, Text.class, sc.hadoopConfiguration())
  .map(new Function<Tuple2<LongWritable, Text>, String>() {
      @Override
      public String call(Tuple2<LongWritable, Text> tuple) throws Exception {
          return tuple._2().toString(); // keep only the line contents
      }
  })
  .count();
And the EmptiableTextInputFormat that does the magic:
import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// A TextInputFormat that returns an empty split list instead of failing
// when the input pattern matches no files.
public class EmptiableTextInputFormat extends TextInputFormat {

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        try {
            return super.getSplits(context);
        } catch (InvalidInputException e) {
            return Collections.<InputSplit>emptyList();
        }
    }
}
One could eventually check the message of the InvalidInputException for a little more precision.
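For instance, a small variation of getSplits that only swallows the empty-glob case (the "matches 0 files" substring is an assumption based on the error shown above, not a documented contract):

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    try {
        return super.getSplits(context);
    } catch (InvalidInputException e) {
        // Only ignore the empty-pattern case; rethrow anything else.
        if (e.getMessage() != null && e.getMessage().contains("matches 0 files")) {
            return Collections.<InputSplit>emptyList();
        }
        throw e;
    }
}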