Spark JDBC Development (Database Connection Test)



The following steps all run in local mode:

1. In Eclipse 4.5, create a project named RDDToJDBC, and create a folder named lib to hold third-party driver JARs

[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p lib
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin lib src

2. Add the required dependencies

2.1. Copy the MySQL connector JAR into the lib directory under the RDDToJDBC project directory
[hadoop@CloudDeskTop software]$ cp -a /software/hive-1.2.2/lib/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/lib/
2.2. Add the Spark development library Spark2.1.1-All to the classpath of the RDDToJDBC project (adding it as an Eclipse user library is one way to do this)
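
Before running the Spark job, it can help to confirm that the connector JAR copied in step 2.1 is actually visible on the classpath. The following is a minimal sketch, not part of the original project: `DriverCheck` and `isOnClasspath` are hypothetical names, and `com.mysql.jdbc.Driver` is the driver class name used by the source code in step 4.

```java
// Hypothetical helper for illustration; not part of the original project.
class DriverCheck {

    /** Returns true if the named class can be loaded from the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Driver class name taken from the source code in step 4.
        String driver = "com.mysql.jdbc.Driver";
        System.out.println(driver + " on classpath: " + isOnClasspath(driver));
    }
}
```

If this reports false when run from the project, the JAR in lib has probably not been added to the build path yet.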

3. Prepare the source data for Spark:

[hadoop@CloudDeskTop spark]$ cd /home/hadoop/test/jdbc/
[hadoop@CloudDeskTop jdbc]$ ls
myuser  testJDBC.txt
[hadoop@CloudDeskTop jdbc]$ cat myuser 
lisi 123456 165 1998-9-9
lisan 123ss 187 2009-10-19
wangwu 123qqwe 177 1990-8-3
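
Each line of myuser holds four space-separated fields: user name, password, height, and birthday. The sketch below shows how one such line might be parsed into typed values with plain Java. `MyUserRecord` is a hypothetical class introduced only for illustration (the source code in step 4 uses `scala.Tuple4` instead); `java.sql.Date.valueOf` accepts dates without leading zeros, such as 1998-9-9.

```java
import java.sql.Date;

// Hypothetical value class for illustration; the original code uses scala.Tuple4.
class MyUserRecord {
    final String userName;
    final String passWord;
    final double height;
    final Date birthday;

    MyUserRecord(String userName, String passWord, double height, Date birthday) {
        this.userName = userName;
        this.passWord = passWord;
        this.height = height;
        this.birthday = birthday;
    }

    /** Parses one whitespace-separated line such as "lisi 123456 165 1998-9-9". */
    static MyUserRecord parse(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                    "expected 4 fields but got " + fields.length + ": " + line);
        }
        return new MyUserRecord(
                fields[0],
                fields[1],
                Double.parseDouble(fields[2]),
                Date.valueOf(fields[3])); // format yyyy-[m]m-[d]d
    }

    public static void main(String[] args) {
        MyUserRecord r = parse("lisi 123456 165 1998-9-9");
        System.out.println(r.userName + " " + r.height + " " + r.birthday);
    }
}
```

Rejecting lines that do not have exactly four fields makes a malformed input line fail with a clear message rather than an `ArrayIndexOutOfBoundsException`.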

4. Write the source code:

package com.mmzs.bigdata.spark.core.local;

import java.io.File;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple4;

public class TestMain {
    /**
     * Global counter of rows added to the batch
     */
    private static int count;
    
    /**
     * Database connection
     */
    private static Connection conn;
    
    /**
     * Prepared statement
     */
    private static PreparedStatement pstat;
    
    private static final File OUT_PATH=new File("/home/hadoop/test/jdbc/output");
    
    static{
        delDir(OUT_PATH);
        try {
            String sql="insert into myuser(userName,passWord,height,birthday) values(?,?,?,?)";
            String url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8";
            Class.forName("com.mysql.jdbc.Driver");
            conn=DriverManager.getConnection(url, "root", "123456");
            pstat=conn.prepareStatement(sql);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    /**
     * Recursively deletes any file or directory
     * @param f
     */
    private static void delDir(File f){
        if(!f.exists())return;
        if(f.isFile()||(f.isDirectory()&&f.listFiles().length==0)){
            f.delete();
            return;
        }
        File[] files=f.listFiles();
        for(File fp:files)delDir(fp);
        f.delete();
    }
    
    private static void batchSave(Tuple4<String,String,Double,Date> line,boolean isOver){
        try{
            pstat.setString(1, line._1());
            pstat.setString(2, line._2());
            pstat.setDouble(3, line._3());
            pstat.setDate(4, line._4());
            
            if(isOver){//Last record: add it to the batch and flush to the database right away
                pstat.addBatch();
                pstat.executeBatch();
                pstat.clearBatch();
                pstat.clearParameters();
            }else{ //Not the last record: add the SQL statement to the batch
                pstat.addBatch();
                count++;
                if(count%100==0){ //Once a full batch has accumulated, execute it
                    pstat.executeBatch();
                    pstat.clearBatch();
                    pstat.clearParameters();
                }
            }
        }catch(SQLException e){
            e.printStackTrace();
        }
    }
    
    /**
     * Stores the data in the RDD into the MySQL relational database
     * @param statResRDD
     */
    private static void saveToDB(JavaRDD<String> statResRDD){
        final long rddNum=statResRDD.count();
        statResRDD.foreach(new VoidFunction<String>(){
            private long count=0;
            @Override
            public void call(String line) throws Exception {
                String[] fields=line.split(" ");
                String userName=fields[0];
                String passWord=fields[1];
                Double height=Double.parseDouble(fields[2]);
                Date birthday=Date.valueOf(fields[3]);
                Tuple4<String,String,Double,Date> fieldTuple=new Tuple4<String,String,Double,Date>(userName,passWord,height,birthday);
                if(++count<rddNum){
                    batchSave(fieldTuple,false);
                }else{
                    batchSave(fieldTuple,true);
                }
            }
        });
        
        try{
            if(null!=pstat)pstat.close();
            if(null!=conn)conn.close();
        }catch(SQLException e){
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.setAppName("Java Spark local");
        conf.setMaster("local");
        
        //Create the Spark context from the Spark configuration
        JavaSparkContext jsc=new JavaSparkContext(conf);
        
        //Read the local text file into an in-memory RDD
        JavaRDD<String> lineRdd=jsc.textFile("/home/hadoop/test/jdbc/myuser");
        
        //...........other transformations or statistics................
        
        //Save the computed result to a disk file (commented out); here it is written to the database instead
        //lineRdd.saveAsTextFile("/home/hadoop/test/jdbc/output");
        saveToDB(lineRdd);
        
        //Close the Spark context
        jsc.close();
    }
}
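
The listing above keeps the connection in static fields and detects the last row by comparing a per-function counter with the RDD's total count, which appears to rely on the data sitting in a single local partition. The batching rule itself (flush every N rows, then flush whatever remains) can be written against an iterator so that it does not need the total count. The following is a minimal sketch under that assumption: `PartitionBatchWriter`, `write`, and `batchSize` are hypothetical names, and the statement passed in is assumed to be the `insert into myuser(userName,passWord,height,birthday) values(?,?,?,?)` statement from the listing.

```java
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;

// Hypothetical helper for illustration; not part of the original project.
class PartitionBatchWriter {

    /**
     * Binds each "userName passWord height birthday" line to the prepared
     * INSERT statement, executes the batch every batchSize rows, and executes
     * once more at the end if any rows are still pending.
     * Returns the number of rows added.
     */
    static int write(Iterator<String> lines, PreparedStatement ps, int batchSize)
            throws SQLException {
        int pending = 0;
        int total = 0;
        while (lines.hasNext()) {
            String[] f = lines.next().trim().split("\\s+");
            ps.setString(1, f[0]);
            ps.setString(2, f[1]);
            ps.setDouble(3, Double.parseDouble(f[2]));
            ps.setDate(4, Date.valueOf(f[3]));
            ps.addBatch();
            total++;
            if (++pending == batchSize) { // a full batch has accumulated
                ps.executeBatch();
                pending = 0;
            }
        }
        if (pending > 0) { // flush the remainder
            ps.executeBatch();
        }
        return total;
    }
}
```

In Spark, a method like this could be called from `JavaRDD.foreachPartition`, which hands each partition to the function as an `Iterator<String>`; opening and closing the connection inside that call would avoid depending on static state.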

5. Initialize the MySQL database service (the node is at 192.168.154.134)

A. Start the MySQL database service

[root@DB03 ~]# cd /software/mysql-5.5.32/multi-data/3306/
[root@DB03 3306]# ls
data my.cnf my.cnf.bak mysqld
[root@DB03 3306]# ./mysqld start
Starting MySQL...

B. Create the test database

[root@CloudDeskTop 3306]# cd /software/mysql-5.5.32/bin/
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
+--------------------+
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create database test character set utf8;"
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| test               |
+--------------------+

C. Create the myuser table:

[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create table if not exists test.myuser(uid int(11) auto_increment primary key,username varchar(30),password varchar(30),height double(10,1),birthday date)engine=myisam charset=utf8;"
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "use test;show tables;"
+-------------------+
| Tables_in_test    |
+-------------------+
| myuser            |
+-------------------+
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "use test;desc test.myuser;"
+----------+--------------+------+-----+---------+----------------+
| Field    | Type         | Null | Key | Default | Extra          |
+----------+--------------+------+-----+---------+----------------+
| uid      | int(11)      | NO   | PRI | NULL    | auto_increment |
| username | varchar(30)  | YES  |     | NULL    |                |
| password | varchar(30)  | YES  |     | NULL    |                |
| height   | double(10,1) | YES  |     | NULL    |                |
| birthday | date         | YES  |     | NULL    |                |
+----------+--------------+------+-----+---------+----------------+

# The table does not contain any data yet
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.myuser;"
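
The mysql commands in this step connect to 192.168.154.134, while the JDBC URL in the source code of step 4 uses 127.0.0.1. If the Spark program and MySQL do not run on the same host, the URL would need to point at the database node. A minimal sketch of building the URL from its parts follows; `JdbcUrl` and `mysql` are hypothetical names, and the query parameters are the ones used in the source code.

```java
// Hypothetical helper for illustration; not part of the original project.
class JdbcUrl {

    /** Builds a MySQL JDBC URL with the same query parameters as the source code. */
    static String mysql(String host, int port, String database) {
        return "jdbc:mysql://" + host + ":" + port + "/" + database
                + "?useUnicode=true&characterEncoding=utf8";
    }

    public static void main(String[] args) {
        // Host, port, and database name taken from the mysql commands in this step.
        System.out.println(mysql("192.168.154.134", 3306, "test"));
    }
}
```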

6. Run the program and check the result in the database

6.1. Run the Spark code directly in Eclipse 4.5 and watch the Eclipse console output
6.2. Check whether the data now exists in the MySQL database
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.myuser;"
+-----+----------+----------+--------+------------+
| uid | username | password | height | birthday   |
+-----+----------+----------+--------+------------+
|   1 | lisi     | 123456   |  165.0 | 1998-09-09 |
|   2 | lisan    | 123ss    |  187.0 | 2009-10-19 |
|   3 | wangwu   | 123qqwe  |  177.0 | 1990-08-03 |
+-----+----------+----------+--------+------------+