1. Environment setup
After installing MySQL, open the mysql command line and create the test tables and data:
Copy the mysql-connector-java jar into /spark_home/lib/. Any recent version works; download it from:
http://dev.mysql.com/downloads/connector/j/
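The post does not show the table definitions, so here is a hypothetical schema, reconstructed as an assumption from the column accesses in the code below (two source tables, plus the good_student_infos target written in part 2.3); adjust names, types, and sample rows to your own test data:

```sql
-- Assumed schemas and sample data; not from the original post
CREATE TABLE student_infos (name VARCHAR(32), age INT);
CREATE TABLE student_scores (name VARCHAR(32), score INT);
CREATE TABLE good_student_infos (name VARCHAR(32), age INT, score INT);

INSERT INTO student_infos VALUES ('leo', 18), ('marry', 17);
INSERT INTO student_scores VALUES ('leo', 88), ('marry', 99);
```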
2. Implementation
2.1 Preparation:
SparkConf conf = new SparkConf().setAppName("JDBCDataSource");//.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
2.2 Reading from the JDBC data source:
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://spark1:3306/testdb");

// Read the first table
options.put("dbtable", "student_infos");
DataFrame studentInfosDF = sqlContext.read().format("jdbc")
        .options(options).load();

// Read the second table
options.put("dbtable", "student_scores");
DataFrame studentScoresDF = sqlContext.read().format("jdbc")
        .options(options).load();
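The studentsDF written back in the next step is never defined in the post; presumably it is derived from the two DataFrames above, e.g. by registering them as temp tables and joining on the student name, keeping only high scorers. A hypothetical query along those lines, assuming student_infos(name, age) and student_scores(name, score):

```sql
-- Assumed derivation of studentsDF: join on name, keep good students
SELECT i.name, i.age, s.score
FROM student_infos i
JOIN student_scores s ON i.name = s.name
WHERE s.score >= 80
```

The column order (name, age, score) matches the row.getString(0), row.get(1), row.get(2) accesses in the write-back code; the threshold of 80 is an assumption.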
2.3 Writing data back over JDBC
studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
    private static final long serialVersionUID = 1L;

    public void call(Row row) throws Exception {
        String sql = "insert into good_student_infos values("
                + "'" + String.valueOf(row.getString(0)) + "',"
                + Integer.valueOf(String.valueOf(row.get(1))) + ","
                + Integer.valueOf(String.valueOf(row.get(2))) + ")";

        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = null;
        Statement stmt = null;
        try {
            // Opening a connection per row is wasteful; connection reuse is out of scope here
            conn = DriverManager.getConnection("jdbc:mysql://spark1:3306/testdb", "", "");
            stmt = conn.createStatement();
            stmt.executeUpdate(sql);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (stmt != null) {
                stmt.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
});
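To see concretely what statement the loop sends, here is a minimal runnable sketch of the same concatenation, with no Spark or MySQL required; the row values ("leo", 18, 95) are hypothetical stand-ins for a Row:

```java
public class InsertSqlDemo {

    // Mirrors the string concatenation used inside the foreach above
    static String buildInsertSql(String name, Object age, Object score) {
        return "insert into good_student_infos values("
                + "'" + String.valueOf(name) + "',"
                + Integer.valueOf(String.valueOf(age)) + ","
                + Integer.valueOf(String.valueOf(score)) + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsertSql("leo", 18, 95));
        // prints: insert into good_student_infos values('leo',18,95)
    }
}
```

Note that this concatenation breaks if a name contains a quote character; a java.sql.PreparedStatement with ? placeholders is the safer route in real code.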
3. Fixing the bug
The whole reason for writing this post is to record this error. Even with the implementation above, the job still fails with: "No suitable driver found for jdbc:mysql://spark1:3306/testdb". The problem is that merely placing the mysql-connector-java jar under /spark_home/lib is not enough. Below is the fix that a more experienced engineer in a chat group shared with me, pasted verbatim: