學習筆記--Spark

參考來源:http://www.yiibai.com/spark/node

概述
Apache Spark是一個集羣計算設計的快速計算。它是創建在Hadoop MapReduce之上,它擴展了 MapReduce 模式,有效地使用更多類型的計算,其中包括交互式查詢和流處理。Spark的主要特徵是其內存集羣計算,增長的應用程序的處理速度。shell

三種部署方法:apache

  • 單機版 − Spark獨立部署是指Spark佔據在HDFS之上(Hadoop分佈式文件系統)並將空間分配給HDFS。在這裏,Spark和MapReduce將並列覆蓋全部Spark的做業集羣。
  • Hadoop Yarn − Hadoop Yarn部署方式,簡單地說,spark運行在Yarn沒有任何須要預安裝或使用root訪問權限。它有助於Spark融入Hadoop生態系統和Hadoop堆棧。它容許在其它部件疊上層的頂部上運行。
  • Spark 在MapReduce (SIMR) − Spark在MapReduce的用於啓動spark做業,除了獨立部署。經過SIMR,用戶能夠啓動Spark和使用Shell,而不須要任何管理權限。

Spark RDD
彈性分佈式數據集(RDD)是Spark的基本數據結構。它是對象的不可變的分佈式集合。在RDD中每一個數據集被劃分紅邏輯分區,這多是在羣集中的不一樣節點上計算的。RDDS能夠包含任何類型,如:Python,Java,或者Scala的對象,包括用戶定義的類。編程

安裝
按順序安裝Java、Scala、Spark數組

Spark核心編程
建立簡單RDD
Spark容器會自動建立Spark 上下文對象名爲sc緩存

$ spark-shell
scala> val inputfile = sc.textFile(「input.txt」)

RDD轉換
S.No | 轉換&含義
--------|----------------
1 | map(func) 返回一個新的分佈式數據集,傳遞源的每一個元素造成經過一個函數 func
2 | filter(func) 返回由選擇在func返回true,源元素組成了一個新的數據集
3 | flatMap(func) 相似映射,但每一個輸入項目能夠被映射到0以上輸出項目(因此func應返回seq而不是單一的項目)
4 | mapPartitions(func) 相似映射,只不過是單獨的每一個分區(塊)上運行RDD,所以 func 的類型必須是Iterator ⇒ Iterator 對類型T在RDD上運行時
5 | mapPartitionsWithIndex(func) 相似映射分區,並且還提供func 來表示分區的索引的整數值,所以 func 必須是類型 (Int, Iterator ) ⇒ Iterator 當類型T在RDD上運行時
6 | sample(withReplacement, fraction, seed) 採樣數據的一小部分,有或沒有更換,利用給定的隨機數發生器的種子
7 | union(otherDataset) 返回一個新的數據集,其中包含源數據和參數元素的結合
8 | intersection(otherDataset) 返回包含在源數據和參數元素的新RDD交集
9 | distinct([numTasks]) 返回一個新的數據集包含源數據集的不一樣元素
10 | groupByKey([numTasks]) 當調用(K,V)數據集,返回(K, Iterable ) 對數據集
11 | reduceByKey(func, [numTasks])
12 | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
13 | sortByKey([ascending], [numTasks])
14 | join(otherDataset, [numTasks])
15 | cogroup(otherDataset, [numTasks])
16 | cartesian(otherDataset) 當上調用類型T和U的數據集,返回(T,U)對數據集(全部元素對)
17 | pipe(command, [envVars]) RDD經過shell命令每一個分區,例如:一個Perl或bash腳本。RDD元素被寫入到進程的標準輸入和線路輸出,標準輸出形式返回一個字符串RDD
18 | coalesce(numPartitions) 減小RDD到numPartitions分區的數量。過濾大型數據集後,更高效地運行的操做
19 | repartition(numPartitions) 打亂RDD數據隨機創造更多或更少的分區,並在它們之間平衡。這老是打亂的全部數據在網絡上
20 | repartitionAndSortWithinPartitions(partitioner) 根據給定的分區從新分區RDD及在每一個結果分區,排序鍵記錄。這是調用從新分配排序在每一個分區內,由於它能夠推進分揀向下進入混洗機制效率更高。
bash

動做
S.No | 操做 & 含義
--------|---------------------
1 | reduce(func) 合計數據集的元素,使用函數 func (其中有兩個參數和返回一行). 該函數應該是可交換和可結合,以便它能夠正確地在並行計算。
2 | collect() 返回數據集的全部做爲數組在驅動程序的元素。這是一個過濾器或其它操做以後返回數據的一個足夠小的子集,一般是有用的
3 | count() 返回該數據集的元素數
4 | first() 返回的數據集的第一個元素(相似於使用(1))
5 | take(n) 返回與該數據集的前n個元素的陣列。
6 | takeSample (withReplacement,num, [seed]) 返回數組的數據集num個元素,有或沒有更換隨機抽樣,預指定的隨機數發生器的種子可選
7 | takeOrdered(n, [ordering]) 返回RDD使用或者按其天然順序或自定義比較的前第n個元素
8 | saveAsTextFile(path) 寫入數據集是一個文本文件中的元素(或一組文本文件),在給定的目錄的本地文件系統,HDFS或任何其餘的Hadoop支持的文件系統。Spark調用每一個元素的 toString,將其轉換爲文件中的文本行
9 | saveAsSequenceFile(path) (Java and Scala) 寫入數據集,爲Hadoop SequenceFile元素在給定的路徑寫入在本地文件系統,HDFS或任何其餘Hadoop支持的文件系統。 這是適用於實現Hadoop可寫接口RDDS的鍵 - 值對。在Scala中,它也能夠在屬於隱式轉換爲可寫(Spark包括轉換爲基本類型,如 Int, Double, String 等等)類型。
10 | saveAsObjectFile(path) (Java and Scala) 寫入數據集的內容使用Java序列化爲一個簡單的格式,而後可使用SparkContext.objectFile()加載。
11 | countByKey() 僅適用於RDDS的類型 (K, V). 返回(K, Int)對與每一個鍵的次數的一個HashMap。
12 | foreach(func) 數據集的每一個元素上運行函數func。這一般對於不良反應,例如更新累加器或與外部存儲系統進行交互進行。網絡

示例程序數據結構

//打開Spark-Shell
$ spark-shell 
//建立一個RDD
scala> val inputfile = sc.textFile("input.txt")
//執行字數轉換
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
//當前RDD
scala> counts.toDebugString
//緩存轉換
scala> counts.cache()
//應用動做
scala> counts.saveAsTextFile("output")

Spark部署
Spark應用程序使用spark-submit(shell命令)來部署在集羣中的Spark應用程序
示例:
SparkWordCount.scalaapp

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
  def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
        
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
        
      valcount = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
      
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
  } 
}

步驟:
一、下載Spark Ja
下載spark-core_2.10-1.3.0.jar
二、編譯程序

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

三、建立JAR

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

四、提交spark應用

spark-submit --class SparkWordCount --master local wordcount.jar
相關文章
相關標籤/搜索