一、添加依赖
<!--SparkSQL依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
二、代码
1.数据准备
创建person.json文件
{"name":"张三","age":20},
{"name":"李四","age":22},
{"name":"王五","age":19},
{"name":"赵六","age":21},
{"name":"田七","age":22}
2.创建 DataFrame的SQL风格语法
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要 有临时视图或者全局视图来辅助
object SparkSQLDemo1 {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import sparkSession.implicits._
//读取 JSON 文件创建 DataFrame
val df = sparkSession.read.json("input\\person.json")
//对 DataFrame 创建一个临时表
df.createOrReplaceTempView("person")
//通过 SQL 语句实现查询年龄大于20
val result = sparkSession.sql("select * from person where age > 20")
//结果展示
result.show()
sparkSession.stop()
}
}
运行结果:
+---+----+
|age|name|
+---+----+
| 22| 李四|
| 21| 赵六|
| 22| 田七|
+---+----+
3.创建DataFrame的DSL风格语法
DSL 语法:DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。 可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了
object SparkSQLDemo2 {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import sparkSession.implicits._
//创建一个 DataFrame
val df = sparkSession.read.json("input\\person.json")
//只查看"name"列数据,
df.select("name").show()
sparkSession.stop()
}
}
运行结果:
+----+
|name|
+----+
| 张三|
| 李四|
| 王五|
| 赵六|
| 田七|
+----+
4.RDD转换DataFrame
object SparkSQLDemo3 {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import sparkSession.implicits._
val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
val df = rdd.toDF("name","age")
df.show()
sparkSession.stop()
}
}
运行结果:
+----+---+
|name|age|
+----+---+
| 张三| 20|
| 李四| 19|
| 王五| 21|
+----+---+
5.DataFrame转换DataSet
DataFrame 其实是 DataSet 的特例,所以它们之间是可以互相转换的。
object SparkSQLDemo4 {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import sparkSession.implicits._
val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
val df = rdd.toDF("name","age")
val ds = df.as[Person2]
ds.show()
sparkSession.stop()
}
}
//样例类
case class Person2(name:String,age:Int)
运行结果:
+----+---+
|name|age|
+----+---+
| 张三| 20|
| 李四| 19|
| 王五| 21|
+----+---+
6.DataFrame转换RDD
DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD
object SparkSQLDemo5 {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import sparkSession.implicits._
val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
val df = rdd.toDF("name","age")
val result = df.rdd
result.foreach(line=>println(line.getString(0),line.getInt(1)))
sparkSession.stop()
}
}
运行结果:
(李四,19)
(张三,20)
(王五,21)
7.RDD转换DataSet
SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结 构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结 构。
object SparkSQLDemo6 {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import sparkSession.implicits._
val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
val ds = rdd.map{
case(name,age)=>Person2(name,age)
}.toDS()
ds.show()
sparkSession.stop()
}
}
//样例类
case class Person2(name:String,age:Int)
运行结果:
+----+---+
|name|age|
+----+---+
| 张三| 20|
| 李四| 19|
| 王五| 21|
+----+---+