我想在AWS中创建数据处理管道,以最终将处理后的数据用于机器学习。
我有一个Scala脚本,该脚本从S3中获取原始数据,进行处理,然后使用 Spark-CSV 将其写入HDFS甚至S3中。如果我想使用 AWS Machine Learning 工具来训练预测模型,我想可以使用多个文件作为输入。但是,如果我想使用其他东西,我想最好是收到一个CSV输出文件。
当前,由于出于性能目的,我不想使用 repartition(1)或 coalesce(1),所以我已将 hadoop fs -getmerge 用于手动测试,但它只是合并了作业输出的内容文件,我遇到了一个小问题。在数据文件中,我需要单行 header 来训练预测模型。
如果我将.option("header","true")
用于spark-csv,则它将 header 写入每个输出文件,并且在合并后,数据中的 header 行与输出文件一样多。但是,如果header选项为false,则不会添加任何标题。
现在,我找到了一个将Scala脚本中的文件与Hadoop API FileUtil.copyMerge
合并的选项。我在spark-shell
中尝试了以下代码。
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
但是此解决方案仍然只是将文件彼此串联在一起,并且不处理 header 。 如何获得只有一行标题的输出文件?
我什至尝试添加
df.columns.mkString(",")
作为copyMerge
的最后一个参数,但这仍然多次添加 header ,而不是一次。 最佳答案
你可以这样走动。
这样,所有分区都没有标题,除了单个分区的内容具有来自headerDF的标题名称行。当所有分区合并在一起时,文件顶部只有一个 header 。示例代码如下
//dataFrame is the data to save on disk
//cast types of all columns to String
val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)
//create a new data frame containing only header names
import scala.collection.JavaConverters._
val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
//merge header names with data
headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)
//use hadoop FileUtil to merge all partition csv files into a single file
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)