写在前面
主要是加载文件为RDD,再把RDD转换为DataFrame,进而使用DataFrame的API或Sql进行数据的方便操作
简单理解:DataFrame=RDD+Schema
贴代码
package february.sql
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* Description: ============Spark SQL支持两种不同的方法将现有RDD转换为Datasets数据集==============
*
*
* (1) 反射 case class 前提:事先需要知道你的字段,字段类型
* (2) 编程 事先不知道有哪几列
* **** 优先选择第一种 ****
*
* @Author: 留歌36
* @Date: 2019/2/25 18:41
*/
object DataFrameRDDApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[2]")
.getOrCreate()
// 方法一:反射
// inferReflection(spark)
// 方法二:编程
program(spark)
spark.stop()
}
/**
* 编程的方式
* @param spark
*/
private def program(spark: SparkSession) = {
val textFile = spark.sparkContext.textFile("f:\\infos.txt")
val infoRdd = textFile.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))
val structType = StructType(Array(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)))
val DF =spark.createDataFrame(infoRdd, structType)
DF.printSchema()
DF.show()
}
/**
* 反射的方式
* @param spark
*/
private def inferReflection(spark: SparkSession) = {
// RDD ==> DataFrame rdd.toDF()
val textFile = spark.sparkContext.textFile("f:\\infos.txt")
// split()返回 String[]
// 注意:需要导入隐式转换
import spark.implicits._
val infoDF = textFile.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()
// =====基于dataframe的API=======之后的就都是DataFrame 的操作了==============
infoDF.show()
infoDF.filter(infoDF.col("age") > 30).show()
// ======基于SQL的API===========DataFrame 创建为一张表================
infoDF.createOrReplaceTempView("infos")
spark.sql("select * from infos where age > 30").show()
}
//类似java bean实体类
// 反射的方式,将RDD的 每个字段 与 这里的实体类 进行一一映射
case class Info(id: Int, name: String, age: Int)
}
更多相关小demo:每天一个程序:https://blog.csdn.net/liuge36/column/info/34094