package com.zy.sparksql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* RDD转化成DataFrame:通过StructType指定schema
*/
object StructTypeSchema {
def main(args: Array[String]): Unit = {
//创建sparkSession对象
val sparkSession: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()
//获取sparkContext
val sc: SparkContext = sparkSession.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
//读取文件
val textFile: RDD[String] = sc.textFile("D:\\person.txt")
//切分文件
val lineArrayRDD: RDD[Array[String]] = textFile.map(_.split(","))
//关联对象
val rowRDD: RDD[Row] = lineArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
//创建rdd的schema信息
val schema: StructType = (new StructType)
.add("id", IntegerType, true, "id")
.add("name", StringType, false, "姓名")
.add("age", IntegerType, true, "年龄")
//根据rdd和schema信息创建DataFrame
val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
//DSL操作
personDF.show()
//sql 操作
//将df注册成表
personDF.createTempView("person")
sparkSession.sql("select * from person where id =3").show()
sparkSession.stop()
}
}
package com.zy.sparksql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* RDD转化成DataFrame:利用反射机制推断schema
*/
//todo 定义一个样例类
case class Person(id: Int, name: String, age: Int)
object CaseClassSchema {
def main(args: Array[String]): Unit = {
//构建sparkSession 指定appName和master地址(本地测试local)
val sparkSession: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
//获取sparkContext
val sc: SparkContext = sparkSession.sparkContext
//设置日志输出级别
sc.setLogLevel("WARN")
//加载数据
val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")
//切分数据
val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(","))
//将rdd和person样例类关联
val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
//将rdd转换成dataFrame 导入隐式转换
import sparkSession.implicits._
val personDF: DataFrame = personRDD.toDF
//DSL语法
personDF.show()
personDF.printSchema()
personDF.select("name").show()
personDF.filter($"age" > 30).show()
println("---------------------------------------------")
//sql语法
//首先要创建临时视图
personDF.createTempView("person")
sparkSession.sql("select * from person where id>1").show()
sparkSession.stop()
}
}