一、添加依赖

<!--SparkSQL依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

二、代码

1.数据准备

创建person.json文件

{"name":"张三","age":20},
{"name":"李四","age":22},
{"name":"王五","age":19},
{"name":"赵六","age":21},
{"name":"田七","age":22}

2.创建 DataFrame的SQL风格语法

SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要 有临时视图或者全局视图来辅助

object SparkSQLDemo1 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    import sparkSession.implicits._
    //读取 JSON 文件创建 DataFrame
    val df = sparkSession.read.json("input\\person.json")
    //对 DataFrame 创建一个临时表
    df.createOrReplaceTempView("person")
    //通过 SQL 语句实现查询年龄大于20
    val result = sparkSession.sql("select * from person where age > 20")
    //结果展示
    result.show()
    sparkSession.stop()
  }
}

运行结果:

+---+----+
|age|name|
+---+----+
| 22|  李四|
| 21|  赵六|
| 22|  田七|
+---+----+

3.创建DataFrame的DSL风格语法

DSL 语法:DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。 可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了

object SparkSQLDemo2 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    import sparkSession.implicits._
     //创建一个 DataFrame
    val df = sparkSession.read.json("input\\person.json")
    //只查看"name"列数据,
    df.select("name").show()
    sparkSession.stop()
  }
}

运行结果:

+----+
|name|
+----+
|  张三|
|  李四|
|  王五|
|  赵六|
|  田七|
+----+

4.RDD转换DataFrame

object SparkSQLDemo3 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    import sparkSession.implicits._
    val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
    val df = rdd.toDF("name","age")
    df.show()
    sparkSession.stop()
  }
}

运行结果:

+----+---+
|name|age|
+----+---+
|  张三| 20|
|  李四| 19|
|  王五| 21|
+----+---+

5.DataFrame转换DataSet

DataFrame 其实是 DataSet 的特例,所以它们之间是可以互相转换的。

object SparkSQLDemo4 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    import sparkSession.implicits._
    val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
    val df = rdd.toDF("name","age")
    val ds = df.as[Person2]
    ds.show()
    sparkSession.stop()
  }
}
//样例类
case class Person2(name:String,age:Int)

运行结果:

+----+---+
|name|age|
+----+---+
|  张三| 20|
|  李四| 19|
|  王五| 21|
+----+---+

6.DataFrame转换RDD

DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD

object SparkSQLDemo5 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    import sparkSession.implicits._
    val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
    val df = rdd.toDF("name","age")
    val result = df.rdd
    result.foreach(line=>println(line.getString(0),line.getInt(1)))
    sparkSession.stop()
  }
}

运行结果:

(李四,19)
(张三,20)
(王五,21)

7.RDD转换DataSet

SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结 构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结 构。

object SparkSQLDemo6 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    import sparkSession.implicits._
    val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
    val ds = rdd.map{
      case(name,age)=>Person2(name,age)
    }.toDS()
    ds.show()
    sparkSession.stop()
  }
}
//样例类
case class Person2(name:String,age:Int)

运行结果:

+----+---+
|name|age|
+----+---+
|  张三| 20|
|  李四| 19|
|  王五| 21|
+----+---+
06-27 23:15