Problem description
What I want is basically for each element of data to consist of 10 lines. However, with the following code, each element is still a single line. What am I doing wrong here?
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MyApp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array[Class[_]](
  classOf[NLineInputFormat], classOf[LongWritable], classOf[Text]))
val sc = new SparkContext(conf)

val c = new Configuration(sc.hadoopConfiguration)
c.set("lineinputformat.linespermap", "10")
val data = sc.newAPIHadoopFile(fname,
  classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], c)
Recommended answer
NLineInputFormat by design just doesn't perform the operation you expect it to:

NLineInputFormat which splits N lines of input as one split. (...) splits input files such that by default, one line is fed as a value to one map task.
As you can see, it modifies how splits (partitions in Spark nomenclature) are computed, not how records are determined.
If the description is not clear, we can illustrate that with the following example:
def nline(n: Int, path: String) = {
  val sc = SparkContext.getOrCreate
  val conf = new Configuration(sc.hadoopConfiguration)
  // Note the fully qualified property name, unlike the question's code
  conf.setInt("mapreduce.input.lineinputformat.linespermap", n)
  sc.newAPIHadoopFile(path,
    classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], conf)
}
require(nline(1, "README.md").glom.map(_.size).first == 1)
require(nline(2, "README.md").glom.map(_.size).first == 2)
require(nline(3, "README.md").glom.map(_.size).first == 3)
As shown above, each partition (possibly excluding the last one) contains exactly n lines.
While you can try to retrofit this to fit your case, it is not recommended for small values of the linespermap parameter.
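For completeness, here is a minimal sketch of such a retrofit (my own, not part of the quoted answer), building on the nline helper above: since each partition holds exactly n lines, glom can collapse every partition into a single multi-line record. The "README.md" path, the tenLineRecords name, and the newline join are illustrative assumptions.

// Hedged sketch: collapse each n-line partition into one record.
val tenLineRecords = nline(10, "README.md")
  .map { case (_, line) => line.toString }  // copy Text to String; Hadoop reuses Writables
  .glom()                                   // Array[String] of all lines in a partition
  .map(_.mkString("\n"))                    // each element is now up to 10 lines

tenLineRecords.take(1).foreach(println)

This works, but with small values of linespermap it creates one partition, and hence one scheduled task, per record, which is exactly why it is not recommended.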