实训笔记——Spark SQL编程
Spark SQL编程
Spark SQL属于Spark计算框架的一部分,是专门负责结构化数据的处理计算框架,Spark SQL提供了两种数据抽象:DataFrame、Dataset,都是基于RDD之上的一种高级数据抽象,在RDD基础之上增加了一个schema表结构。
一、准备Spark SQL的编程环境
1.1 创建Spark SQL的编程项目,scala语言支持的
1.2 引入编程依赖:
spark_core_2.12
hadoop_hdfs
spark_sql_2.12
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
二、Spark SQL程序编程的入口
2.1 SQLContext
SQLContext
:只能做SQL编程,无法操作Hive以及使用HQL操作
2.2 HiveContext
HiveContext
:专门提供用来操作和Hive相关的编程
2.3 SparkSession
SparkSession
:全新的Spark SQL程序执行入口,把SQLContext和HiveContext功能全部整合了,SparkSession底层封装了一个SparkContext,而且SparkSession可以开启Hive的支持
三、DataFrame的创建
DataFrame是以前旧版本的数据抽象(untyped类型的数据抽象),Dataset是新版本的数据抽象(typed有类型的数据抽象),新版本当中DataFrame底层就是Dataset[Row]
3.1 使用隐式转换函数
使用隐式转换函数从RDD、Scala集合创建DataFrame toDF() toDF(columnName*)
如果集合或者RDD的类型不是Bean,而且再toDF没有传入任何的列名,那么Spark会默认按照列的个数给生成随机的列名,但是如果类型是一个Bean类型,那么toDF产生的随机列名就是bean的属性名
3.2 通过SparkSession
通过SparkSession
自带的createDataFrame
函数从集合或者RDD中创建DataFrame
3.3 从Spark SQL
从Spark SQL
支持的数据源创建DataFrame(HDFS、Hive、JSON文件、CSV文件等等)
3.3.1 HDFS、本地文件系统创建
- 普通的文本文档
- CSV文件
- JSON文件
- ORC文件
- Parquet文件
ss.read.option(xxx,xxx).csv/json/text/orc/parquet(path)
3.3.2 JDBC支持的数据库数据源创建
ss.read.jdbc(url,table,properties)
3.3.3 Spark SQL On Hive创建
使用Hive做数据存储,使用Spark SQL读取Hive的数据进行处理
有个提前的准备
- 开启SparkSession的Hive支持
- 引入spark-hive的编程依赖
- 还需要将Hive的配置文件hive-site.xml放到指定的位置
ss.sql(“HQL语句”)
3.3.1 外部存储HDFS中读取数据成为DataFrame
ss.read.format("jsonxx").load("path") 不太好用
ss.read.option(key,value).option(....).csv/json(path)
3.3.2 从jdbc支持的数据库创建DataFrame
ss.read.jdbc(url,table,properties)
3.3.3 读取Hive数据成为DataFrame
- 通过
SparkSession
开启Hive的支持 - 引入
spark-hive
的编程依赖 - 通过
ss.sql()
3.4 从其他的DataFrame转换的来
四、DataFrame的编程风格
通过代码来操作计算DataFrame中数据
4.1 SQL编程风格
Dataset提供的一系列转换算子来进行操作
4.1.1 将创建的DataFrame或者Dataset转换成为一张临时表格
4.1.2 然后通过ss.sql(sql语句)进行数据的查询
4.2 DSL编程风格
DataFrame和Dataset提供了一系列的API操作,API说白了就是Spark SQL中算子操作,可以通过算子操作来获取DataFrame或者Dataset中的数据
4.2.1 转换算子
RDD具备的算子DataFrame基本上都可以使用
DataFrame还增加了一些和SQL操作有关的算子: selectExpr、where/filter、groupBy、orderBy/sort、limit、join
4.2.2 行动算子
-
文件系统
df/ds.write.mode(SaveMode).csv/json/parquet/orc/text(path--目录)
- text纯文本文档要求DataFrame和Dataset的结果集只有一列 而且列必须是String类型
-
JDBC支持的数据库
-
Hive
五、DataSet的创建和使用
Dataset有类型,DataFrame无类型的。
5.1 创建
5.1.1 隐式转换,toDS()
5.1.2 通过SparkSession的createDataset函数创建
5.1.3 通过DataFrame转换得到Dataset df.as[类型-Bean对象必须有getter、setter方法] 也是需要隐式转换的
六、Spark SQL的函数操作
Spark SQL基本上常见的MySQL、Hive中函数都是支持的
6.1 Spark SQL特点
6.1.1 易整合
6.1.2 统一的数据访问方式
6.1.3 兼容Hive
6.1.4 标准的数据库连接
6.2 自定义函数
ss.udf.register(name,函数)