实训笔记——Spark SQL编程

Spark SQL编程

Spark SQL属于Spark计算框架的一部分,是专门负责结构化数据的处理计算框架,Spark SQL提供了两种数据抽象:DataFrame、Dataset,都是基于RDD之上的一种高级数据抽象,在RDD基础之上增加了一个schema表结构。

一、准备Spark SQL的编程环境

1.1 创建Spark SQL的编程项目,scala语言支持的

1.2 引入编程依赖:

spark_core_2.12

hadoop_hdfs

spark_sql_2.12

<dependency>      
    <groupId>org.apache.hadoop</groupId>      
    <artifactId>hadoop-hdfs</artifactId>      
    <version>3.1.4</version>      
    <exclusions>        
    	<exclusion>          
        <groupId>com.fasterxml.jackson.module</groupId>          
        <artifactId>*</artifactId>        
        </exclusion>        
        <exclusion>          
            <groupId>com.fasterxml.jackson.core</groupId>        
            <artifactId>*</artifactId>        
        </exclusion>      
    </exclusions>    
</dependency>

二、Spark SQL程序编程的入口

2.1 SQLContext

SQLContext:只能做SQL编程,无法操作Hive以及使用HQL操作

2.2 HiveContext

HiveContext:专门提供用来操作和Hive相关的编程

2.3 SparkSession

SparkSession:全新的Spark SQL程序执行入口,把SQLContext和HiveContext功能全部整合了,SparkSession底层封装了一个SparkContext,而且SparkSession可以开启Hive的支持

三、DataFrame的创建

DataFrame是以前旧版本的数据抽象(untyped类型的数据抽象),Dataset是新版本的数据抽象(typed有类型的数据抽象),新版本当中DataFrame底层就是Dataset[Row]

3.1 使用隐式转换函数

使用隐式转换函数从RDD、Scala集合创建DataFrame toDF() toDF(columnName*)

如果集合或者RDD的类型不是Bean,而且再toDF没有传入任何的列名,那么Spark会默认按照列的个数给生成随机的列名,但是如果类型是一个Bean类型,那么toDF产生的随机列名就是bean的属性名

3.2 通过SparkSession

通过SparkSession自带的createDataFrame函数从集合或者RDD中创建DataFrame

3.3 从Spark SQL

Spark SQL支持的数据源创建DataFrame(HDFS、Hive、JSON文件、CSV文件等等)

3.3.1 HDFS、本地文件系统创建
  1. 普通的文本文档
  2. CSV文件
  3. JSON文件
  4. ORC文件
  5. Parquet文件

ss.read.option(xxx,xxx).csv/json/text/orc/parquet(path)

3.3.2 JDBC支持的数据库数据源创建

ss.read.jdbc(url,table,properties)

3.3.3 Spark SQL On Hive创建

使用Hive做数据存储,使用Spark SQL读取Hive的数据进行处理

有个提前的准备

  1. 开启SparkSession的Hive支持
  2. 引入spark-hive的编程依赖
  3. 还需要将Hive的配置文件hive-site.xml放到指定的位置

ss.sql(“HQL语句”)

3.3.1 外部存储HDFS中读取数据成为DataFrame

ss.read.format("jsonxx").load("path") 不太好用

ss.read.option(key,value).option(....).csv/json(path)

3.3.2 从jdbc支持的数据库创建DataFrame

ss.read.jdbc(url,table,properties)

3.3.3 读取Hive数据成为DataFrame
  1. 通过SparkSession开启Hive的支持
  2. 引入spark-hive的编程依赖
  3. 通过ss.sql()

3.4 从其他的DataFrame转换的来

四、DataFrame的编程风格

通过代码来操作计算DataFrame中数据

4.1 SQL编程风格

Dataset提供的一系列转换算子来进行操作

4.1.1 将创建的DataFrame或者Dataset转换成为一张临时表格
4.1.2 然后通过ss.sql(sql语句)进行数据的查询

4.2 DSL编程风格

DataFrame和Dataset提供了一系列的API操作,API说白了就是Spark SQL中算子操作,可以通过算子操作来获取DataFrame或者Dataset中的数据

4.2.1 转换算子

RDD具备的算子DataFrame基本上都可以使用

DataFrame还增加了一些和SQL操作有关的算子: selectExpr、where/filter、groupBy、orderBy/sort、limit、join

4.2.2 行动算子
  1. 文件系统

    1. df/ds.write.mode(SaveMode).csv/json/parquet/orc/text(path--目录)
    2. text纯文本文档要求DataFrame和Dataset的结果集只有一列 而且列必须是String类型
  2. JDBC支持的数据库

  3. Hive

五、DataSet的创建和使用

Dataset有类型,DataFrame无类型的。

5.1 创建

5.1.1 隐式转换,toDS()
5.1.2 通过SparkSession的createDataset函数创建
5.1.3 通过DataFrame转换得到Dataset df.as[类型-Bean对象必须有getter、setter方法] 也是需要隐式转换的

六、Spark SQL的函数操作

Spark SQL基本上常见的MySQL、Hive中函数都是支持的

6.1 Spark SQL特点

6.1.1 易整合
6.1.2 统一的数据访问方式
6.1.3 兼容Hive
6.1.4 标准的数据库连接

6.2 自定义函数

ss.udf.register(name,函数)

09-27 21:55