數據的讀取與保存

                數據的讀取與保存java

                                     做者:尹正傑mysql

版權聲明:原創做品,謝絕轉載!不然將追究法律責任。算法

 

 

 

一.數據讀取與保存概述sql

  Spark的數據讀取及數據保存能夠從兩個維度來做區分:文件格式以及文件系統。

  文件格式分爲:
    Text文件
    Json文件
    Csv文件
    Sequence文件
    Object文件
    ...

  文件系統分爲:
    本地文件系統
    HDFS
    HBASE
    MySQl
    ...

 

二.文件類數據讀取與保存數據庫

1>.Text文件apache

package com.yinzhengjie.bigdata.spark.reader

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TextDemo {
  def main(args: Array[String]): Unit = {
    //初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("TextFile").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    //讀取hdfs上的數據
    val passwd:RDD[String] = sc.textFile("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/etc/passwd")

    //遍歷已經讀取到的數據
    passwd.foreach(println)

    //釋放資源
    sc.stop()

  }
}
textFile(String)讀取文件案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveAsTextFileOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)


    val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2)

    /**
      *   將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本
      */
    listRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\text")
  }
}
saveAsTextFile(path)保存文件案例

2>.Json文件json

{"name":"yinzhengjie","passwd":"2020"}
{"name":"Jason","passwd":"666666"}
{"name":"Liming","passwd":"123"}
{"name":"Jenny","passwd":"456"}
{"name":"Danny","passwd":"789"}
user.json文件內容
package com.yinzhengjie.bigdata.spark.reader

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.parsing.json.JSON

object JsonDemo {
  def main(args: Array[String]): Unit = {
    //初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("JsonFile").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    /**
      *   若是JSON文件中每一行就是一個JSON記錄,那麼能夠經過將JSON文件當作文本文件來讀取,而後利用相關的JSON庫對每一條數據進行JSON解析。
      */
    val user:RDD[String] = sc.textFile("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    //解析json數據
    val result:RDD[Option[Any]] = user.map(JSON.parseFull)

    //遍歷已經讀取到的數據
    result.foreach(println)

    //釋放資源
    sc.stop()
  }
}
讀取json文件案例

3>.Sequence文件ide

  SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。Spark 有專門用來讀取 SequenceFile 的接口。

  在SparkContext中,能夠調用 sequenceFile[ keyClass, valueClass](path)。
  舒適提示:
    SequenceFile文件只針對PairRDD
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveAsSequenceFileOperate {
  def main(args: Array[String]): Unit = {
      //建立SparkConf對象
      val  config:SparkConf = new SparkConf()
      config.setMaster("local[*]")
      config.setAppName("WordCount")

      //建立Spark上下文對象
      val sc = new SparkContext(config)


      val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2)

      /**
        *   將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,能夠使HDFS或者其餘Hadoop支持的文件系統。
        */
      listRDD.saveAsSequenceFile("E:\\yinzhengjie\\bigdata\\spark\\sequence")
  }
}
saveAsSequenceFile(path)案例

4>.對象文件函數

  對象文件是將對象序列化後保存的文件,採用Java的序列化機制。

  能夠經過objectFile[k,v](path) 函數接收一個路徑,讀取對象文件,返回對應的 RDD,也能夠經過調用saveAsObjectFile() 實現對對象文件的輸出。由於是序列化因此要指定類型。
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveAsObjectFileOperate {
  def main(args: Array[String]): Unit = {
      //建立SparkConf對象
      val config: SparkConf = new SparkConf()
      config.setMaster("local[*]")
      config.setAppName("WordCount")

      //建立Spark上下文對象
      val sc = new SparkContext(config)


      val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2)

      /**
        * 用於將RDD中的元素序列化成對象,存儲到文件中。
        */
      listRDD.saveAsObjectFile("E:\\yinzhengjie\\bigdata\\spark\\object")
  }
}
saveAsObjectFile(path)案例

 

三.文件系統類數據讀取與保存oop

1>.HDFS

  Spark的整個生態系統與Hadoop是徹底兼容的,因此對於Hadoop所支持的文件類型或者數據庫類型,Spark也一樣支持。

  另外,因爲Hadoop的API有新舊兩個版本,因此Spark爲了可以兼容Hadoop全部的版本,也提供了兩套建立操做接口.對於外部存儲建立操做而言,hadoopRDD和newHadoopRDD是最爲抽象的兩個函數接口,主要包含如下四個參數.
    輸入格式(InputFormat):
      制定數據輸入的類型,如TextInputFormat等,新舊兩個版本所引用的版本分別是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
    鍵類型: 
      指定[K,V]鍵值對中K的類型
    值類型:
      指定[K,V]鍵值對中V的類型
    分區值:
      指定由外部存儲生成的RDD的partition數量的最小值,若是沒有指定,系統會使用默認值defaultMinSplits
  舒適提示:
    其餘建立操做的API接口都是爲了方便最終的Spark程序開發者而設置的,是這兩個接口的高效實現版本.例如,對於textFile而言,只有path這個指定文件路徑的參數,其餘參數在系統內部指定了默認值。
    在Hadoop中以壓縮形式存儲的數據,不須要指定解壓方式就可以進行讀取,由於Hadoop自己有一個解壓器會根據壓縮文件的後綴推斷解壓算法進行解壓.     若是用Spark從Hadoop中讀取某種類型的數據不知道怎麼讀取的時候,上網查找一個使用map-reduce的時候是怎麼讀取這種這種數據的,而後再將對應的讀取方式改寫成上面的hadoopRDD和newAPIHadoopRDD兩個類就好了.

2>.MySQL

安裝MariaDB數據庫:
[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install mariadb-
server

將數據庫設置爲開機自啓動:
[root@hadoop101.yinzhengjie.org.cn ~]# systemctl enable mariadb
Created symlink from /etc/systemd/system/multi-user.target.wants/mariadb.service to /usr/lib/systemd/system/mariadb.service.
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# systemctl start mariadb
[root@hadoop101.yinzhengjie.org.cn ~]# 

登陸數據庫,建立spark數據庫並受權用戶登陸:
MariaDB [(none)]> CREATE SCHEMA IF NOT EXISTS spark DEFAULT CHARACTER SET = utf8mb4;
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> 
MariaDB [(none)]> CREATE USER jason@'172.200.%' IDENTIFIED BY 'yinzhengjie';
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> 
MariaDB [(none)]> GRANT ALL ON spark.* TO jason@'172.200.%';
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> 
登陸數據庫,建立spark數據庫並受權用戶登陸(詳細步驟戳這裏)
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
往pom.xml添加依賴關係
package com.yinzhengjie.bigdata.spark.reader

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object MySQLDemo {
  def main(args: Array[String]): Unit = {
    //1.建立spark配置信息
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")

    //2.建立SparkContext
    val sc = new SparkContext(sparkConf)

    //3.定義鏈接mysql的參數
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark"
    val username = "jason"
    val passwd = "yinzhengjie"
    val sql = "select name,passwd from user where id >= ? and id <= ?;"
    val lowerBound = 1
    val upperBound = 3
    val numPartitions = 2
    //建立JdbcRDD,訪問我們的數據庫(固然你得自定義數據庫的表信息喲)
    val jdbcRDD = new JdbcRDD(
      //指定SparkContext
      sc,
      //建立MySQL數據庫鏈接對象(指定一個無參函數,其返回值是一個鏈接對象)
      () => {
        Class.forName(driver)   //指定數據庫的鏈接驅動
        DriverManager.getConnection(url, username, passwd)  //獲取鏈接
      },
      //定義要執行的SQL語句
      sql,
      //指定查詢的下限
      lowerBound,
      //指定查詢的上限
      upperBound,
      //指定分區數
      numPartitions,
      //對查詢的結果進行操做
      (resultSet) => {
        println(resultSet.getString(1) + ", " + resultSet.getString(2)) //我就查詢了2個字段,每一個字段都是varchar類型,所以均用"getString"方法取對應列的數據
      }
    )
    jdbcRDD.collect()

    //打印最後查詢結果的條數
    println(jdbcRDD.count())

    //釋放資源
    sc.stop()

  }
}
從MySQL中讀取的數據
package com.yinzhengjie.bigdata.spark.writer

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.{JdbcRDD, RDD}

object MySQLDemo {
  def main(args: Array[String]): Unit = {
    //1.建立spark配置信息
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")

    //2.建立SparkContext
    val sc = new SparkContext(sparkConf)

    //3.定義鏈接mysql的參數
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark"
    val username = "jason"
    val password = "yinzhengjie"


    //4.準備要寫入的數據
   val dataRDD:RDD[(String,String)] = sc.makeRDD(List(("zhangsan","2020"),("lisi","2020"),("wangwu","2020"),("zhaoliu","2020")))

    /**
      *   5.保存數據
      *
      *   舒適提示:
      *     foreach是行動算子,上面的dataRDD數據可能會發送到不一樣的Executor中,所以寫入數據庫的順序可能和咱們定義List的順序不一樣喲~
      */

    dataRDD.foreachPartition(datas =>{
      Class.forName(driver)   //指定數據庫的鏈接驅動
      val conn:Connection = DriverManager.getConnection(url, username, password)  //獲取鏈接
      datas.foreach{
        case (name,passwd) => {
          val sql = "insert into user (name,passwd) values (?,?);"//定義SQL
          val statment:PreparedStatement = conn.prepareStatement(sql)//解析SQL語句
          statment.setString(1,name)
          statment.setString(2,passwd)
          statment.executeUpdate()
          statment.close()
        }
      }
      conn.close()  //釋放鏈接
    })

    //釋放資源
    sc.stop()
  }
}
往MySQL中寫入數據
相關文章
相關標籤/搜索