Spark讀取數據庫(Mysql)的四種方式講解

2014 Spark亞太峯會會議資料下載、《Hadoop從入門到上手企業開發視頻下載[70集]》、《煉數成金-Spark大數據平臺視頻百度網盤免費下載》、《Spark 1.X 大數據平臺V2百度網盤下載[完整版]》、《深刻淺出Hive視頻教程百度網盤免費下載》   目前Spark支持四種方式從數據庫中讀取數據,這裏以Mysql爲例進行介紹。 文章目錄 [hide] 1 1、不指定查詢條件 2 2、指定數據庫字段的範圍 3 3、根據任意字段進行分區 4 4、經過load獲取 1、不指定查詢條件   這個方式連接MySql的函數原型是: def jdbc(url: String, table: String, properties: Properties): DataFrame   咱們只須要提供Driver的url,須要查詢的表名,以及鏈接表相關屬性properties。下面是具體例子: val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog" val prop = new Properties() val df = sqlContext.read.jdbc(url, "iteblog", prop ) println(df.count()) println(df.rdd.partitions.size)   咱們運行上面的程序,能夠看到df.rdd.partitions.size輸出結果是1,這個結果的含義是iteblog表的全部數據都是由RDD的一個分區處理的,因此說,若是你這個表很大,極可能會出現OOM WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 14, spark047219): java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380) 這種方式在數據量大的時候不建議使用。 若是想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共賬號:iteblog_hadoop 2、指定數據庫字段的範圍   這種方式就是經過指定數據庫中某個字段的範圍,可是遺憾的是,這個字段必須是數字,來看看這個函數的函數原型: def jdbc( url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame   前兩個字段的含義和方法一相似。columnName就是須要分區的字段,這個字段在數據庫中的類型必須是數字;lowerBound就是分區的下界;upperBound就是分區的上界;numPartitions是分區的個數。一樣,咱們也來看看如何使用: val lowerBound = 1 val upperBound = 100000 val numPartitions = 5 val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog" val prop = new Properties() val df = sqlContext.read.jdbc(url, "iteblog", "id", lowerBound, upperBound, numPartitions, prop)   這個方法能夠將iteblog表的數據分佈到RDD的幾個分區中,分區的數量由numPartitions參數決定,在理想狀況下,每一個分區處理相同數量的數據,咱們在使用的時候不建議將這個值設置的比較大,由於這可能致使數據庫掛掉!可是根據前面介紹,這個函數的缺點就是隻能使用整形數據字段做爲分區關鍵字。   這個函數在極端狀況下,也就是設置將numPartitions設置爲1,其含義和第一種方式一致。 3、根據任意字段進行分區   基於前面兩種方法的限制,Spark還提供了根據任意字段進行分區的方法,函數原型以下: def jdbc( url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame 這個函數相比第一種方式多了predicates參數,咱們能夠經過這個參數設置分區的依據,來看看例子: val predicates = Array[String]("reportDate <= '2014-12-31'", "reportDate > '2014-12-31' and reportDate <= '2015-12-31'") val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog" val prop = new Properties() val df = sqlContext.read.jdbc(url, "iteblog", predicates, prop) 最後rdd的分區數量就等於predicates.length。 4、經過load獲取 Spark還提供經過load的方式來讀取數據。 sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog", "dbtable" -> "iteblog")).load()   options函數支持url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions選項,細心的同窗確定發現這個和方法二的參數一致。是的,其內部實現原理部分和方法二大致一致。同時load方法還支持json、orc等數據源的讀取。
相關文章
相關標籤/搜索