Problem Description
I am using Kafka Spark Streaming to get streaming data:
val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
I am using this DStream and processing the RDDs:
val output = lines.foreachRDD(rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { file => runConfigParser(file) }
  })
runConfigParser is a Java method which parses a file and produces output that I have to save in HDFS. So multiple nodes will process the RDDs and write their output into one single HDFS file, because I want to load this file into HIVE.
Should I collect the result of runConfigParser and use sc.parallelize(output).saveAsTextFile(path) so that all my nodes write the RDD output to a single HDFS file? Is this design efficient?
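For comparison, here is a minimal sketch of an alternative that avoids collecting results to the driver (it assumes runConfigParser returns its result as a String, which the question does not show): apply the parser inside a DStream transformation so both the parsing and the writes stay on the executors.

// Sketch only: assumes runConfigParser(file: String): String.
// Parsing happens on the executors; each micro-batch is written as its own directory of part-files.
val parsed = lines.map(file => runConfigParser(file))

parsed.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs://namenode:8020/output/batch-${time.milliseconds}")
  }
}

sc.parallelize(output) would first pull every parsed record back to the driver, which becomes a bottleneck as the stream grows; keeping the data as an RDD sidesteps that.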
I will load this single HDFS file (which will be constantly updated, since it is streaming data) into HIVE and query it using Impala.
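Purely as an illustration (the table name, single-column schema, and HDFS path below are assumptions, not from the question), the output directory can be exposed to HIVE and Impala as an external table:

// Illustrative sketch: table name, schema and location are placeholders.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS parsed_output (line STRING)
    |ROW FORMAT DELIMITED
    |LOCATION 'hdfs://namenode:8020/final/'""".stripMargin)

Impala would then need a REFRESH parsed_output (or INVALIDATE METADATA) after new files land before the new data becomes visible to queries.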
Recommended Answer
You can use a function to "merge" the result of saveAsTextFile, like this:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD

// Save the RDD as text part-files under a temporary directory, then merge them
// into a single file under the final directory.
def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
  val sourceFile = hdfsServer + "/tmp/"
  rdd.saveAsTextFile(sourceFile)
  val dstPath = hdfsServer + "/final/"
  merge(sourceFile, dstPath, fileName)
}

// Concatenate all part-files under srcPath into the single HDFS file dstPath/fileName.
def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  val destinationPath = new Path(dstPath)
  if (!hdfs.exists(destinationPath)) {
    hdfs.mkdirs(destinationPath)
  }
  // copyMerge(srcFS, srcDir, dstFS, dstFile, deleteSource, conf, addString)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
}
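A usage sketch from the streaming side (the HDFS URI and file-name pattern are placeholders, and it again assumes runConfigParser returns a String):

// Placeholder names; one merged file per micro-batch, since copyMerge replaces an existing destination file.
lines.map(file => runConfigParser(file)).foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    saveAsTextFileAndMerge("hdfs://namenode:8020", s"parsed-${time.milliseconds}.txt", rdd)
  }
}

Note that saveAsTextFile fails if its output directory already exists, so in a long-running streaming job the temporary path inside saveAsTextFileAndMerge would also need to be made unique per batch or cleaned up between batches.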