spark之JDBC開發(實戰)

1、概述

Spark Core、Spark-SQL與Spark-Streaming都是相同的,編寫好以後打成jar包使用spark-submit命令提交到集羣運行應用
$SPARK_HOME/bin#./spark-submit  --master spark://Master01:7077  --class MainClassFullName [--files $HIVE_HOME/conf/hive-site.xml] JarNameFullPath [slices]html

說明:
--master參數用於指定提交到的Spark集羣入口,這個入口一般是Spark的Master節點(即Master進程或ResourceManager進程所在的節點),若是須要爲該參數指定一個高可用集羣則集羣節點之間使用英文逗號分割
--class參數用於指定Spark之Driver的入口Main類(必須指定該Main類的全名)
若是使用Spark操做Hive倉庫則須要使用--files參數指定Hive的配置文件
若是使用Spark操做關係數據庫則須要將關係數據庫的驅動包放置於Spark安裝目錄下的library目錄下(在Spark2.x中應該放置於jars目錄下),如:
[hadoop@CloudDeskTop jars]$ pwd
/software/spark-2.1.1/jars
JarNameFullPath表示的是提交的Spark應用所在的JAR包全名(最好指定爲絕對的全路徑)
slices:表示的是讀取數據的並行度(值爲一個數值,根據實際的物理內存配置來指定,內存較小時指定爲1或者不用指定),通常在Streaming應用中是不須要指定的前端

2、Spark之JDBC實戰

(一)、本地模式操做

典型業務場景描述:將CloudDeskTop客戶端本地的數據,經過Spark處理,而後將結果寫入遠端關係數據庫中,供前端在線事務系統使用java

一、在Eclipse4.5中創建工程RDDToJDBC,並建立一個文件夾lib用於放置第三方驅動包

[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p lib
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin lib srcnode

二、添加必要的環境

2.一、將MySql的jar包拷貝到工程目錄RDDToJDBC下的lib目錄下
[hadoop@CloudDeskTop software]$ cp -a /software/hive-1.2.2/lib/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/lib/
2.一、將Spark的開發庫Spark2.1.1-All追加到RDDToJDBC工程的classpath路徑中去(能夠經過添加用戶庫的方式來解決);Spark2.1.1-All中包含哪些包,請點擊此處mysql

三、基於RDD到DB的Java源碼

  1 package com.mmzs.bigdata.spark.core.local;
  2 
  3 import java.io.File;
  4 import java.sql.Connection;
  5 import java.sql.DriverManager;
  6 import java.sql.PreparedStatement;
  7 import java.sql.SQLException;
  8 import java.util.Arrays;
  9 import java.util.Iterator;
 10 import java.util.List;
 11 
 12 import org.apache.spark.SparkConf;
 13 import org.apache.spark.api.java.JavaPairRDD;
 14 import org.apache.spark.api.java.JavaRDD;
 15 import org.apache.spark.api.java.JavaSparkContext;
 16 import org.apache.spark.api.java.function.FlatMapFunction;
 17 import org.apache.spark.api.java.function.Function2;
 18 import org.apache.spark.api.java.function.PairFunction;
 19 import org.apache.spark.api.java.function.VoidFunction;
 20 
 21 import scala.Tuple2;
 22 import scala.Tuple4;
 23 
 24 public class RDDToDB {
 25     /**
 26      * 全局計數器
 27      */
 28     private static int count;
 29     
 30     /**
 31      * 數據庫鏈接
 32      */
 33     private static Connection conn;
 34     
 35     /**
 36      * 預編譯語句
 37      */
 38     private static PreparedStatement pstat;
 39     
 40     private static final File OUT_PATH=new File("/home/hadoop/test/output");
 41     
 42     static{
 43         delDir(OUT_PATH);
 44         try {
 45             String sql="insert into wordcount(word,count) values(?,?)";
 46             String url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8";
 47             Class.forName("com.mysql.jdbc.Driver");
 48             conn=DriverManager.getConnection(url, "root", "123456");
 49             pstat=conn.prepareStatement(sql);
 50         } catch (ClassNotFoundException e) {
 51             e.printStackTrace();
 52         } catch (SQLException e) {
 53             e.printStackTrace();
 54         }
 55     }
 56     /**
 57      * 刪除任何目錄或文件
 58      * @param f
 59      */
 60     private static void delDir(File f){
 61         if(!f.exists())return;
 62         if(f.isFile()||(f.isDirectory()&&f.listFiles().length==0)){
 63             f.delete();
 64             return;
 65         }
 66         File[] files=f.listFiles();
 67         for(File fp:files)delDir(fp);
 68         f.delete();
 69     }
 70     
 71     //分批存儲
 72     private static void batchSave(Tuple2<String, Integer> line,boolean isOver){
 73         try{
 74             pstat.setString(1, line._1());
 75             pstat.setInt(2, line._2());
 76             
 77             if(isOver){//若是結束了循環則直接寫磁盤
 78                 pstat.addBatch();
 79                 pstat.executeBatch();
 80                 pstat.clearBatch();
 81                 pstat.clearParameters();
 82             }else{ //若是沒有結束則將sql語句添加到批處理中去
 83                 pstat.addBatch();
 84                 count++;
 85                 if(count%100==0){ //若是滿一個批次就提交一次批處理操做
 86                     pstat.executeBatch();
 87                     pstat.clearBatch();
 88                     pstat.clearParameters();
 89                 }
 90             }
 91         }catch(SQLException e){
 92             e.printStackTrace();
 93         }
 94     }
 95     
 96     /**
 97      * 將RDD集合中的數據存儲到關係數據庫MYSql中去
 98      * @param statResRDD
 99      */
100     private static void saveToDB(JavaPairRDD<String, Integer> statResRDD){
101         final long rddNum=statResRDD.count();
102         statResRDD.foreach(new VoidFunction<Tuple2<String,Integer>>(){
103             private long count=0;
104             @Override
105             public void call(Tuple2<String, Integer> line) throws Exception {
106                 if(++count<rddNum){
107                     batchSave(line,false);
108                 }else{
109                     batchSave(line,true);
110                 }
111             }
112         });
113         
114         try{
115             if(null!=pstat)pstat.close();
116             if(null!=conn)conn.close();
117         }catch(SQLException e){
118             e.printStackTrace();
119         }
120     }
121     
122     public static void main(String[] args) {
123         SparkConf conf=new SparkConf();
124         conf.setAppName("Java Spark local");
125         conf.setMaster("local");
126         
127         //根據Spark配置生成Spark上下文
128         JavaSparkContext jsc=new JavaSparkContext(conf);
129         
130         //讀取本地的文本文件成內存中的RDD集合對象
131         JavaRDD<String> lineRdd=jsc.textFile("/home/hadoop/test/jdbc");
132         
133         //切分每一行的字串爲單詞數組,並將字串數組中的單詞字串釋放到外層的JavaRDD集合中
134         JavaRDD<String> flatMapRdd=lineRdd.flatMap(new FlatMapFunction<String,String>(){
135             @Override
136             public Iterator<String> call(String line) throws Exception {
137                 String[] words=line.split(" ");
138                 List<String> list=Arrays.asList(words);
139                 Iterator<String> its=list.iterator();
140                 return its;
141             }
142         });
143         
144         //爲JavaRDD集合中的每個單詞進行計數,將其轉換爲元組
145         JavaPairRDD<String, Integer> mapRdd=flatMapRdd.mapToPair(new PairFunction<String, String,Integer>(){
146             @Override
147             public Tuple2<String,Integer> call(String word) throws Exception {
148                 return new Tuple2<String,Integer>(word,1);
149             }
150         });
151         
152         //根據元組中的第一個元素(Key)進行分組並統計單詞出現的次數
153         JavaPairRDD<String, Integer> reduceRdd=mapRdd.reduceByKey(new Function2<Integer,Integer,Integer>(){
154             @Override
155             public Integer call(Integer pre, Integer next) throws Exception {
156                 return pre+next;
157             }
158         });
159         
160         //將單詞元組中的元素反序以方便後續排序
161         JavaPairRDD<Integer, String> mapRdd02=reduceRdd.mapToPair(new PairFunction<Tuple2<String, Integer>,Integer,String>(){
162             @Override
163             public Tuple2<Integer, String> call(Tuple2<String, Integer> wordTuple) throws Exception {
164                 return new Tuple2<Integer,String>(wordTuple._2,wordTuple._1);
165             }
166         });
167         
168         //將JavaRDD集合中的單詞按出現次數進行將序排列
169         JavaPairRDD<Integer, String> sortRdd=mapRdd02.sortByKey(false, 1);
170         
171         //排序以後將元組中的順序換回來
172         JavaPairRDD<String, Integer> mapRdd03=sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>,String,Integer>(){
173             @Override
174             public Tuple2<String, Integer> call(Tuple2<Integer, String> wordTuple) throws Exception {
175                 return new Tuple2<String, Integer>(wordTuple._2,wordTuple._1);
176             }
177         });
178         
179         //存儲統計以後的結果到磁盤文件中去
180         //mapRdd03.saveAsTextFile("/home/hadoop/test/jdbc/output");
181         
182         saveToDB(mapRdd03);
183         
184         //關閉Spark上下文
185         jsc.close();
186     }
187 }
View Code

四、測試Spark的JDBC應用

4.一、初始化MySql數據庫服務(節點在192.168.154.134上)

A、啓動MySql數據庫服務sql

[root@DB03 ~]# cd /software/mysql-5.5.32/multi-data/3306/
[root@DB03 3306]# ls
data my.cnf my.cnf.bak mysqld
[root@DB03 3306]# ./mysqld start
Starting MySQL...

B、創建test庫數據庫

[root@CloudDeskTop 3306]# cd /software/mysql-5.5.32/bin/
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
+--------------------+
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create database test character set utf8;"
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| test               |
+--------------------+

C、創建wordcount表apache

[root@DB03 bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create table if not exists test.wordcount(wid int(11) auto_increment primary key,word varchar(30),count int(3))engine=myisam charset=utf8;"
[root@DB03 bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "desc test.wordcount;"
+-------+-------------+------+-----+---------+----------------+
| Field | Type        | Null | Key | Default | Extra          |
+-------+-------------+------+-----+---------+----------------+
| wid   | int(11)     | NO   | PRI | NULL    | auto_increment |
| word  | varchar(30) | YES  |     | NULL    |                |
| count | int(3)      | YES  |     | NULL    |                |
+-------+-------------+------+-----+---------+----------------+

#目前數據庫表中尚未數據
[root@DB03 bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"
4.二、準備Spark的源數據
[hadoop@CloudDeskTop jdbc]$ pwd
/home/hadoop/test/jdbc
[hadoop@CloudDeskTop jdbc]$ ls
myuser  testJDBC.txt
[hadoop@CloudDeskTop jdbc]$ cat testJDBC.txt myuser 
zhnag san shi yi ge hao ren
jin tian shi yi ge hao tian qi 
wo zai zhe li zuo le yi ge ce shi 
yi ge guan yu scala de ce shi 
welcome to mmzs
歡迎 歡迎
lisi 123456 165 1998-9-9
lisan 123ss 187 2009-10-19
wangwu 123qqwe 177 1990-8-3
4.三、在Eclipse4.5中直接運行Spark代碼,觀察Eclipse控制檯輸出
4.四、檢查在關係數據庫MySql中是否已經存在數據

 [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"api

 

(二)、集羣模式操做

典型業務場景描述:將HDFS集羣中的數據經過Spark處理以後,將結果寫入遠端關係數據庫中,供前端在線事務系統使用數組

一、在Eclipse4.5中的工程RDDToJDBC下建立一個package文件夾用於放置打包文件

[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p package
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin package src

二、將關係數據庫的驅動包放置到Spark安裝目錄下的jars目錄下

在客戶端上傳所需的mysql-connector-java-3.0.17-ga-bin.jar包:
[hadoop@CloudDeskTop jars]# pwd
/software/spark-2.1.1/jars
而後分發到集羣:
[hadoop@CloudDeskTop software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar master01:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar master02:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar slave01:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar slave02:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar slave03:/software/spark-2.1.1/jars/

三、開發源碼

  1 package com.mmzs.bigdata.spark.core.cluster;
  2 
  3 import java.sql.Connection;
  4 import java.sql.DriverManager;
  5 import java.sql.PreparedStatement;
  6 import java.sql.SQLException;
  7 import java.util.Arrays;
  8 import java.util.Iterator;
  9 import java.util.List;
 10 
 11 import org.apache.spark.SparkConf;
 12 import org.apache.spark.api.java.JavaPairRDD;
 13 import org.apache.spark.api.java.JavaRDD;
 14 import org.apache.spark.api.java.JavaSparkContext;
 15 import org.apache.spark.api.java.function.FlatMapFunction;
 16 import org.apache.spark.api.java.function.Function2;
 17 import org.apache.spark.api.java.function.PairFunction;
 18 import org.apache.spark.api.java.function.VoidFunction;
 19 
 20 import scala.Tuple2;
 21 
 22 public class RDDToDB {
 23     /**
 24      * 全局計數器
 25      */
 26     private static int count;
 27     
 28     /**
 29      * 數據庫鏈接
 30      */
 31     private static Connection conn;
 32     
 33     /**
 34      * 預編譯語句
 35      */
 36     private static PreparedStatement pstat;
 37     
 38     static{
 39         try {
 40             String sql="insert into wordcount(word,count) values(?,?)";
 41             String url="jdbc:mysql://192.168.154.134:3306/test?useUnicode=true&characterEncoding=utf8";
 42             Class.forName("com.mysql.jdbc.Driver");
 43             conn=DriverManager.getConnection(url, "root", "123456");
 44             pstat=conn.prepareStatement(sql);
 45         } catch (ClassNotFoundException e) {
 46             e.printStackTrace();
 47         } catch (SQLException e) {
 48             e.printStackTrace();
 49         }
 50     }
 51     
 52     /**
 53      * 批量存儲數據
 54      * @param line
 55      * @throws SQLException
 56      */
 57     private static void batchSave(Tuple2<String, Integer> line,boolean isOver){
 58         try{
 59             pstat.setString(1, line._1());
 60             pstat.setInt(2, line._2());
 61             
 62             if(isOver){//若是結束了循環則直接寫磁盤。
 63                 //若是RDD數據已經迭代結束,則執行剩下的批量語句。
 64                 pstat.addBatch();
 65                 pstat.executeBatch();
 66                 pstat.clearBatch();
 67                 pstat.clearParameters();
 68             }else{ //若是沒有結束則將sql語句添加到批處理中去。
 69                 //若是RDD數據的迭代還不曾結束,則直接將當前語句添加到批處理計劃中去;
 70                 //可是若是批處理語句數量超過了100則沖刷一次緩衝區中批處理並重置計數器。
 71                 pstat.addBatch();
 72                 count++;
 73                 if(count%100==0){ //若是滿一個批次就提交一次批處理操做
 74                     pstat.executeBatch();
 75                     pstat.clearBatch();
 76                     pstat.clearParameters();
 77                 }
 78             }
 79         }catch(SQLException e){
 80             e.printStackTrace();
 81         }
 82     }
 83     
 84     /**
 85      * 將RDD集合中的數據存儲到關係數據庫MYSql中去。
 86      * 存儲結果到關係數據庫中
 87      * 必須將內部類對象方法(如:call)中的操做分離到一個獨立的方法(如:batchSave)中去,
 88      * 由於Spark給定的內部類API都是可序列化的,而執行JDBC操做的Statement和Connection都是不能被序列化的
 89      * @param wordGroupList
 90      * @throws ClassNotFoundException 
 91      */
 92     private static void saveToDB(JavaPairRDD<String, Integer> statResRDD){
 93         final long rddNum=statResRDD.count();
 94         statResRDD.foreach(new VoidFunction<Tuple2<String,Integer>>(){
 95             private long count=0;
 96             @Override
 97             public void call(Tuple2<String, Integer> line) throws Exception {
 98                 if(++count<rddNum){
 99                     batchSave(line,false);
100                 }else{
101                     batchSave(line,true);
102                 }
103             }
104         });
105         
106         try{
107             if(null!=pstat)pstat.close();
108             if(null!=conn)conn.close();
109         }catch(SQLException e){
110             e.printStackTrace();
111         }
112     }
113     
114     public static void main(String[] args) {
115         SparkConf conf=new SparkConf();
116         conf.setAppName("Java Spark Cluster");
117         
118         //根據Spark配置生成Spark上下文
119         JavaSparkContext jsc=new JavaSparkContext(conf);
120         
121         //讀取本地的文本文件成內存中的RDD集合對象
122         JavaRDD<String> lineRdd=jsc.textFile("/spark/input", 1);
123         
124         //切分每一行的字串爲單詞數組,並將字串數組中的單詞字串釋放到外層的JavaRDD集合中
125         JavaRDD<String> flatMapRdd=lineRdd.flatMap(new FlatMapFunction<String,String>(){
126             @Override
127             public Iterator<String> call(String line) throws Exception {
128                 String[] words=line.split(" ");
129                 List<String> list=Arrays.asList(words);
130                 Iterator<String> its=list.iterator();
131                 return its;
132             }
133         });
134         
135         //爲JavaRDD集合中的每個單詞進行計數,將其轉換爲元組
136         JavaPairRDD<String, Integer> mapRdd=flatMapRdd.mapToPair(new PairFunction<String, String,Integer>(){
137             @Override
138             public Tuple2<String,Integer> call(String word) throws Exception {
139                 return new Tuple2<String,Integer>(word,1);
140             }
141         });
142         
143         //根據元組中的第一個元素(Key)進行分組並統計單詞出現的次數
144         JavaPairRDD<String, Integer> reduceRdd=mapRdd.reduceByKey(new Function2<Integer,Integer,Integer>(){
145             @Override
146             public Integer call(Integer pre, Integer next) throws Exception {
147                 return pre+next;
148             }
149         });
150         
151         //將單詞元組中的元素反序以方便後續排序
152         JavaPairRDD<Integer, String> mapRdd02=reduceRdd.mapToPair(new PairFunction<Tuple2<String, Integer>,Integer,String>(){
153             @Override
154             public Tuple2<Integer, String> call(Tuple2<String, Integer> wordTuple) throws Exception {
155                 return new Tuple2<Integer,String>(wordTuple._2,wordTuple._1);
156             }
157         });
158         
159         //將JavaRDD集合中的單詞按出現次數進行將序排列
160         JavaPairRDD<Integer, String> sortRdd=mapRdd02.sortByKey(false, 1);
161         
162         //排序以後將元組中的順序換回來
163         JavaPairRDD<String, Integer> mapRdd03=sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>,String,Integer>(){
164             @Override
165             public Tuple2<String, Integer> call(Tuple2<Integer, String> wordTuple) throws Exception {
166                 return new Tuple2<String, Integer>(wordTuple._2,wordTuple._1);
167             }
168         });
169         
170         //存儲統計以後的結果到磁盤文件中去
171         //mapRdd03.saveAsTextFile("/spark/output");
172         
173         saveToDB(mapRdd03);
174         
175         //關閉Spark上下文
176         jsc.close();
177     }
178 }
View Code

說明:
  在集羣模式下,Spark操做關係數據庫是經過啓動一個Job來完成的,而啓動Job則是經過RDD的操做來觸發的,所以在Spark集羣模式下其關係數據庫的全部操做必須位於RDD操做級別纔是有效的,不然數據的操做將沒法影響到關係數據庫中去,而RDD級別以外的操做都屬於Spark Core的客戶端Driver級別(好比:SparkSQL和SparkStreaming),在上面的代碼中,只有RDD對象在被foreachXXX時纔會進入到SparkCore級別的Job操做,在RDD以外的操做是屬於Driver級別的操做,沒法啓動Job。
  在基於RDD級別的SparkCore操做過程當中,其數據都是被封裝成Job提交到集羣,並在集羣的各個節點上執行分配的Task,數據在各個Task節點之間傳遞須要數據自己支持可序列化,所以在Spark應用中出現的高頻率內部類對象(好比上面的VoidFunction)都必須支持可序列化,這意味着在這些內部類對象中出現的成員也必須是可序列化的,所以咱們在這些內部類對象所在的上下文中編寫代碼時必須注意不能出現不可序列化的對象或引用(如不能出現基於瞬態的流化對象Connection、Statement、Thread等),即在這些內部類對象上下文中出現的對象引用必須是實現了java.io.Seralizable接口的。

四、打包工程代碼到dist目錄下

[hadoop@CloudDeskTop ~]$ cd /project/RDDToJDBC/bin/
[hadoop@CloudDeskTop bin]$ ls
com mysql-connector-java-3.0.17-ga-bin.jar
[hadoop@CloudDeskTop bin]$ jar -cvfe /project/RDDToJDBC/package/RDDToJDBC.jar com.mmzs.bigdata.spark.core.cluster.RDDToDB com/
[hadoop@CloudDeskTop bin]$ cd ../package
[hadoop@CloudDeskTop package]$ ls
RDDToJDBC.jar

五、集羣模式下的應用提交測試

A、啓動spark集羣運行環境:[hadoop@master01 install]$ sh start-total.sh 

#!/bin/bash
echo "請首先確認你已經切換到hadoop用戶"
#啓動zookeeper集羣
for node in hadoop@slave01 hadoop@slave02 hadoop@slave03;do ssh $node "source /etc/profile; cd /software/zookeeper-3.4.10/bin/; ./zkServer.sh start; jps";done

#開啓dfs集羣
cd /software/ && start-dfs.sh && jps

#開啓spark集羣
#啓動master01的Master進程,slave節點的Worker進程
cd /software/spark-2.1.1/sbin/ && ./start-master.sh && ./start-slaves.sh && jps
#啓動master02的Master進程
ssh hadoop@master02 "cd /software/spark-2.1.1/sbin/; ./start-master.sh; jps"

#spark集羣的日誌服務,通常不開,由於比較佔資源
#cd /software/spark-2.1.1/sbin/ && ./start-history-server.sh && cd - && jps

start-spark.sh
start-spark.sh

B、在CloudDeskTop客戶端節點上提交Spark應用
#將數據庫中的舊數據刪除掉
[root@CloudDeskTop bin]# pwd
/software/mysql-5.5.32/bin
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "truncate table test.wordcount;"
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"

#準備源數據
[hadoop@CloudDeskTop jdbc]$ hdfs dfs -put testJDBC.txt /spark/input/
[hadoop@master02 ~]$ hdfs dfs -ls /spark/
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2018-02-26 21:56 /spark/input
[hadoop@master02 ~]$ hdfs dfs -ls /spark/input
Found 1 items
-rw-r--r-- 3 hadoop supergroup 156 2018-02-26 21:56 /spark/input/testJDBC.txt
[hadoop@master02 ~]$ hdfs dfs -cat /spark/input/testJDBC.txt
zhnag san shi yi ge hao ren
jin tian shi yi ge hao tian qi
wo zai zhe li zuo le yi ge ce shi
yi ge guan yu scala de ce shi
welcome to mmzs
歡迎 歡迎

#提交Spark應用

首先:
[hadoop@CloudDeskTop lib]$ cd /software/spark-2.1.1/bin/
而後:
第一種提交方式:(可能會出現空指針異常的狀況)
[hadoop@CloudDeskTop bin]$ ./spark-submit --master spark://master01:7077 --class com.mmzs.bigdata.spark.core.cluster.RDDToDB /project/RDDToJDBC/package/RDDToJDBC.jar
第二種提交方式:
[hadoop@CloudDeskTop bin]$ ./spark-submit --master spark://master01:7077 --class com.mmzs.bigdata.spark.core.cluster.RDDToDB --jars /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/package/RDDToJDBC.jar

C、測試關係數據庫中是否已經有數據
 [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"

相關文章
相關標籤/搜索