This article discusses what to do when NLineInputFormat does not work as expected in Spark. It should be a useful reference for anyone running into the same problem; read on to learn more!

Problem Description

What I want is basically to have each element of data consist of 10 lines. However, with the following code, each element is still one line. What mistake am I making here?

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MyApp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array[Class[_]](classOf[NLineInputFormat], classOf[LongWritable],
  classOf[Text]))
val sc = new SparkContext(conf)

val c = new Configuration(sc.hadoopConfiguration)
c.set("lineinputformat.linespermap", "10")
val data = sc.newAPIHadoopFile(fname, classOf[NLineInputFormat], classOf[LongWritable],
  classOf[Text], c)
Solution

NLineInputFormat, by design, just doesn't perform the operation you expect it to: it modifies how splits (partitions, in Spark nomenclature) are computed, not how records are determined.

If the description is not clear, we can illustrate it with the following example:

def nline(n: Int, path: String) = {
  val sc = SparkContext.getOrCreate
  val conf = new Configuration(sc.hadoopConfiguration)
  // Note the fully qualified property name, unlike the code in the question.
  conf.setInt("mapreduce.input.lineinputformat.linespermap", n)

  sc.newAPIHadoopFile(path,
    classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], conf)
}

require(nline(1, "README.md").glom.map(_.size).first == 1)
require(nline(2, "README.md").glom.map(_.size).first == 2)
require(nline(3, "README.md").glom.map(_.size).first == 3)

As shown above, each partition (possibly excluding the last one) contains exactly n lines.
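To confirm that record boundaries are untouched, you can inspect a single record. The snippet below is our own addition, reusing the nline helper above; it shows that each record is still one line:

// Each record is still a single (offset, line) pair; only the partitioning changed.
val firstRecord = nline(3, "README.md").map { case (_, text) => text.toString }.first
// firstRecord holds just the first line of README.md, not three lines.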

While you can try to retrofit this to fit your case, as sketched below, it is not recommended for small values of the linespermap parameter.
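For illustration only, here is a minimal sketch of such a retrofit, reusing the nline helper above (the linesGrouped name is ours, not part of any API). Since each partition holds exactly n lines, glom can collapse every partition into one array-of-lines element; the Text records are converted to String before glom because Hadoop record readers reuse the same Writable object.

// Hypothetical helper: groups a text file into elements of n lines each.
// Converting Text to String *before* glom avoids the Hadoop Writable-reuse pitfall.
def linesGrouped(n: Int, path: String): org.apache.spark.rdd.RDD[Array[String]] =
  nline(n, path).map { case (_, text) => text.toString }.glom

val tens = linesGrouped(10, "README.md") // each element holds 10 lines (the last may hold fewer)

Keep in mind that this yields one partition per group of n lines, which is exactly why small linespermap values are discouraged: a large input file would produce a very large number of tiny partitions.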

That concludes this article on NLineInputFormat not working in Spark. We hope the answer above is helpful, and thank you for your support!
