Spark支持四種方式從數據庫中讀取數據

目前 Spark支持四種方式從數據庫中讀取數據,這裏以Mysql爲例進行介紹。

1、不指定查詢條件

  這個方式連接MySql的函數原型是:mysql

def jdbc(url : String, table : String, properties : Properties) : DataFrame

  咱們只須要提供Driver的url,須要查詢的表名,以及鏈接表相關屬性properties。下面是具體例子:sql

 
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog" , prop )
 
println(df.count())
println(df.rdd.partitions.size)

  咱們運行上面的程序,能夠看到df.rdd.partitions.size輸出結果是1,這個結果的含義是iteblog表的全部數據都是由RDD的一個分區處理的,因此說,若是你這個表很大,極可能會出現OOM數據庫

WARN TaskSetManager : Lost task 0.0 in stage 1.0 (TID 14 , spark 047219 ) :
  java.lang.OutOfMemoryError : GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java : 3380 )

這種方式在數據量大的時候不建議使用。json


若是想及時瞭解 Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共賬號: iteblog_hadoop

2、指定數據庫字段的範圍

  這種方式就是經過指定數據庫中某個字段的範圍,可是遺憾的是,這個字段必須是數字,來看看這個函數的函數原型:微信

def jdbc(
     url : String,
     table : String,
     columnName : String,
     lowerBound : Long,
     upperBound : Long,
     numPartitions : Int,
     connectionProperties : Properties) : DataFrame

  前兩個字段的含義和方法一相似。columnName就是須要分區的字段,這個字段在數據庫中的類型必須是數字;lowerBound就是分區的下界;upperBound就是分區的上界;numPartitions是分區的個數。一樣,咱們也來看看如何使用:ide

val lowerBound = 1
val upperBound = 100000
val numPartitions = 5
 
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog" , "id" , lowerBound, upperBound, numPartitions, prop)

  這個方法能夠將iteblog表的數據分佈到RDD的幾個分區中,分區的數量由numPartitions參數決定,在理想狀況下,每一個分區處理相同數量的數據,咱們在使用的時候不建議將這個值設置的比較大,由於這可能致使數據庫掛掉!可是根據前面介紹,這個函數的缺點就是隻能使用整形數據字段做爲分區關鍵字。函數

  這個函數在極端狀況下,也就是設置將numPartitions設置爲1,其含義和第一種方式一致。oop

3、根據任意字段進行分區

  基於前面兩種方法的限制,Spark還提供了根據任意字段進行分區的方法,函數原型以下:url

def jdbc(
     url : String,
     table : String,
     predicates : Array[String],
     connectionProperties : Properties) : DataFrame

這個函數相比第一種方式多了predicates參數,咱們能夠經過這個參數設置分區的依據,來看看例子:

val predicates = Array[String]( "reportDate <= '2014-12-31'" ,
     "reportDate > '2014-12-31' and reportDate <= '2015-12-31'" )
 
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog" , predicates, prop)

最後rdd的分區數量就等於predicates.length。

4、經過load獲取

Spark還提供經過load的方式來讀取數據。

sqlContext.read.format( "jdbc" ).options(
     "dbtable" -> "iteblog" )).load()

  options函數支持url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions選項,細心的同窗確定發現這個和方法二的參數一致。是的,其內部實現原理部分和方法二大致一致。同時load方法還支持json、orc等數據源的讀取。