Problem Description
What I want is basically to have each element of data consist of 10 lines. However, with the following code, each element is still one line. What mistake am I making here?
val conf = new SparkConf().setAppName("MyApp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array[Class[_]](classOf[NLineInputFormat], classOf[LongWritable],
classOf[Text]))
val sc = new SparkContext(conf)
val c = new Configuration(sc.hadoopConfiguration)
c.set("lineinputformat.linespermap", 10);
val data = sc.newAPIHadoopFile(fname, classOf[NLineInputFormat], classOf[LongWritable],
classOf[Text], c)
Solution
NLineInputFormat by design just doesn't perform the operation you expect it to: it modifies how splits (partitions in Spark nomenclature) are computed, not how records are determined.
If that description is not clear, we can illustrate it with the following example:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat
import org.apache.spark.SparkContext

// Read `path` with NLineInputFormat, requesting n lines per split.
def nline(n: Int, path: String) = {
  val sc = SparkContext.getOrCreate
  val conf = new Configuration(sc.hadoopConfiguration)
  conf.setInt("mapreduce.input.lineinputformat.linespermap", n)
  sc.newAPIHadoopFile(path,
    classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], conf
  )
}
require(nline(1, "README.md").glom.map(_.size).first == 1)
require(nline(2, "README.md").glom.map(_.size).first == 2)
require(nline(3, "README.md").glom.map(_.size).first == 3)
As shown above, each partition (possibly excluding the last one) contains exactly n lines.
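Conversely, a quick check (a hypothetical addition reusing the nline helper above, not part of the original answer) suggests that each record is still a single (LongWritable, Text) pair holding one line of text:

// Each RDD element is still one line: the text of the first record
// contains no embedded newlines.
require(!nline(3, "README.md").map(_._2.toString).first.contains("\n"))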
While you can try to retrofit this to fit your case, it isn't recommended for small values of the linespermap parameter.
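If the goal really is to have each element hold a fixed number of lines, one possible workaround (a rough sketch, not from the original answer; the helper name linesGrouped and the group-by-running-index approach are assumptions) is to read the file with textFile and group consecutive lines by a block index:

import org.apache.spark.SparkContext

// Sketch: group every n consecutive lines of a text file into one Seq[String] element.
def linesGrouped(sc: SparkContext, path: String, n: Int) =
  sc.textFile(path)
    .zipWithIndex()                                      // (line, global line index)
    .map { case (line, idx) => (idx / n, (idx, line)) }  // key = block number
    .groupByKey()
    .map { case (_, block) => block.toSeq.sortBy(_._1).map(_._2) }

Note that groupByKey shuffles the data and the resulting blocks are not guaranteed to appear in file order across the RDD, so this is only a sketch of one way to build n-line elements, not a drop-in replacement for NLineInputFormat.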