我正在读取一个有很多空格的文件,需要过滤掉该空格。之后,我们需要将其转换为数据框。下面的示例输入。

2017123 ¦     ¦10¦running¦00000¦111¦-EXAMPLE

我对此的解决方案是以下函数,该函数解析所有空格并修剪文件。
def truncateRDD(fileName : String): RDD[String] = {
    val example = sc.textFile(fileName)
    example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}

但是,我不确定如何将其放入数据框。 sc.textFile返回一个RDD[String]。我尝试了案例类的方式,但是问题是我们有800个字段模式,案例类不能超过22。

我正在考虑以某种方式将RDD [String]转换为RDD [Row],以便可以使用createDataFrame函数。
val DF = spark.createDataFrame(rowRDD, schema)

有关如何执行此操作的任何建议?

最佳答案

首先将字符串拆分/解析为字段。
rdd.map( line => parse(line))其中parse是一些解析功能。它可能像split一样简单,但您可能需要更强大的功能。这将为您提供RDD[Array[String]]或类似的代码。

然后,您可以使用RDD[Row]转换为rdd.map(a => Row.fromSeq(a))
从那里,您可以转换为wising sqlContext.createDataFrame(rdd, schema)的DataFrame,其中rdd是您的RDD[Row],模式是您的模式StructType。

09-30 15:32
查看更多