After Spark has applied a series of transformations to a DataFrame, the result needs to be written to MySQL. The implementation is described below.

1. MySQL connection information

The MySQL connection information is kept in an external configuration file, which makes it easy to add or change settings later.

 // Example configuration file:
[hdfs@iptve2e03 tmp_lillcol]$ cat job.properties
# MySQL database configuration
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=user
mysql.password=123456

2. Required jar dependencies (sbt form; adapt accordingly for Maven)

 libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

3. Complete implementation

 import java.io.FileInputStream
import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author Administrator
 *         2018/10/16-10:15
 */
object SaveDataFrameASMysql {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""

  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = new HiveContext(sc)

  def main(args: Array[String]): Unit = {
    hdfsPath = args(0)
    proPath = args(1)
    // Read without a filter
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
    dim_sys_city_dict.show(10)
    // Save to MySQL
    saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
  }

  /**
   * Save a DataFrame as a MySQL table
   *
   * @param dataFrame the DataFrame to save
   * @param tableName target MySQL table name
   * @param saveMode  save mode: Append, Overwrite, ErrorIfExists or Ignore
   * @param proPath   path of the configuration file
   */
  def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: SaveMode, proPath: String) = {
    var table = tableName
    val properties: Properties = getProPerties(proPath)
    // The keys in the configuration file differ from the keys Spark expects,
    // so build a new Properties object in Spark's format
    val prop = new Properties
    prop.setProperty("user", properties.getProperty("mysql.username"))
    prop.setProperty("password", properties.getProperty("mysql.password"))
    prop.setProperty("driver", properties.getProperty("mysql.driver"))
    prop.setProperty("url", properties.getProperty("mysql.url"))
    if (saveMode == SaveMode.Overwrite) {
      var conn: Connection = null
      try {
        conn = DriverManager.getConnection(
          prop.getProperty("url"),
          prop.getProperty("user"),
          prop.getProperty("password")
        )
        val stmt = conn.createStatement
        table = table.toUpperCase
        // Truncate first and then Append, so the table structure is preserved
        stmt.execute(s"truncate table $table")
        conn.close()
      }
      catch {
        case e: Exception =>
          println("MySQL Error:")
          e.printStackTrace()
      }
    }
    dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table, prop)
  }

  /**
   * Read a MySQL table
   *
   * @param sqlContext
   * @param tableName name of the MySQL table to read
   * @param proPath   path of the configuration file
   * @return the MySQL table as a DataFrame
   */
  def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      // .option("dbtable", tableName.toUpperCase)
      .option("dbtable", tableName)
      .load()
  }

  /**
   * Read a MySQL table with a filter condition
   *
   * @param sqlContext
   * @param table           name of the MySQL table to read
   * @param filterCondition filter condition
   * @param proPath         path of the configuration file
   * @return the MySQL table as a DataFrame
   */
  def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    var tableName = ""
    tableName = "(select * from " + table + " where " + filterCondition + " ) as t1"
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", tableName)
      .load()
  }

  /**
   * Load the configuration file
   *
   * @param proPath
   * @return
   */
  def getProPerties(proPath: String) = {
    val properties: Properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }
}
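
The overloaded readMysqlTable that takes a filter condition is defined above but never called in main. A small sketch of how it could be used (run inside main; the column and value in the condition are only illustrative):

 // Hypothetical usage of the filtered overload: the condition is pushed into the
// "(select * from ... where ...) as t1" subquery, so only matching rows are read from MySQL.
val city240: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", "city_id = '240'", proPath)
city240.show(10)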

4. Test

 def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)
  // Read without a filter
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
  dim_sys_city_dict.show(10)
  // Save to MySQL
  saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
}

5. Results (sensitive data has been masked)

 +-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|dict_id|city_id|city_name|city_code|group_id|group_name|area_code| bureau_id|sort|bureau_name|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
| 1| 249| **| **_ab| 100| **按时| **-查到|xcaasd...| 21| 张三公司|
| 2| 240| **| **_ab| 300| **按时| **-查到|xcaasd...| 21| 张三公司|
| 3| 240| **| **_ab| 100| **按时| **-查到|xcaasd...| 21| 张三公司|
| 4| 242| **| **_ab| 300| **按时| **-查到|xcaasd...| 01| 张三公司|
| 5| 246| **| **_ab| 100| **按时| **-查到|xcaasd...| 01| 张三公司|
| 6| 246| **| **_ab| 300| **按时| **-查到|xcaasd...| 01| 张三公司|
| 7| 248| **| **_ab| 200| **按时| **-查到|xcaasd...| 01| 张三公司|
| 8| 242| **| **_ab| 400| **按时| **-查到|xcaasd...| 01| 张三公司|
| 9| 247| **| **_ab| 200| **按时| **-查到|xcaasd...| 01| 张三公司|
| 0| 243| **| **_ab| 400| **按时| **-查到|xcaasd...| 01| 张三公司|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+

mysql> desc TestMysqlTble1;
+-------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| dict_id | varchar(32) | YES | | NULL | |
| city_id | varchar(32) | YES | | NULL | |
| city_name | varchar(32) | YES | | NULL | |
| city_code | varchar(32) | YES | | NULL | |
| group_id | varchar(32) | YES | | NULL | |
| group_name | varchar(32) | YES | | NULL | |
| area_code | varchar(32) | YES | | NULL | |
| bureau_id | varchar(64) | YES | | NULL | |
| sort | varchar(32) | YES | | NULL | |
| bureau_name | varchar(32) | YES | | NULL | |
+-------------+-------------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> desc TestMysqlTble2;
+-------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| dict_id | text | YES | | NULL | |
| city_id | text | YES | | NULL | |
| city_name | text | YES | | NULL | |
| city_code | text | YES | | NULL | |
| group_id | text | YES | | NULL | |
| group_name | text | YES | | NULL | |
| area_code | text | YES | | NULL | |
| bureau_id | text | YES | | NULL | |
| sort | text | YES | | NULL | |
| bureau_name | text | YES | | NULL | |
+-------------+------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> select count(1) from TestMysqlTble1;
+----------+
| count(1) |
+----------+
| 21 |
+----------+
1 row in set (0.00 sec)

mysql> select count(1) from TestMysqlTble2;
+----------+
| count(1) |
+----------+
| 21 |
+----------+
1 row in set (0.00 sec)
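
As the two desc outputs above show, the automatically created TestMysqlTble2 ends up with TEXT columns everywhere (more on this in section 7). A sketch, not part of the original job, of pre-creating the target table over JDBC with explicit types copied from TestMysqlTble1, so that the later Append reuses this schema instead:

 // Sketch: pre-create TestMysqlTble2 with varchar columns before the first write.
val properties = getProPerties(proPath)
val conn = DriverManager.getConnection(
  properties.getProperty("mysql.url"),
  properties.getProperty("mysql.username"),
  properties.getProperty("mysql.password"))
try {
  conn.createStatement().execute(
    """CREATE TABLE IF NOT EXISTS TestMysqlTble2 (
      |  dict_id    varchar(32), city_id    varchar(32), city_name varchar(32),
      |  city_code  varchar(32), group_id   varchar(32), group_name varchar(32),
      |  area_code  varchar(32), bureau_id  varchar(64), sort       varchar(32),
      |  bureau_name varchar(32)
      |)""".stripMargin)
} finally {
  conn.close()
}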

6. Performance

Used as-is, this was fine on small data sets, but once the data volume grew the write became too slow. I tried several tweaks with little effect, then went into the source code and found two key pieces.

  /**
   * Saves the content of the [[DataFrame]] to a external database table via JDBC. In the case the
   * table already exists in the external database, behavior of this function depends on the
   * save mode, specified by the `mode` function (default to throwing an exception).
   *
   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
   * your external database systems.
   *
   * @param url JDBC database url of the form `jdbc:subprotocol:subname`
   * @param table Name of the table in the external database.
   * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
   *                             tag/value. Normally at least a "user" and "password" property
   *                             should be included.
   *
   * @since 1.4.0
   */
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
    val props = new Properties()
    extraOptions.foreach { case (key, value) =>
      props.put(key, value)
    }
    // connectionProperties should override settings in extraOptions
    props.putAll(connectionProperties)
    val conn = JdbcUtils.createConnectionFactory(url, props)()

    try {
      var tableExists = JdbcUtils.tableExists(conn, url, table)

      if (mode == SaveMode.Ignore && tableExists) {
        return
      }

      if (mode == SaveMode.ErrorIfExists && tableExists) {
        sys.error(s"Table $table already exists.")
      }

      if (mode == SaveMode.Overwrite && tableExists) {
        JdbcUtils.dropTable(conn, table)
        tableExists = false
      }

      // Create the table if the table didn't exist.
      if (!tableExists) {
        val schema = JdbcUtils.schemaString(df, url)
        val sql = s"CREATE TABLE $table ($schema)"
        val statement = conn.createStatement
        try {
          statement.executeUpdate(sql)
        } finally {
          statement.close()
        }
      }
    } finally {
      conn.close()
    }

    JdbcUtils.saveTable(df, url, table, props) // ----------------------------- key point 1
  }

  /**
   * Saves the RDD to the database in a single transaction.
   */
  def saveTable(
      df: DataFrame,
      url: String,
      table: String,
      properties: Properties) {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      getJdbcType(field.dataType, dialect).jdbcNullType
    }

    val rddSchema = df.schema
    val getConnection: () => Connection = createConnectionFactory(url, properties)
    val batchSize = properties.getProperty("batchsize", "1000").toInt
    df.foreachPartition { iterator => // ------------------------------------ key point 2
      savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
    }
  }

In other words, the built-in method writes the data partition by partition, opening one MySQL connection per partition, so the simplest optimization is to repartition the DataFrame before saving. Watch out for data skew, though, otherwise the repartitioning may not improve anything.
The fastest approach I have tested so far is to pull the data down to files and import them with the `load data` command, but that is more cumbersome.
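
For reference, a rough sketch of that `load data` route (not part of the original job): it assumes the DataFrame has already been exported as a CSV file readable by the client, and that LOCAL INFILE is enabled on both server and driver; the file path, delimiter and target table are illustrative:

 // Sketch: bulk-load a CSV that was exported from the DataFrame beforehand.
val properties = getProPerties(proPath)
val conn = DriverManager.getConnection(
  properties.getProperty("mysql.url"),
  properties.getProperty("mysql.username"),
  properties.getProperty("mysql.password"))
try {
  conn.createStatement().execute(
    """LOAD DATA LOCAL INFILE '/tmp/TestMysqlTble2.csv'
      |INTO TABLE TestMysqlTble2
      |FIELDS TERMINATED BY ','
      |LINES TERMINATED BY '\n'""".stripMargin)
} finally {
  conn.close()
}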

Below is a repartitioning example:

 def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)
  // Read without a filter
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
  dim_sys_city_dict.show(10)
  // Repartition, then save to MySQL
  saveASMysqlTable(dim_sys_city_dict.repartition(10), "TestMysqlTble2", SaveMode.Append, proPath)
}
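
The saveTable source quoted above also reads a batchsize connection property (defaulting to "1000"), so enlarging the JDBC batch size is another knob worth trying. A sketch of passing it through the same prop built in saveASMysqlTable; 5000 is just an example value:

 // Sketch: enlarge the insert batch size read by JdbcUtils.saveTable (default "1000").
prop.setProperty("batchsize", "5000")
dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table, prop)

Together with rewriteBatchedStatements=true, which is already set in mysql.url, this lets the driver send larger multi-row inserts per round trip.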

7. Summary

There are a few things to watch out for when writing a DataFrame to MySQL:

  • The target table should ideally be created beforehand; otherwise the columns fall back to default types, and TEXT columns waste a lot of resources. Compare the two tables below: the source table TestMysqlTble1 and TestMysqlTble2, the MySQL table created when saving the DataFrame.
 mysql> desc TestMysqlTble1;
+-------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| dict_id | varchar(32) | YES | | NULL | |
| city_id | varchar(32) | YES | | NULL | |
| city_name | varchar(32) | YES | | NULL | |
| city_code | varchar(32) | YES | | NULL | |
| group_id | varchar(32) | YES | | NULL | |
| group_name | varchar(32) | YES | | NULL | |
| area_code | varchar(32) | YES | | NULL | |
| bureau_id | varchar(64) | YES | | NULL | |
| sort | varchar(32) | YES | | NULL | |
| bureau_name | varchar(32) | YES | | NULL | |
+-------------+-------------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> desc TestMysqlTble2;
+-------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| dict_id | text | YES | | NULL | |
| city_id | text | YES | | NULL | |
| city_name | text | YES | | NULL | |
| city_code | text | YES | | NULL | |
| group_id | text | YES | | NULL | |
| group_name | text | YES | | NULL | |
| area_code | text | YES | | NULL | |
| bureau_id | text | YES | | NULL | |
| sort | text | YES | | NULL | |
| bureau_name | text | YES | | NULL | |
+-------------+------+------+-----+---------+-------+
10 rows in set (0.00 sec)
  • About SaveMode.Overwrite
 def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
  val props = new Properties()
  extraOptions.foreach { case (key, value) =>
    props.put(key, value)
  }
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)
  val conn = JdbcUtils.createConnectionFactory(url, props)()

  try {
    var tableExists = JdbcUtils.tableExists(conn, url, table)

    if (mode == SaveMode.Ignore && tableExists) {
      return
    }

    if (mode == SaveMode.ErrorIfExists && tableExists) {
      sys.error(s"Table $table already exists.")
    }

    if (mode == SaveMode.Overwrite && tableExists) {
      JdbcUtils.dropTable(conn, table) // ---------------------------------------- key point 1
      tableExists = false
    }

    // Create the table if the table didn't exist.
    if (!tableExists) {
      val schema = JdbcUtils.schemaString(df, url)
      val sql = s"CREATE TABLE $table ($schema)"
      val statement = conn.createStatement
      try {
        statement.executeUpdate(sql)
      } finally {
        statement.close()
      }
    }
  } finally {
    conn.close()
  }

  JdbcUtils.saveTable(df, url, table, props)
}

/**
 * Drops a table from the JDBC database.
 */
def dropTable(conn: Connection, table: String): Unit = {
  val statement = conn.createStatement
  try {
    statement.executeUpdate(s"DROP TABLE $table") // ------------------------------------- key point 2
  } finally {
    statement.close()
  }
}

The two key fragments above show that the write first checks whether the table exists. With SaveMode.Overwrite, dropTable(conn: Connection, table: String) is called and the existing table is dropped, which means you lose the table structure; the newly created table then runs into the previous problem of default types everywhere. That is why I added the following to the save method:

 if (saveMode == SaveMode.Overwrite) {
  var conn: Connection = null
  try {
    conn = DriverManager.getConnection(
      prop.getProperty("url"),
      prop.getProperty("user"),
      prop.getProperty("password")
    )
    val stmt = conn.createStatement
    table = table.toUpperCase
    // Truncate first and then Append, so the table structure is preserved
    stmt.execute(s"truncate table $table")
    conn.close()
  }
  catch {
    case e: Exception =>
      println("MySQL Error:")
      e.printStackTrace()
  }
}
truncate only removes the data; the table structure stays intact.

What if the table does not exist to begin with?

If the table does not exist beforehand, there are two cases:

1. A mode other than SaveMode.Overwrite

No problem: the table is simply created, using the default column types.

2. SaveMode.Overwrite

An error is raised. Below is the output of using SaveMode.Overwrite when TestMysqlTble2 does not exist:

 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'iptv.TESTMYSQLTBLE2' doesn't exist
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.Util.getInstance(Util.java:387)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2547)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2505)
at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:840)
at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:740)
at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)
at com.iptv.job.basedata.SaveDataFrameASMysql$.main(SaveDataFrameASMysql.scala:33)
at com.iptv.job.basedata.SaveDataFrameASMysql.main(SaveDataFrameASMysql.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Error details

 at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)

The code at the location reported above is:

stmt.execute(s"truncate table $table") // Truncate first and then Append, so the table structure is preserved

In other words, truncate requires the table to already exist.
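
A possible workaround, sketched here and not part of the original code, is to check through JDBC metadata inside saveASMysqlTable whether the table exists, and only truncate when it does (conn, stmt and table are the ones already defined in that method; note that table-name case sensitivity depends on how MySQL is configured on your platform):

 // Sketch: skip the truncate when the target table does not exist yet,
// so SaveMode.Overwrite also works on the very first run.
val rs = conn.getMetaData.getTables(null, null, table, Array("TABLE"))
val tableExists = rs.next()
rs.close()
if (tableExists) {
  stmt.execute(s"truncate table $table")
}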

That completes writing a DataFrame to MySQL.

This article is a summary of my own work; please credit the source when reposting!
