spark讀取myslq優化--單機版

1.依賴環境:java

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.37</version>
    </dependency>
</dependencies>

2.實現方式:mysql

val conf = new SparkConf().setMaster("local[*]").setAppName("msyql數據讀取")
  val spark = SparkSession.builder().config(conf).getOrCreate()

  val url = "jdbc:mysql://localhost:3306/hisms_sn?user=root&password=root"
  val prop = new Properties()
  val properties=Map("url"->"jdbc:mysql://192.168.0.135:3306/disease-qy?useUnicode=true&characterEncoding=UTF-8",
    "driver"->"com.mysql.jdbc.Driver",
    "user"->"root",
    "password"->"root")


  //讀取mysql的5中方式

  //1.不指定查詢條件---並行度爲1
  def method1(): Unit ={
    val df = spark.read.jdbc(url,"t_kc21k1",prop)
    println(df.count())
    println(df.rdd.partitions.size)
    df.show(5)
  }
  //2.指定數據庫字段的範圍--並行度爲5

/**
  * 方式二:指定數據庫字段的範圍
  * 經過lowerBound和upperBound 指定分區的範圍
  * 經過columnName 指定分區的列(只支持整形)
  * 經過numPartitions 指定分區數量 (不宜過大)
  *
*/
def method2(): Unit ={
  val lowerBound = 1
  val upperBound = 100000
  val numPartitions = 5
    val df = spark.read.jdbc(url,"t_kc21k1","id",lowerBound,upperBound,numPartitions,prop)
    println(df.count())
    println(df.rdd.partitions.size)
    df.show(5)
  }
  //3.根據任意字段進行分區--並行度爲2
  def method3(): Unit ={
    //經過predicates將數據根據akc194分爲2個區
    val predicates = Array[String]("akc194 <= '2016-06-30'", "akc194 <= '2017-01-01' and akc194>'2016-06-30'")
    val df = spark.read.jdbc(url,"t_kc21k1",predicates,prop)
    println(df.count())
    println(df.rdd.partitions.size)
    df.show(5)
  }

  //4.經過load獲取---與method1同樣 並行度爲1
  def method4(): Unit ={
    val df = spark.read.format("jdbc").options(Map("url"->url,"dbtable"->"t_kc21k1")).option("fetchSize",1000).load()
    println(df.count())
    println(df.rdd.partitions.size)
    df.show(5)
  }

  //5.加載條件查詢後的數據
  def method5(): Unit ={
    //經過predicates將數據根據akc194分爲2個區
    val query="SELECT id,aac003,id_drg,name_drg from t_kc21k1 where id>50000"
    //定要用左右括號包起來,由於dbtable的value會被當成一張table做查詢,mysql connector會自動dbtable後面加上where 1=1
    val df = spark.read.format("jdbc").options(Map("url"->url,"dbtable"->s"($query)kc21k1")).load()
    println(df.count())
    println(df.rdd.partitions.size)
    df.show(5)
  }

經過增長分區讀取數據,只是增長了並行度,但若是對單機版的spark,仍是不能減小內存的使用,spark讀取數據庫的規則就是該數據提取至內存,再作內存計算。sql

問題:數據庫

    windows上使用單機版spark,不依賴hive環境,讀取mysql數據表很大的時候,作join操做,sparksql容易發生內存溢出,apache

1.目前只能經過減小數據的讀取方式方式內存爆炸----好比:根據結果只選取須要的字段。windows

2.能夠同配置使用hive環境,sparksql將會藉助hive環境,而不依賴本地內存作計算,防止內存溢出。fetch

相關文章
相關標籤/搜索