spark之JDBC開發(鏈接數據庫測試)
[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p lib
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin lib src
2.一、將MySql的jar包拷貝到工程目錄RDDToJDBC下的lib目錄下
[hadoop@CloudDeskTop software]$ cp -a /software/hive-1.2.2/lib/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/lib/
2.二、將Spark的開發庫Spark2.1.1-All追加到RDDToJDBC工程的classpath路徑中去(能夠經過添加用戶庫的方式來解決);Spark2.1.1-All中包含哪些包,請點擊此處
[hadoop@CloudDeskTop spark]$ cd /home/hadoop/test/jdbc/ [hadoop@CloudDeskTop jdbc]$ ls myuser testJDBC.txt [hadoop@CloudDeskTop jdbc]$ cat myuser lisi 123456 165 1998-9-9 lisan 123ss 187 2009-10-19 wangwu 123qqwe 177 1990-8-3
package com.mmzs.bigdata.spark.core.local;

import java.io.File;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple4;

/**
 * Reads a whitespace-delimited user file with a local Spark context and bulk-inserts
 * the rows into the MySQL table {@code test.myuser} using JDBC batch statements.
 *
 * <p>Expected input line format: {@code userName passWord height birthday}
 * (space-separated, birthday as {@code yyyy-MM-dd}).
 *
 * <p>NOTE(review): the JDBC connection and prepared statement live in driver-side
 * static fields, so this class only works with master {@code "local"}; on a real
 * cluster the {@code foreach} closure runs in executor JVMs where these statics
 * are never initialized.
 */
public class TestMain {
    /** Rows added to the current JDBC batch since the last flush. */
    private static int count;
    /** Driver-side database connection (local mode only). */
    private static Connection conn;
    /** Prepared INSERT statement reused for every row. */
    private static PreparedStatement pstat;
    /** Flush the JDBC batch to the database every BATCH_SIZE rows. */
    private static final int BATCH_SIZE = 100;
    /** Stale output directory removed before each run. */
    private static final File OUT_PATH = new File("/home/hadoop/test/jdbc/output");

    static {
        delDir(OUT_PATH);
        try {
            String sql = "insert into myuser(userName,passWord,height,birthday) values(?,?,?,?)";
            String url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8";
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(url, "root", "123456");
            pstat = conn.prepareStatement(sql);
        } catch (ClassNotFoundException | SQLException e) {
            // Fail fast: the original swallowed this exception, leaving conn/pstat
            // null and producing a confusing NPE the first time they were used.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Recursively deletes a file or directory tree; does nothing if the path is absent.
     *
     * @param f file or directory to delete
     */
    private static void delDir(File f) {
        if (!f.exists()) {
            return;
        }
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            // listFiles() may return null on an I/O error; guard against NPE.
            if (children != null) {
                for (File child : children) {
                    delDir(child);
                }
            }
        }
        f.delete();
    }

    /**
     * Binds one row to the prepared statement and adds it to the JDBC batch.
     * The batch is flushed every {@link #BATCH_SIZE} rows, and unconditionally
     * when {@code isOver} marks the final row of the RDD.
     *
     * @param line   tuple of (userName, passWord, height, birthday)
     * @param isOver true when this is the last row, forcing a final flush
     */
    private static void batchSave(Tuple4<String, String, Double, Date> line, boolean isOver) {
        try {
            pstat.setString(1, line._1());
            pstat.setString(2, line._2());
            pstat.setDouble(3, line._3());
            pstat.setDate(4, line._4());
            pstat.addBatch();
            count++;
            // Single flush path replaces the duplicated executeBatch branches.
            if (isOver || count % BATCH_SIZE == 0) {
                pstat.executeBatch();
                pstat.clearBatch();
                pstat.clearParameters();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Streams every line of the RDD into MySQL, then closes the JDBC resources.
     * Resources are released in a finally block so they are closed even when
     * the Spark job fails part-way.
     *
     * @param statResRDD RDD of raw input lines to persist
     */
    private static void saveToDB(JavaRDD<String> statResRDD) {
        final long rddNum = statResRDD.count();
        try {
            statResRDD.foreach(new VoidFunction<String>() {
                /** Rows seen so far by this closure; used to detect the last row. */
                private long seen = 0;

                @Override
                public void call(String line) throws Exception {
                    String[] fields = line.split(" ");
                    String userName = fields[0];
                    String passWord = fields[1];
                    Double height = Double.parseDouble(fields[2]);
                    Date birthday = Date.valueOf(fields[3]);
                    Tuple4<String, String, Double, Date> row =
                            new Tuple4<String, String, Double, Date>(userName, passWord, height, birthday);
                    // The last element triggers the final batch flush.
                    batchSave(row, ++seen >= rddNum);
                }
            });
        } finally {
            // Always release JDBC resources, even if foreach throws.
            try {
                if (pstat != null) pstat.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            try {
                if (conn != null) conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Entry point: builds a local Spark context, loads the input file and
     * persists its rows into MySQL, closing the context on all paths.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("Java Spark local");
        conf.setMaster("local");
        // Build the Spark context from the configuration.
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            // Load the local text file as an RDD of lines.
            JavaRDD<String> lineRdd = jsc.textFile("/home/hadoop/test/jdbc/myuser");
            // ........... other transformations / aggregations would go here ...........
            saveToDB(lineRdd);
        } finally {
            // Guarantee the context is closed even if the job fails.
            jsc.close();
        }
    }
}
A、啓動MySql數據庫服務
[root@DB03 ~]# cd /software/mysql-5.5.32/multi-data/3306/ [root@DB03 3306]# ls data my.cnf my.cnf.bak mysqld [root@DB03 3306]# ./mysqld start Starting MySQL...
B、創建test庫數據庫
[root@CloudDeskTop 3306]# cd /software/mysql-5.5.32/bin/ [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;" +--------------------+ | Database | +--------------------+ | information_schema | | mysql | | performance_schema | +--------------------+ [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create database test character set utf8;" [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;" +--------------------+ | Database | +--------------------+ | information_schema | | mysql | | performance_schema | | test | +--------------------+
C、創建myuser表:
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create table if not exists test.myuser(uid int(11) auto_increment primary key,username varchar(30),password varchar(30),height double(10,1),birthday date)engine=myisam charset=utf8;" [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "use test;show tables;" +-------------------+ | Tables_in_test | +-------------------+ | myuser | +-------------------+ [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "use test;desc test.myuser;" +----------+--------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +----------+--------------+------+-----+---------+----------------+ | uid | int(11) | NO | PRI | NULL | auto_increment | | username | varchar(30) | YES | | NULL | | | password | varchar(30) | YES | | NULL | | | height | double(10,1) | YES | | NULL | | | birthday | date | YES | | NULL | | +----------+--------------+------+-----+---------+----------------+ #目前數據庫表中尚未數據 [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.myuser;"
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.myuser;" +-----+----------+----------+--------+------------+ | uid | username | password | height | birthday | +-----+----------+----------+--------+------------+ | 1 | lisi | 123456 | 165.0 | 1998-09-09 | | 2 | lisan | 123ss | 187.0 | 2009-10-19 | | 3 | wangwu | 123qqwe | 177.0 | 1990-08-03 | +-----+----------+----------+--------+------------+