上一篇裏我提到能夠把RDD看成一個數組,這樣咱們在學習spark的API時候不少問題就能很好理解了。上篇文章裏的API也都是基於RDD是數組的數據模型而進行操做的。javascript
Spark是一個計算框架,是對mapreduce計算框架的改進,mapreduce計算框架是基於鍵值對也就是map的形式,之因此使用鍵值對是人們發現世界上大部分計算均可以使用map這樣的簡單計算模型進行計算。可是Spark裏的計算模型倒是數組形式,RDD如何處理Map的數據格式了?本篇文章就主要講解RDD是如何處理Map的數據格式。java
Pair RDD及鍵值對RDD,Spark裏建立Pair RDD也是能夠經過兩種途徑,一種是從內存裏讀取,一種是從文件讀取。python
首先是從文件讀取,上篇裏咱們看到使用textFile方法讀取文件,讀取的文件是按行組織成一個數組,要讓其變成map格式就的進行轉化,代碼以下所示:算法
/* * 測試文件數據: * x01,1,4 x02,11,1 x01,3,9 x01,2,6 x02,18,12 x03,7,9 * * */ val rddFile:RDD[(String,String)] = sc.textFile("file:///F:/sparkdata01.txt", 1).map { x => (x.split(",")(0),x.split(",")(1) + "," + x.split(",")(2)) } val rFile:RDD[String] = rddFile.keys println("=========createPairMap File=========") println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03 println("=========createPairMap File=========")
咱們由此能夠看到以讀取文件方式構造RDD,咱們須要使用map函數進行轉化,讓其變成map的形式。apache
下面是經過內存方式進行建立,代碼以下:編程
val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) val r:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y) println("=========createPairMap=========") println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6) println("=========createPairMap=========")
RDD任然是數組形式,只不過數組的元素是("k01",3)格式是scala裏面特有的Tuple2及二元組,元組能夠看成一個集合,這個集合能夠是各類不一樣數據類型組合而成,二元組就是隻包含兩個元素的元組。數組
因而可知Pair RDD也是數組,只不過是一個元素爲二元組的數組而已,上篇裏對RDD的操做也是一樣適用於Pair RDD的。app
下面是Pair RDD的API講解,一樣咱們先說轉化操做的API:框架
reduceByKey:合併具備相同鍵的值; groupByKey:對具備相同鍵的值進行分組; keys:返回一個僅包含鍵值的RDD; values:返回一個僅包含值的RDD; sortByKey:返回一個根據鍵值排序的RDD; flatMapValues:針對Pair RDD中的每一個值應用一個返回迭代器的函數,而後對返回的每一個元素都生成一個對應原鍵的鍵值對記錄; mapValues:對Pair RDD裏每個值應用一個函數,可是不會對鍵值進行操做; combineByKey:使用不一樣的返回類型合併具備相同鍵的值; subtractByKey:操做的RDD咱們命名爲RDD1,參數RDD命名爲參數RDD,剔除掉RDD1裏和參數RDD中鍵相同的元素; join:對兩個RDD進行內鏈接; rightOuterJoin:對兩個RDD進行鏈接操做,第一個RDD的鍵必須存在,第二個RDD的鍵再也不第一個RDD裏面有那麼就會被剔除掉,相同鍵的值會被合併; leftOuterJoin:對兩個RDD進行鏈接操做,第二個RDD的鍵必須存在,第一個RDD的鍵再也不第二個RDD裏面有那麼就會被剔除掉,相同鍵的值會被合併; cogroup:將兩個RDD裏相同鍵的數據分組在一塊兒
下面就是行動操做的API了,具體以下:函數
countByKey:對每一個鍵的元素進行分別計數; collectAsMap:將結果變成一個map; lookup:在RDD裏使用鍵值查找數據
接下來我再提提那些不是很經常使用的RDD操做,具體以下:
轉化操做的:
sample:對RDD採樣;
行動操做:
take(num):返回RDD裏num個元素,隨機的; top(num):返回RDD裏最前面的num個元素,這個方法實用性還比較高; takeSample:從RDD裏返回任意一些元素; sample:對RDD裏的數據採樣; takeOrdered:從RDD裏按照提供的順序返回最前面的num個元素
接下來就是示例代碼了,以下所示:
package cn.com.sparktest import org.apache.spark.SparkConf import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.CompactBuffer object SparkPairMap { val conf:SparkConf = new SparkConf().setAppName("spark pair map").setMaster("local[2]") val sc:SparkContext = new SparkContext(conf) /** * 構建Pair RDD */ def createPairMap():Unit = { val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) val r:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y) println("=========createPairMap=========") println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6) println("=========createPairMap=========") /* * 測試文件數據: * x01,1,4 x02,11,1 x01,3,9 x01,2,6 x02,18,12 x03,7,9 * * */ val rddFile:RDD[(String,String)] = sc.textFile("file:///F:/sparkdata01.txt", 1).map { x => (x.split(",")(0),x.split(",")(1) + "," + x.split(",")(2)) } val rFile:RDD[String] = rddFile.keys println("=========createPairMap File=========") println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03 println("=========createPairMap File=========") } /** * 關於Pair RDD的轉化操做和行動操做 */ def pairMapRDD(path:String):Unit = { val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) val other:RDD[(String,Int)] = sc.parallelize(List(("k01",29)), 1) // 轉化操做 val rddReduce:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y) println("====reduceByKey===:" + rddReduce.collect().mkString(","))// (k01,29),(k03,2),(k02,6) val rddGroup:RDD[(String,Iterable[Int])] = rdd.groupByKey() println("====groupByKey===:" + rddGroup.collect().mkString(","))// (k01,CompactBuffer(3, 26)),(k03,CompactBuffer(2)),(k02,CompactBuffer(6)) val rddKeys:RDD[String] = rdd.keys println("====keys=====:" + rddKeys.collect().mkString(","))// k01,k02,k03,k01 val rddVals:RDD[Int] = rdd.values println("======values===:" + rddVals.collect().mkString(","))// 3,6,2,26 val rddSortAsc:RDD[(String,Int)] = rdd.sortByKey(true, 1) val rddSortDes:RDD[(String,Int)] = rdd.sortByKey(false, 1) println("====rddSortAsc=====:" + rddSortAsc.collect().mkString(","))// (k01,3),(k01,26),(k02,6),(k03,2) println("======rddSortDes=====:" + rddSortDes.collect().mkString(","))// (k03,2),(k02,6),(k01,3),(k01,26) val rddFmVal:RDD[(String,Int)] = rdd.flatMapValues { x => List(x + 10) } println("====flatMapValues===:" + rddFmVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36) val rddMapVal:RDD[(String,Int)] = rdd.mapValues { x => x + 10 } println("====mapValues====:" + rddMapVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36) val rddCombine:RDD[(String,(Int,Int))] = rdd.combineByKey(x => (x,1), (param:(Int,Int),x) => (param._1 + x,param._2 + 1), (p1:(Int,Int),p2:(Int,Int)) => (p1._1 + p2._1,p1._2 + p2._2)) println("====combineByKey====:" + rddCombine.collect().mkString(","))//(k01,(29,2)),(k03,(2,1)),(k02,(6,1)) val rddSubtract:RDD[(String,Int)] = rdd.subtractByKey(other); println("====subtractByKey====:" + rddSubtract.collect().mkString(","))// (k03,2),(k02,6) val rddJoin:RDD[(String,(Int,Int))] = rdd.join(other) println("=====rddJoin====:" + rddJoin.collect().mkString(","))// (k01,(3,29)),(k01,(26,29)) val rddRight:RDD[(String,(Option[Int],Int))] = rdd.rightOuterJoin(other) println("====rightOuterJoin=====:" + rddRight.collect().mkString(","))// (k01,(Some(3),29)),(k01,(Some(26),29)) val rddLeft:RDD[(String,(Int,Option[Int]))] = rdd.leftOuterJoin(other) println("=====rddLeft=====:" + rddLeft.collect().mkString(","))// (k01,(3,Some(29))),(k01,(26,Some(29))),(k03,(2,None)),(k02,(6,None)) val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other) println("=====cogroup=====:" + rddCogroup.collect().mkString(","))// (k01,(CompactBuffer(3, 26),CompactBuffer(29))),(k03,(CompactBuffer(2),CompactBuffer())),(k02,(CompactBuffer(6),CompactBuffer())) // 行動操做 val resCountByKey = rdd.countByKey() println("=====countByKey=====:" + resCountByKey)// Map(k01 -> 2, k03 -> 1, k02 -> 1) val resColMap = rdd.collectAsMap() println("=====resColMap=====:" + resColMap)//Map(k02 -> 6, k01 -> 26, k03 -> 2) val resLookup = rdd.lookup("k01") println("====lookup===:" + resLookup) // WrappedArray(3, 26) } /** * 其餘一些不經常使用的RDD操做 */ def otherRDDOperate(){ val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) println("=====first=====:" + rdd.first())//(k01,3) val resTop = rdd.top(2).map(x => x._1 + ";" + x._2) println("=====top=====:" + resTop.mkString(","))// k03;2,k02;6 val resTake = rdd.take(2).map(x => x._1 + ";" + x._2) println("=======take====:" + resTake.mkString(","))// k01;3,k02;6 val resTakeSample = rdd.takeSample(false, 2).map(x => x._1 + ";" + x._2) println("=====takeSample====:" + resTakeSample.mkString(","))// k01;26,k03;2 val resSample1 = rdd.sample(false, 0.25) val resSample2 = rdd.sample(false, 0.75) val resSample3 = rdd.sample(false, 0.5) println("=====sample======:" + resSample1.collect().mkString(","))// 無 println("=====sample======:" + resSample2.collect().mkString(","))// (k01,3),(k02,6),(k01,26) println("=====sample======:" + resSample3.collect().mkString(","))// (k01,3),(k01,26) } def main(args: Array[String]): Unit = { createPairMap() pairMapRDD("file:///F:/sparkdata01.txt") otherRDDOperate() } }
本篇到此就將我知道的spark的API所有講完了,兩篇文章裏的示例代碼都是通過測試的,能夠直接運行,你們在閱讀代碼時候最好注意這個特色:我在寫RDD轉化代碼時候都是很明確的寫上了轉化後的RDD的數據類型,這樣作的目的就是讓讀者更加清晰的認識不一樣RDD轉化後的數據類型,這點在實際開發裏很是重要,在實際的計算裏咱們常常會不一樣的計算算法不停的轉化RDD的數據類型,而使用scala開發spark程序時候,我發現scala和javascript很相似,咱們不去指定返回值數據類型,scala編譯器也會自動推算結果的數據類型,所以編碼時候咱們能夠不指定具體數據類型。這個特色就會讓咱們在實際開發裏碰到種種問題,所以我在示例代碼裏明確了RDD轉化後的數據類型。
在使用Pair RDD時候,咱們要引入:
import org.apache.spark.SparkContext._
不然代碼就有可能報錯,說找不到對應的方法,這個引入就是scala裏導入的隱世類型轉化的功能,原理和上段文字說到的內容差很少。
開發spark程序不只僅只可使用scala,還可使用python,java,不過scala使用起來更加方便,spark的API簡單清晰,這樣的編程大大下降了原先使用mapreduce編程的難度,可是若是咱們要深刻掌握這些API那麼就要更加深刻的學習下scala。下一篇我就根據spark裏RDD的API講解一些scala的語法,經過這些語法讓咱們更好的掌握Spark的API。