我想在AWS中创建数据处理管道,以最终将处理后的数据用于机器学习。

我有一个Scala脚本,该脚本从S3中获取原始数据,进行处理,然后使用 Spark-CSV 将其写入HDFS甚至S3中。如果我想使用 AWS Machine Learning 工具来训练预测模型,我想可以使用多个文件作为输入。但是,如果我想使用其他东西,我想最好是收到一个CSV输出文件。

当前,由于出于性能目的,我不想使用 repartition(1) coalesce(1),所以我已将 hadoop fs -getmerge 用于手动测试,但它只是合并了作业输出的内容文件,我遇到了一个小问题。在数据文件中,我需要单行 header 来训练预测模型。

如果我将.option("header","true")用于spark-csv,则它将 header 写入每个输出文件,并且在合并后,数据中的 header 行与输出文件一样多。但是,如果header选项为false,则不会添加任何标题。

现在,我找到了一个将Scala脚本中的文件与Hadoop API FileUtil.copyMerge 合并的选项。我在spark-shell中尝试了以下代码。

import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

但是此解决方案仍然只是将文件彼此串联在一起,并且不处理 header 。 如何获得只有一行标题的输出文件?

我什至尝试添加df.columns.mkString(",")作为copyMerge的最后一个参数,但这仍然多次添加 header ,而不是一次。

最佳答案

你可以这样走动。

  • 1.创建一个包含标题名称的新DataFrame(headerDF)。
  • 2.与包含数据的DataFrame(dataDF)合并。
  • 3.使用选项(“header”,“false”)将联合的DataFrame输出到磁盘。
  • 4.使用hadoop FileUtil合并分区文件(part-0000 ** 0.csv)

  • 这样,所有分区都没有标题,除了单个分区的内容具有来自headerDF的标题名称行。当所有分区合并在一起时,文件顶部只有一个 header 。示例代码如下
      //dataFrame is the data to save on disk
      //cast types of all columns to String
      val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)
    
      //create a new data frame containing only header names
      import scala.collection.JavaConverters._
      val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
    
      //merge header names with data
      headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)
    
      //use hadoop FileUtil to merge all partition csv files into a single file
      val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
      FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)
    

    10-05 21:14
    查看更多