Spark Core, Spark SQL and Spark Streaming applications are all submitted the same way: package the code into a jar and submit it to the cluster with the spark-submit command.
$SPARK_HOME/bin#./spark-submit --master spark://Master01:7077 --class MainClassFullName [--files $HIVE_HOME/conf/hive-site.xml] JarNameFullPath [slices]
Notes:
--master specifies the entry point of the Spark cluster to submit to. This is normally the Spark Master node (the node where the Master process, or the ResourceManager process, runs). To point this parameter at a highly available cluster, separate the master nodes with commas.
--class specifies the main class that is the entry point of the Spark Driver (its fully qualified name must be given).
If the Spark application works against a Hive warehouse, pass Hive's configuration file with the --files parameter.
If the Spark application works against a relational database, put the database's JDBC driver jar into the library directory under the Spark installation directory (in Spark 2.x, the jars directory), e.g.:
[hadoop@CloudDeskTop jars]$ pwd
/software/spark-2.1.1/jars
JarNameFullPath is the full name of the jar containing the Spark application (preferably given as an absolute path).
slices is the parallelism used when reading the data (a numeric value chosen according to the available physical memory; with little memory, use 1 or omit it). It is usually not needed for Streaming applications. A filled-in example of the full command is shown below.
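For instance, assuming the cluster and project layout used later in this article (master01/master02 as Master nodes, the cluster-mode word-count class, and the jar built under /project/RDDToJDBC/package; the exact values are illustrative), a complete submission could look like this:
[hadoop@CloudDeskTop bin]$ ./spark-submit --master spark://master01:7077,master02:7077 --class com.mmzs.bigdata.spark.core.cluster.RDDToDB /project/RDDToJDBC/package/RDDToJDBC.jar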
Typical business scenario: data stored locally on the CloudDeskTop client is processed by Spark and the result is written to a remote relational database, where it serves the front-end online transaction system.
[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p lib
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin lib src
2.1 Copy the MySQL driver jar into the lib directory under the RDDToJDBC project directory
[hadoop@CloudDeskTop software]$ cp -a /software/hive-1.2.2/lib/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/lib/
2.2 Add the Spark development library Spark2.1.1-All to the classpath of the RDDToJDBC project (this can be done by adding it as a user library).
package com.mmzs.bigdata.spark.core.local;

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class RDDToDB {
    /**
     * Global counter
     */
    private static int count;

    /**
     * Database connection
     */
    private static Connection conn;

    /**
     * Prepared statement
     */
    private static PreparedStatement pstat;

    private static final File OUT_PATH=new File("/home/hadoop/test/output");

    static{
        delDir(OUT_PATH);
        try {
            String sql="insert into wordcount(word,count) values(?,?)";
            String url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8";
            Class.forName("com.mysql.jdbc.Driver");
            conn=DriverManager.getConnection(url, "root", "123456");
            pstat=conn.prepareStatement(sql);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Recursively delete a directory or file
     * @param f
     */
    private static void delDir(File f){
        if(!f.exists())return;
        if(f.isFile()||(f.isDirectory()&&f.listFiles().length==0)){
            f.delete();
            return;
        }
        File[] files=f.listFiles();
        for(File fp:files)delDir(fp);
        f.delete();
    }

    //Save in batches
    private static void batchSave(Tuple2<String, Integer> line,boolean isOver){
        try{
            pstat.setString(1, line._1());
            pstat.setInt(2, line._2());

            if(isOver){ //The iteration has finished, so flush the remaining batch to the database
                pstat.addBatch();
                pstat.executeBatch();
                pstat.clearBatch();
                pstat.clearParameters();
            }else{ //Otherwise just add the statement to the current batch
                pstat.addBatch();
                count++;
                if(count%100==0){ //Submit the batch once it reaches a full batch of 100
                    pstat.executeBatch();
                    pstat.clearBatch();
                    pstat.clearParameters();
                }
            }
        }catch(SQLException e){
            e.printStackTrace();
        }
    }

    /**
     * Store the data in the RDD into the MySQL relational database
     * @param statResRDD
     */
    private static void saveToDB(JavaPairRDD<String, Integer> statResRDD){
        final long rddNum=statResRDD.count();
        statResRDD.foreach(new VoidFunction<Tuple2<String,Integer>>(){
            private long count=0;
            @Override
            public void call(Tuple2<String, Integer> line) throws Exception {
                if(++count<rddNum){
                    batchSave(line,false);
                }else{
                    batchSave(line,true);
                }
            }
        });

        try{
            if(null!=pstat)pstat.close();
            if(null!=conn)conn.close();
        }catch(SQLException e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.setAppName("Java Spark local");
        conf.setMaster("local");

        //Create the Spark context from the Spark configuration
        JavaSparkContext jsc=new JavaSparkContext(conf);

        //Read the local text file into an in-memory RDD
        JavaRDD<String> lineRdd=jsc.textFile("/home/hadoop/test/jdbc");

        //Split each line into an array of words and flatten the words into the outer JavaRDD
        JavaRDD<String> flatMapRdd=lineRdd.flatMap(new FlatMapFunction<String,String>(){
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] words=line.split(" ");
                List<String> list=Arrays.asList(words);
                Iterator<String> its=list.iterator();
                return its;
            }
        });

        //Turn each word in the JavaRDD into a (word, 1) tuple for counting
        JavaPairRDD<String, Integer> mapRdd=flatMapRdd.mapToPair(new PairFunction<String, String,Integer>(){
            @Override
            public Tuple2<String,Integer> call(String word) throws Exception {
                return new Tuple2<String,Integer>(word,1);
            }
        });

        //Group by the first tuple element (the key) and count the occurrences of each word
        JavaPairRDD<String, Integer> reduceRdd=mapRdd.reduceByKey(new Function2<Integer,Integer,Integer>(){
            @Override
            public Integer call(Integer pre, Integer next) throws Exception {
                return pre+next;
            }
        });

        //Swap the tuple elements so the count can be used as the sort key
        JavaPairRDD<Integer, String> mapRdd02=reduceRdd.mapToPair(new PairFunction<Tuple2<String, Integer>,Integer,String>(){
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> wordTuple) throws Exception {
                return new Tuple2<Integer,String>(wordTuple._2,wordTuple._1);
            }
        });

        //Sort the words by occurrence count in descending order
        JavaPairRDD<Integer, String> sortRdd=mapRdd02.sortByKey(false, 1);

        //Swap the tuple elements back after sorting
        JavaPairRDD<String, Integer> mapRdd03=sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>,String,Integer>(){
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> wordTuple) throws Exception {
                return new Tuple2<String, Integer>(wordTuple._2,wordTuple._1);
            }
        });

        //Alternatively, save the result to a file on disk
        //mapRdd03.saveAsTextFile("/home/hadoop/test/jdbc/output");

        saveToDB(mapRdd03);

        //Close the Spark context
        jsc.close();
    }
}
A. Start the MySQL database service
[root@DB03 ~]# cd /software/mysql-5.5.32/multi-data/3306/
[root@DB03 3306]# ls
data my.cnf my.cnf.bak mysqld
[root@DB03 3306]# ./mysqld start
Starting MySQL...
B. Create the test database
[root@CloudDeskTop 3306]# cd /software/mysql-5.5.32/bin/
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
+--------------------+
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create database test character set utf8;"
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| test               |
+--------------------+
C. Create the wordcount table
[root@DB03 bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create table if not exists test.wordcount(wid int(11) auto_increment primary key,word varchar(30),count int(3))engine=myisam charset=utf8;"
[root@DB03 bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "desc test.wordcount;"
+-------+-------------+------+-----+---------+----------------+
| Field | Type        | Null | Key | Default | Extra          |
+-------+-------------+------+-----+---------+----------------+
| wid   | int(11)     | NO   | PRI | NULL    | auto_increment |
| word  | varchar(30) | YES  |     | NULL    |                |
| count | int(3)      | YES  |     | NULL    |                |
+-------+-------------+------+-----+---------+----------------+
# The table contains no data yet
[root@DB03 bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"
[hadoop@CloudDeskTop jdbc]$ pwd
/home/hadoop/test/jdbc
[hadoop@CloudDeskTop jdbc]$ ls
myuser testJDBC.txt
[hadoop@CloudDeskTop jdbc]$ cat testJDBC.txt myuser
zhnag san shi yi ge hao ren
jin tian shi yi ge hao tian qi
wo zai zhe li zuo le yi ge ce shi
yi ge guan yu scala de ce shi
welcome to mmzs
歡迎 歡迎
lisi 123456 165 1998-9-9
lisan 123ss 187 2009-10-19
wangwu 123qqwe 177 1990-8-3
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"
Typical business scenario: data in the HDFS cluster is processed by Spark and the result is written to a remote relational database, where it serves the front-end online transaction system.
[hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
[hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p package
[hadoop@CloudDeskTop RDDToJDBC]$ ls
bin package src
Upload the required mysql-connector-java-3.0.17-ga-bin.jar on the client:
[hadoop@CloudDeskTop jars]# pwd
/software/spark-2.1.1/jars
Then distribute it to the cluster nodes:
[hadoop@CloudDeskTop software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar master01:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar master02:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar slave01:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar slave02:/software/spark-2.1.1/jars/
[hadoop@master01 software]$ scp -r /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar slave03:/software/spark-2.1.1/jars/
package com.mmzs.bigdata.spark.core.cluster;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class RDDToDB {
    /**
     * Global counter
     */
    private static int count;

    /**
     * Database connection
     */
    private static Connection conn;

    /**
     * Prepared statement
     */
    private static PreparedStatement pstat;

    static{
        try {
            String sql="insert into wordcount(word,count) values(?,?)";
            String url="jdbc:mysql://192.168.154.134:3306/test?useUnicode=true&characterEncoding=utf8";
            Class.forName("com.mysql.jdbc.Driver");
            conn=DriverManager.getConnection(url, "root", "123456");
            pstat=conn.prepareStatement(sql);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Save data in batches
     * @param line
     * @throws SQLException
     */
    private static void batchSave(Tuple2<String, Integer> line,boolean isOver){
        try{
            pstat.setString(1, line._1());
            pstat.setInt(2, line._2());

            if(isOver){
                //The RDD iteration has finished, so execute whatever is left in the batch
                pstat.addBatch();
                pstat.executeBatch();
                pstat.clearBatch();
                pstat.clearParameters();
            }else{
                //The RDD iteration has not finished yet, so just add the current statement to the batch;
                //once the batch reaches 100 statements, flush it and reset the parameters
                pstat.addBatch();
                count++;
                if(count%100==0){ //Submit the batch once it is full
                    pstat.executeBatch();
                    pstat.clearBatch();
                    pstat.clearParameters();
                }
            }
        }catch(SQLException e){
            e.printStackTrace();
        }
    }

    /**
     * Store the data in the RDD into the MySQL relational database.
     * The work done inside the inner-class method (call) must be separated out into an independent
     * method (batchSave), because the inner classes required by the Spark API must be serializable,
     * while the Statement and Connection used for JDBC are not serializable.
     * @param statResRDD
     */
    private static void saveToDB(JavaPairRDD<String, Integer> statResRDD){
        final long rddNum=statResRDD.count();
        statResRDD.foreach(new VoidFunction<Tuple2<String,Integer>>(){
            private long count=0;
            @Override
            public void call(Tuple2<String, Integer> line) throws Exception {
                if(++count<rddNum){
                    batchSave(line,false);
                }else{
                    batchSave(line,true);
                }
            }
        });

        try{
            if(null!=pstat)pstat.close();
            if(null!=conn)conn.close();
        }catch(SQLException e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.setAppName("Java Spark Cluster");

        //Create the Spark context from the Spark configuration
        JavaSparkContext jsc=new JavaSparkContext(conf);

        //Read the input text file into an in-memory RDD
        JavaRDD<String> lineRdd=jsc.textFile("/spark/input", 1);

        //Split each line into an array of words and flatten the words into the outer JavaRDD
        JavaRDD<String> flatMapRdd=lineRdd.flatMap(new FlatMapFunction<String,String>(){
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] words=line.split(" ");
                List<String> list=Arrays.asList(words);
                Iterator<String> its=list.iterator();
                return its;
            }
        });

        //Turn each word in the JavaRDD into a (word, 1) tuple for counting
        JavaPairRDD<String, Integer> mapRdd=flatMapRdd.mapToPair(new PairFunction<String, String,Integer>(){
            @Override
            public Tuple2<String,Integer> call(String word) throws Exception {
                return new Tuple2<String,Integer>(word,1);
            }
        });

        //Group by the first tuple element (the key) and count the occurrences of each word
        JavaPairRDD<String, Integer> reduceRdd=mapRdd.reduceByKey(new Function2<Integer,Integer,Integer>(){
            @Override
            public Integer call(Integer pre, Integer next) throws Exception {
                return pre+next;
            }
        });

        //Swap the tuple elements so the count can be used as the sort key
        JavaPairRDD<Integer, String> mapRdd02=reduceRdd.mapToPair(new PairFunction<Tuple2<String, Integer>,Integer,String>(){
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> wordTuple) throws Exception {
                return new Tuple2<Integer,String>(wordTuple._2,wordTuple._1);
            }
        });

        //Sort the words by occurrence count in descending order
        JavaPairRDD<Integer, String> sortRdd=mapRdd02.sortByKey(false, 1);

        //Swap the tuple elements back after sorting
        JavaPairRDD<String, Integer> mapRdd03=sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>,String,Integer>(){
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> wordTuple) throws Exception {
                return new Tuple2<String, Integer>(wordTuple._2,wordTuple._1);
            }
        });

        //Alternatively, save the result to a file on disk
        //mapRdd03.saveAsTextFile("/spark/output");

        saveToDB(mapRdd03);

        //Close the Spark context
        jsc.close();
    }
}
Notes:
In cluster mode, Spark's relational database operations are carried out by launching a Job, and a Job is only triggered by operations on an RDD. Database operations therefore take effect only when they sit at the RDD-operation level; anything placed outside the RDD operations runs at the Driver level of the Spark Core client (the level at which SparkSQL and SparkStreaming code runs) and cannot affect the relational database. In the code above, the work only enters a Spark Core Job when the RDD is consumed by a foreach-style action; operations outside the RDD are Driver-level and do not launch a Job.
In RDD-level Spark Core processing, the work is packaged into a Job, submitted to the cluster, and executed as Tasks spread across the cluster nodes. Data passed between Task nodes must be serializable, so the inner-class objects that appear so frequently in Spark applications (such as the VoidFunction above) must be serializable, which in turn means every member they reference must be serializable as well. When writing code in the context enclosing such inner classes, take care not to capture non-serializable objects or references (for example transient, stream-like objects such as Connection, Statement or Thread): every object reference that appears inside these inner classes must implement java.io.Serializable.
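A common way to respect this constraint, instead of routing every row through static fields on the driver class as the code above does, is to open the connection inside the task itself, once per partition, so that nothing non-serializable is ever captured by the closure. The following is a minimal sketch of that alternative using foreachPartition; it reuses the JDBC URL, credentials and SQL from the code above, the method name saveToDBPerPartition is illustrative only, and it assumes the same imports as the RDDToDB class:

    //Per-partition variant: the connection lives entirely on the executor, so it never needs to be serialized
    private static void saveToDBPerPartition(JavaPairRDD<String, Integer> statResRDD) {
        statResRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
            @Override
            public void call(Iterator<Tuple2<String, Integer>> rows) throws Exception {
                //Created inside the task, on the executor
                Class.forName("com.mysql.jdbc.Driver");
                Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://192.168.154.134:3306/test?useUnicode=true&characterEncoding=utf8",
                        "root", "123456");
                PreparedStatement pstat = conn.prepareStatement("insert into wordcount(word,count) values(?,?)");
                int batch = 0;
                try {
                    while (rows.hasNext()) {
                        Tuple2<String, Integer> line = rows.next();
                        pstat.setString(1, line._1());
                        pstat.setInt(2, line._2());
                        pstat.addBatch();
                        if (++batch % 100 == 0) pstat.executeBatch(); //flush every full batch of 100
                    }
                    pstat.executeBatch(); //flush whatever is left in the last batch
                } finally {
                    pstat.close();
                    conn.close();
                }
            }
        });
    }

Because each partition opens and closes its own connection on the executor, this variant also behaves predictably when the job runs on several executors, where the static conn and pstat fields above would otherwise be initialized separately in every JVM.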
[hadoop@CloudDeskTop ~]$ cd /project/RDDToJDBC/bin/
[hadoop@CloudDeskTop bin]$ ls
com mysql-connector-java-3.0.17-ga-bin.jar
[hadoop@CloudDeskTop bin]$ jar -cvfe /project/RDDToJDBC/package/RDDToJDBC.jar com.mmzs.bigdata.spark.core.cluster.RDDToDB com/
[hadoop@CloudDeskTop bin]$ cd ../package
[hadoop@CloudDeskTop package]$ ls
RDDToJDBC.jar
A. Start the Spark cluster runtime environment: [hadoop@master01 install]$ sh start-total.sh
#!/bin/bash
echo "Please make sure you have switched to the hadoop user first"
# Start the zookeeper cluster
for node in hadoop@slave01 hadoop@slave02 hadoop@slave03;do ssh $node "source /etc/profile; cd /software/zookeeper-3.4.10/bin/; ./zkServer.sh start; jps";done
# Start the DFS cluster
cd /software/ && start-dfs.sh && jps
# Start the Spark cluster
# Start the Master process on master01 and the Worker processes on the slave nodes
cd /software/spark-2.1.1/sbin/ && ./start-master.sh && ./start-slaves.sh && jps
# Start the Master process on master02
ssh hadoop@master02 "cd /software/spark-2.1.1/sbin/; ./start-master.sh; jps"
# The Spark history server is usually not started because it consumes a fair amount of resources
#cd /software/spark-2.1.1/sbin/ && ./start-history-server.sh && cd - && jps
B. Submit the Spark application from the CloudDeskTop client node
# Delete the old data from the database first
[root@CloudDeskTop bin]# pwd
/software/mysql-5.5.32/bin
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "truncate table test.wordcount;"
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"
# Prepare the source data
[hadoop@CloudDeskTop jdbc]$ hdfs dfs -put testJDBC.txt /spark/input/
[hadoop@master02 ~]$ hdfs dfs -ls /spark/
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2018-02-26 21:56 /spark/input
[hadoop@master02 ~]$ hdfs dfs -ls /spark/input
Found 1 items
-rw-r--r-- 3 hadoop supergroup 156 2018-02-26 21:56 /spark/input/testJDBC.txt
[hadoop@master02 ~]$ hdfs dfs -cat /spark/input/testJDBC.txt
zhnag san shi yi ge hao ren
jin tian shi yi ge hao tian qi
wo zai zhe li zuo le yi ge ce shi
yi ge guan yu scala de ce shi
welcome to mmzs
歡迎 歡迎
# Submit the Spark application
First:
[hadoop@CloudDeskTop lib]$ cd /software/spark-2.1.1/bin/
Then:
Submission option 1 (may run into a NullPointerException):
[hadoop@CloudDeskTop bin]$ ./spark-submit --master spark://master01:7077 --class com.mmzs.bigdata.spark.core.cluster.RDDToDB /project/RDDToJDBC/package/RDDToJDBC.jar
Submission option 2:
[hadoop@CloudDeskTop bin]$ ./spark-submit --master spark://master01:7077 --class com.mmzs.bigdata.spark.core.cluster.RDDToDB --jars /software/spark-2.1.1/jars/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/package/RDDToJDBC.jar
C. Check whether the relational database now contains data
[root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.wordcount;"