5.1 RDD編程

1、RDD編程基礎

1.建立

spark採用textFile()方法來從文件系統中加載數據建立RDD,該方法把文件的URL做爲參數,這個URL能夠是:node

  1. 本地文件系統的地址
  2. 分佈式文件系統HDFS的地址
  3. 從雲端加載數據,好比亞馬遜的雲端存儲S3

(1)從本地文件系統中加載數據建立RDD

(2)從分佈式文件系統HDFS中加載數據

上面三條等價的前提是當前登陸linux系統的用戶名必須是Hadoop用戶,這樣當執行括號裏給的文本文件名稱word.txt時,系統默認去當前登陸Ubuntu系統的用戶在HDFS當中對應的用戶主目錄中尋找python

/user/hadoop是hdfs中的用戶主目錄linux

(3)經過並行集合(數組)建立RDD

能夠調用SparkContext對象的Parallelize方法,在Driver中一個已經存在的集合(數組)上建立。apache

  a.經過數組建立RDD編程

 

  b.經過列表建立RDD數組

2.基本操做

(1)轉換操做

  1. 對於RDD而言,每一次轉換操做都會產生不一樣的RDD,供給下一個「轉換」使用
  2. 轉換獲得的RDD是惰性求值的,也就是說,整個轉換過程只是記錄了轉換的軌跡,並不會發生真正的計算,只有遇到行動操做時,纔會發生真正的計算,開始從血緣關係源頭開始,進行物理的轉換操做

RDD經常使用轉換操做:緩存

  a.filter(func)

  b.map(func)操做

  c.flatMap(func)操做

  d.groupByKey(func)操做

groupByKey()應用於(K,V)鍵值對的數據集時,返回一個新的(K, Iterable)形式的數據集網絡

  e.reduceByKey(func)操做

reduceByKey(func)應用於(K,V)鍵值對的數據集時,返回一個新的(K, V)形式的數據集,其中的每一個值是將每一個key傳遞到函數func中進行聚合後獲得的結果分佈式

reduceByKey和groupByKey的區別:reduceByKey不只會進行一個分組,還會把value list再根據傳入的函數再作操做函數

(2)行動操做

  a.count()

返回數據集中的元素個數。

  b.collect()

把在不一樣電腦上生成結果都收集回來,能夠在發起程序的終端上顯示出來。

  c.first()

返回數據集中的第一個元素

  d.take(n)

返回數據集中的前n個元素,而且以數組的形式

  e.reduce(func)

func是一個高階函數

  f.foreach(func):遍歷

(3)惰性機制

惰性機制:整個轉換過程只是記錄了轉換的軌跡,並不會發生真正的計算,只有遇到行動操做時,纔會觸發「從頭至尾」的真正的計算。

3.持久化

在Spark中,RDD採用惰性求值的機制,每次遇到行動操做,都會從頭開始執行計算。每次調用行動操做,都會觸發一次從頭開始的計算。這對於迭代計算而言,代價是很大的,迭代計算常常須要屢次重複使用同一組數據

rdd.count()第一次動做類型,獲得的結果先把它存起來,這樣下面再去執行rdd.collect()又遇到第二次動做類型操做的時候,就不用再從頭至尾去算了,只須要用剛纔存起來的值就能夠了。可是若是計算結果不須要重複用的話,就不要隨便去用持久化,會很耗內存。

job的由來:每次遇到一個動做類型操做,都會生成一個job。

持久化的方式:

  1. 能夠經過持久化(緩存)機制避免這種重複計算的開銷
  2. 可使用persist()方法對一個RDD標記爲持久化
  3. 之因此說「標記爲持久化」,是由於出現persist()語句的地方,並不會立刻計算生成RDD並把它持久化,而是要等到遇到第一個行動操做觸發真正計算之後,纔會把計算結果進行持久化
  4. 持久化後的RDD將會被保留在計算節點的內存中被後面的行動操做重複使用

  1. persist()的圓括號中包含的是持久化級別參數:
  2. persist(MEMORY_ONLY):標記爲持久化,並不會真正緩存rdd,將RDD做爲反序列化的對象存儲於JVM中,若是內存不足,就要按照LRU原則替換緩存中的內容 ,RDD.cache()等價於RDD.persist(MEMORY_ONLY)
  3. persist(MEMORY_AND_DISK)表示將RDD做爲反序列化的對象存儲在JVM中,若是內存不足,超出的分區將會被存放在硬盤上
  4. 通常而言,使用cache()方法時,會調用persist(MEMORY_ONLY) 
  5. 可使用unpersist()方法手動地把持久化的RDD從緩存中移除

針對上面的實例,增長持久化語句之後的執行過程以下:

4.分區

RDD是彈性分佈式數據集,一般RDD很大,會被分紅不少個分區,分別保存在不一樣的節點上

(1)分區的做用

  a.增長並行度

圖 RDD分區被保存到不一樣節點上

  b.減小通訊開銷

u1,u2,...是放在不一樣機器上的,右邊也是放在不一樣機器上的,會在網絡上大量機器之間來回傳輸。

圖:未分區時對UserData和Events兩個表進行鏈接操做

圖:採用分區之後對UserData和Events兩個表進行鏈接操做 

userData表:u1存儲1到10號,u2存儲11到20號,...

events表:挑出1到10號的扔給u1,11到20的扔給u2,...

(2)分區原則

RDD分區的一個原則是使得分區的個數儘可能等於集羣中的CPU核心(core)數目,對於不一樣的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),均可以經過設置spark.default.parallelism這個參數的值,來配置默認的分區數目,通常而言:

  1. *本地模式:默認爲本地機器的CPU數目,若設置了local[N],則默認爲N
  2. *Apache Mesos:默認的分區數爲8
  3. *Standalone或YARN:在「集羣中全部CPU核心數目總和」和「2」兩者中取較大值做爲默認值

分區個數=集羣中CPU核心數目

若是分區個數和CPU核數不接近,好比說有4個分區,8個CPU,那意味着4個CPU的線程是浪費掉的,由於只有4個分區,頂多能夠啓動4個線程。

若是有16個分區,8個線程,那隻能有8個分區去運做,剩下8個分區得等待。

(3)設置分區的個數

  a.建立RDD時手動指定分區個數

在調用textFile()和parallelize()方法的時候手動指定分區個數便可,語法格式以下: sc.textFile(path, partitionNum) 其中,path參數用於指定要加載的文件的地址,partitionNum參數用於指定分區個數。

語法格式:

sc.textFile(path,partitionNum) // partitionNum表示指定分區個數

  b.使用repartition方法從新設置分區個數

(4)自定義分區方法

  Spark提供了自帶的HashPartitioner(哈希分區)與RangePartitioner(區域分區),可以知足大多數應用場景的需求。與此同時,Spark也支持自定義分區方式,即經過提供一個自定義的Partitioner對象來控制RDD的分區方式,從而利用領域知識進一步減小通訊開銷。

  要實現自定義分區,須要定義一個類,這個自定義類須要繼承org.apache.spark.Partitioner類,並實現下面三個方法:

  1. numPartitions: Int 返回建立出來的分區數
  2. getPartition(key: Any): Int 返回給定鍵的分區編號(0到numPartitions-1)
  3. equals():Java判斷相等性的標準方法

實例:根據key值的最後一位數字,寫到不一樣的文件,例如:

 用單例對象定義入口函數,凡是寫在單例對象中的都是靜態的方法。

map(_._n)表示任意元組tuple對象,後面的數字n表示取第幾個數.(n>=1的整數)

注意:partitioner自定義分區類只支持RDD爲鍵值對的形式

5.基本實例---詞頻統計

  假設有一個本地文件word.txt,裏面包含了不少行文本,每行文本由多個單詞構成,單詞之間用空格分隔。可使用以下語句進行詞頻統計(即統計每一個單詞出現的次數):

 

在實際應用中,單詞文件可能很是大,會被保存到分佈式文件系統HDFS中,Spark和Hadoop會統一部署在一個集羣上。spark的wordnode和hdfs的datanode部署在一塊兒的(讓數據更靠近計算) 

先在不一樣的機器上分別並行執行詞頻統計,獲得統計結果後,再到spark master節點上進行彙總,最終獲得總的結果。

相關文章
相關標籤/搜索