數據的讀取與保存java
做者:尹正傑mysql
版權聲明:原創做品,謝絕轉載!不然將追究法律責任。算法
一.數據讀取與保存概述sql
Spark的數據讀取及數據保存能夠從兩個維度來做區分:文件格式以及文件系統。
文件格式分爲:
Text文件
Json文件
Csv文件
Sequence文件
Object文件
...
文件系統分爲:
本地文件系統
HDFS
HBASE
MySQl
...
二.文件類數據讀取與保存數據庫
1>.Text文件apache
package com.yinzhengjie.bigdata.spark.reader import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object TextDemo { def main(args: Array[String]): Unit = { //初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("TextFile").setMaster("local[*]") val sc = new SparkContext(sparkConf) //讀取hdfs上的數據 val passwd:RDD[String] = sc.textFile("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/etc/passwd") //遍歷已經讀取到的數據 passwd.foreach(println) //釋放資源 sc.stop() } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SaveAsTextFileOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2) /** * 將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本 */ listRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\text") } }
2>.Json文件json
{"name":"yinzhengjie","passwd":"2020"} {"name":"Jason","passwd":"666666"} {"name":"Liming","passwd":"123"} {"name":"Jenny","passwd":"456"} {"name":"Danny","passwd":"789"}
package com.yinzhengjie.bigdata.spark.reader import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scala.util.parsing.json.JSON object JsonDemo { def main(args: Array[String]): Unit = { //初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("JsonFile").setMaster("local[*]") val sc = new SparkContext(sparkConf) /** * 若是JSON文件中每一行就是一個JSON記錄,那麼能夠經過將JSON文件當作文本文件來讀取,而後利用相關的JSON庫對每一條數據進行JSON解析。 */ val user:RDD[String] = sc.textFile("E:\\yinzhengjie\\bigdata\\input\\json\\user.json") //解析json數據 val result:RDD[Option[Any]] = user.map(JSON.parseFull) //遍歷已經讀取到的數據 result.foreach(println) //釋放資源 sc.stop() } }
3>.Sequence文件ide
SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。Spark 有專門用來讀取 SequenceFile 的接口。
在SparkContext中,能夠調用 sequenceFile[ keyClass, valueClass](path)。
舒適提示:
SequenceFile文件只針對PairRDD
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SaveAsSequenceFileOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2) /** * 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,能夠使HDFS或者其餘Hadoop支持的文件系統。 */ listRDD.saveAsSequenceFile("E:\\yinzhengjie\\bigdata\\spark\\sequence") } }
4>.對象文件函數
對象文件是將對象序列化後保存的文件,採用Java的序列化機制。
能夠經過objectFile[k,v](path) 函數接收一個路徑,讀取對象文件,返回對應的 RDD,也能夠經過調用saveAsObjectFile() 實現對對象文件的輸出。由於是序列化因此要指定類型。
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SaveAsObjectFileOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config: SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2) /** * 用於將RDD中的元素序列化成對象,存儲到文件中。 */ listRDD.saveAsObjectFile("E:\\yinzhengjie\\bigdata\\spark\\object") } }
三.文件系統類數據讀取與保存oop
1>.HDFS
Spark的整個生態系統與Hadoop是徹底兼容的,因此對於Hadoop所支持的文件類型或者數據庫類型,Spark也一樣支持。
另外,因爲Hadoop的API有新舊兩個版本,因此Spark爲了可以兼容Hadoop全部的版本,也提供了兩套建立操做接口.對於外部存儲建立操做而言,hadoopRDD和newHadoopRDD是最爲抽象的兩個函數接口,主要包含如下四個參數. 輸入格式(InputFormat):
制定數據輸入的類型,如TextInputFormat等,新舊兩個版本所引用的版本分別是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat) 鍵類型:
指定[K,V]鍵值對中K的類型 值類型:
指定[K,V]鍵值對中V的類型 分區值:
指定由外部存儲生成的RDD的partition數量的最小值,若是沒有指定,系統會使用默認值defaultMinSplits
舒適提示:
其餘建立操做的API接口都是爲了方便最終的Spark程序開發者而設置的,是這兩個接口的高效實現版本.例如,對於textFile而言,只有path這個指定文件路徑的參數,其餘參數在系統內部指定了默認值。 在Hadoop中以壓縮形式存儲的數據,不須要指定解壓方式就可以進行讀取,由於Hadoop自己有一個解壓器會根據壓縮文件的後綴推斷解壓算法進行解壓. 若是用Spark從Hadoop中讀取某種類型的數據不知道怎麼讀取的時候,上網查找一個使用map-reduce的時候是怎麼讀取這種這種數據的,而後再將對應的讀取方式改寫成上面的hadoopRDD和newAPIHadoopRDD兩個類就好了.
2>.MySQL
安裝MariaDB數據庫: [root@hadoop101.yinzhengjie.org.cn ~]# yum -y install mariadb- server 將數據庫設置爲開機自啓動: [root@hadoop101.yinzhengjie.org.cn ~]# systemctl enable mariadb Created symlink from /etc/systemd/system/multi-user.target.wants/mariadb.service to /usr/lib/systemd/system/mariadb.service. [root@hadoop101.yinzhengjie.org.cn ~]# [root@hadoop101.yinzhengjie.org.cn ~]# systemctl start mariadb [root@hadoop101.yinzhengjie.org.cn ~]# 登陸數據庫,建立spark數據庫並受權用戶登陸: MariaDB [(none)]> CREATE SCHEMA IF NOT EXISTS spark DEFAULT CHARACTER SET = utf8mb4; Query OK, 1 row affected (0.00 sec) MariaDB [(none)]> MariaDB [(none)]> CREATE USER jason@'172.200.%' IDENTIFIED BY 'yinzhengjie'; Query OK, 0 rows affected (0.00 sec) MariaDB [(none)]> MariaDB [(none)]> GRANT ALL ON spark.* TO jason@'172.200.%'; Query OK, 0 rows affected (0.00 sec) MariaDB [(none)]>
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>
package com.yinzhengjie.bigdata.spark.reader import java.sql.DriverManager import org.apache.spark.rdd.JdbcRDD import org.apache.spark.{SparkConf, SparkContext} object MySQLDemo { def main(args: Array[String]): Unit = { //1.建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD") //2.建立SparkContext val sc = new SparkContext(sparkConf) //3.定義鏈接mysql的參數 val driver = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark" val username = "jason" val passwd = "yinzhengjie" val sql = "select name,passwd from user where id >= ? and id <= ?;" val lowerBound = 1 val upperBound = 3 val numPartitions = 2 //建立JdbcRDD,訪問我們的數據庫(固然你得自定義數據庫的表信息喲) val jdbcRDD = new JdbcRDD( //指定SparkContext sc, //建立MySQL數據庫鏈接對象(指定一個無參函數,其返回值是一個鏈接對象) () => { Class.forName(driver) //指定數據庫的鏈接驅動 DriverManager.getConnection(url, username, passwd) //獲取鏈接 }, //定義要執行的SQL語句 sql, //指定查詢的下限 lowerBound, //指定查詢的上限 upperBound, //指定分區數 numPartitions, //對查詢的結果進行操做 (resultSet) => { println(resultSet.getString(1) + ", " + resultSet.getString(2)) //我就查詢了2個字段,每一個字段都是varchar類型,所以均用"getString"方法取對應列的數據 } ) jdbcRDD.collect() //打印最後查詢結果的條數 println(jdbcRDD.count()) //釋放資源 sc.stop() } }
package com.yinzhengjie.bigdata.spark.writer import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.{JdbcRDD, RDD} object MySQLDemo { def main(args: Array[String]): Unit = { //1.建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD") //2.建立SparkContext val sc = new SparkContext(sparkConf) //3.定義鏈接mysql的參數 val driver = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark" val username = "jason" val password = "yinzhengjie" //4.準備要寫入的數據 val dataRDD:RDD[(String,String)] = sc.makeRDD(List(("zhangsan","2020"),("lisi","2020"),("wangwu","2020"),("zhaoliu","2020"))) /** * 5.保存數據 * * 舒適提示: * foreach是行動算子,上面的dataRDD數據可能會發送到不一樣的Executor中,所以寫入數據庫的順序可能和咱們定義List的順序不一樣喲~ */ dataRDD.foreachPartition(datas =>{ Class.forName(driver) //指定數據庫的鏈接驅動 val conn:Connection = DriverManager.getConnection(url, username, password) //獲取鏈接 datas.foreach{ case (name,passwd) => { val sql = "insert into user (name,passwd) values (?,?);"//定義SQL val statment:PreparedStatement = conn.prepareStatement(sql)//解析SQL語句 statment.setString(1,name) statment.setString(2,passwd) statment.executeUpdate() statment.close() } } conn.close() //釋放鏈接 }) //釋放資源 sc.stop() } }