我正在读取一个有很多空格的文件,需要过滤掉该空格。之后,我们需要将其转换为数据框。下面的示例输入。
2017123 ¦ ¦10¦running¦00000¦111¦-EXAMPLE
我对此的解决方案是以下函数,该函数解析所有空格并修剪文件。
def truncateRDD(fileName : String): RDD[String] = {
val example = sc.textFile(fileName)
example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}
但是,我不确定如何将其放入数据框。
sc.textFile
返回一个RDD[String]
。我尝试了案例类的方式,但是问题是我们有800个字段模式,案例类不能超过22。我正在考虑以某种方式将RDD [String]转换为RDD [Row],以便可以使用
createDataFrame
函数。val DF = spark.createDataFrame(rowRDD, schema)
有关如何执行此操作的任何建议?
最佳答案
首先将字符串拆分/解析为字段。rdd.map( line => parse(line))
其中parse是一些解析功能。它可能像split一样简单,但您可能需要更强大的功能。这将为您提供RDD[Array[String]]
或类似的代码。
然后,您可以使用RDD[Row]
转换为rdd.map(a => Row.fromSeq(a))
从那里,您可以转换为wising sqlContext.createDataFrame(rdd, schema)
的DataFrame,其中rdd是您的RDD[Row]
,模式是您的模式StructType。