Problem Description
I am using Kafka Spark Streaming to get streaming data.
val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
I am processing the RDDs from this DStream:
val output = lines.foreachRDD(rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { file => runConfigParser(file) }
  })
runConfigParser is a Java method which parses a file and produces an output that I have to save in HDFS. So multiple nodes will process the RDD and write their output into one single HDFS file, because I want to load this file into Hive.
Should I collect the output of runConfigParser and use sc.parallelize(output).saveAsTextFile(path) so that all my nodes write their RDD output to a single HDFS file? Is this design efficient?
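For reference, a rough sketch of the design described above, assuming runConfigParser returns a String and that each batch is small enough to collect on the driver (both assumptions; the output path is a placeholder):

// Sketch of the collect-and-reparallelize idea from the question.
// Every batch is funneled through the driver, which is the efficiency concern being asked about.
lines.foreachRDD { rdd =>
  val output = rdd.map(record => runConfigParser(record)).collect()
  rdd.sparkContext
    .parallelize(output, 1) // single partition => single part file
    .saveAsTextFile("/user/hive/warehouse/streaming_output/" + System.currentTimeMillis())
}

This routes every batch through the driver; the answer below instead writes from the executors and merges the resulting part files afterwards.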
I will load this single HDFS file (which will be constantly updated, since it is streaming data) into Hive and query it using Impala.
Answer
You can use a function to "merge" the result of saveAsTextFile, like this:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD

// Writes the RDD to a temporary HDFS directory, then merges the part files
// into a single file under the destination directory.
def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
  val sourceFile = hdfsServer + "/tmp/"
  rdd.saveAsTextFile(sourceFile)
  val dstPath = hdfsServer + "/final/"
  merge(sourceFile, dstPath, fileName)
}

// Concatenates all part files under srcPath into dstPath/fileName using
// FileUtil.copyMerge (the source files are kept because deleteSource = false).
def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  val destinationPath = new Path(dstPath)
  if (!hdfs.exists(destinationPath)) {
    hdfs.mkdirs(destinationPath)
  }
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
}
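As a usage sketch only (not part of the original answer): assuming runConfigParser returns the parsed String, the helper could be called once per batch from the DStream, with the HDFS base path and per-batch file name as hypothetical placeholders:

// Hypothetical wiring of the helper into the streaming job from the question.
// Assumes runConfigParser(record) returns the parsed String to be stored.
val hdfsServer = "hdfs://namenode:8020/user/streaming" // placeholder HDFS location

lines.foreachRDD { (rdd, time) =>
  val parsed = rdd.map(record => runConfigParser(record))
  // Name each merged file by batch time so batches do not overwrite one another.
  saveAsTextFileAndMerge(hdfsServer, s"output-${time.milliseconds}.txt", parsed)
}

Note that saveAsTextFileAndMerge reuses hdfsServer + "/tmp/" as its temporary directory, so in a continuously running job that directory would have to be deleted or made unique between batches, because saveAsTextFile will not write to an existing path.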