大數據系列之並行計算引擎Spark介紹 大數據系列之並行計算引擎Spark部署及應用

相關博文:大數據系列之並行計算引擎Spark部署及應用

Spark:

    Apache Spark 是專爲大規模數據處理而設計的快速通用的計算引擎。html

    Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,Spark,擁有Hadoop MapReduce所具備的優勢;但不一樣於MapReduce的是Job中間輸出結果能夠保存在內存中,從而再也不須要讀寫HDFS,所以Spark能更好地適用於數據挖掘與機器學習等須要迭代的MapReduce的算法。算法

Spark 是一種與 Hadoop 類似的開源集羣計算環境,可是二者之間還存在一些不一樣之處,這些有用的不一樣之處使 Spark 在某些工做負載方面表現得更加優越,換句話說,Spark 啓用了內存分佈數據集,除了可以提供交互式查詢外,它還能夠優化迭代工做負載。apache

    Spark 是在 Scala 語言中實現的,它將 Scala 用做其應用程序框架。與 Hadoop 不一樣,Spark 和 Scala 可以緊密集成,其中的 Scala 能夠像操做本地集合對象同樣輕鬆地操做分佈式數據集。編程

儘管建立 Spark 是爲了支持分佈式數據集上的迭代做業,可是實際上它是對 Hadoop 的補充,能夠在 Hadoop 文件系統中並行運行。經過名爲 Mesos 的第三方集羣框架能夠支持此行爲。Spark 由加州大學伯克利分校 AMP 實驗室 (Algorithms, Machines, and People Lab) 開發,可用來構建大型的、低延遲的數據分析應用程序。數組

Spark的性能特色:

1.更快的速度:內存計算下,Spark 比 Hadoop 快100倍。緩存

  1.內存計算引擎,提供Cache機制來支持須要反覆迭代計算或者屢次數據共享,減小數據讀取的I/O開銷多線程

  2.DAG引擎,減小屢次計算之間中間結果寫到HDFS的開銷;併發

  3.使用多線程池模型來減小task啓動開銷,shuffle過程當中避免沒必要要的sort操做已經減小磁盤I/O操做;框架

2.易用性:機器學習

  1.Spark 提供了80多個高級運算符。

  2.提供了豐富的API,支持JAVA,Scala,Python和R四種語言;

  3.代碼量比MapReduce少2~5倍;

3.通用性:Spark 提供了大量的庫,包括SQL、DataFrames、MLlib、GraphX、Spark Streaming。 開發者能夠在同一個應用程序中無縫組合使用這些庫。

4.支持多種資源管理器:Spark 支持 Hadoop YARN,Apache Mesos,及其自帶的獨立集羣管理器

Spark基本原理:

  Spark Streaming:構建在Spark上處理Stream數據的框架,基本的原理是將Stream數據分紅小的時間片段(幾秒),以相似batch批量處理的方式來處理這小部分數據。Spark Streaming構建在Spark上,一方面是由於Spark的低延遲執行引擎(100ms+),雖然比不上專門的流式數據處理軟件,也能夠用於實時計算,另外一方面相比基於Record的其它處理框架(如Storm),一部分窄依賴的RDD數據集能夠從源數據從新計算達到容錯處理目的。此外小批量處理的方式使得它能夠同時兼容批量和實時數據處理的邏輯和算法。方便了一些須要歷史數據和實時數據聯合分析的特定應用場合。

Spark背景:

 

  1.MapReduce侷限性:

  

    1.僅支持Map和Reduce兩種操做;

    2.處理效率低效;不適合迭代計算(如機器學習、圖計算等),交互式處理(數據挖掘)和流失處理(日誌分析)

    3.Map中間結果須要寫磁盤,Reduce寫HDFS,多個MR之間經過HDFS交換數據;

    4.任務調度和啓動開銷大;

    5.沒法充分利用內存;(與MR產生時代有關,MR出現時內存價格比較高,採用磁盤存儲代價小)

    6.Map端和Reduce端均須要排序;

  2.MapReduce編程不夠靈活。(比較Scala函數式編程而言)

  3.框架多樣化[採用一種框架技術(Spark)同時實現批處理、流式計算、交互式計算]:

    1.批處理:MapReduce、Hive、Pig;

 

    2.流式計算:Storm

    3.交互式計算:Impala  

Spark核心概念:

  1.RDD:Resilient Distributed Datasets,彈性分佈式數據集

 

  

    

    1.分佈在集羣中的只讀對象集合(由多個Partition 構成);

    2.能夠存儲在磁盤或內存中(多種存儲級別);

    3.經過並行「轉換」操做構造;

    4.失效後自動重構;

    5.RDD基本操做(operator)

      

Transformation具體內容

    • map(func) :返回一個新的分佈式數據集,由每一個原元素通過func函數轉換後組成
    • filter(func) : 返回一個新的數據集,由通過func函數後返回值爲true的原元素組成
      *flatMap(func) : 相似於map,可是每個輸入元素,會被映射爲0到多個輸出元素(所以,func函數的返回值是一個Seq,而不是單一元素)
    • flatMap(func) : 相似於map,可是每個輸入元素,會被映射爲0到多個輸出元素(所以,func函數的返回值是一個Seq,而不是單一元素)
    • sample(withReplacement, frac, seed) :
      根據給定的隨機種子seed,隨機抽樣出數量爲frac的數據
    • union(otherDataset) : 返回一個新的數據集,由原數據集和參數聯合而成
    • groupByKey([numTasks]) :
      在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意:默認狀況下,使用8個並行任務進行分組,你能夠傳入numTask可選參數,根據數據量設置不一樣數目的Task
    • reduceByKey(func, [numTasks]) : 在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一塊兒。和groupbykey相似,任務的個數是能夠經過第二個可選參數來配置的。
    • join(otherDataset, [numTasks]) :
      在類型爲(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每一個key中的全部元素都在一塊兒的數據集
    • groupWith(otherDataset, [numTasks]) : 在類型爲(K,V)和(K,W)類型的數據集上調用,返回一個數據集,組成元素爲(K, Seq[V], Seq[W]) Tuples。這個操做在其它框架,稱爲CoGroup
    • cartesian(otherDataset) : 笛卡爾積。但在數據集T和U上調用時,返回一個(T,U)對的數據集,全部元素交互進行笛卡爾積。
    • flatMap(func) :
      相似於map,可是每個輸入元素,會被映射爲0到多個輸出元素(所以,func函數的返回值是一個Seq,而不是單一元素)

Actions具體內容

    • reduce(func) : 經過函數func彙集數據集中的全部元素。Func函數接受2個參數,返回一個值。這個函數必須是關聯性的,確保能夠被正確的併發執行
    • collect() : 在Driver的程序中,以數組的形式,返回數據集的全部元素。這一般會在使用filter或者其它操做後,返回一個足夠小的數據子集再使用,直接將整個RDD集Collect返回,極可能會讓Driver程序OOM
    • count() : 返回數據集的元素個數
    • take(n) : 返回一個數組,由數據集的前n個元素組成。注意,這個操做目前並不是在多個節點上,並行執行,而是Driver程序所在機器,單機計算全部的元素(Gateway的內存壓力會增大,須要謹慎使用)
    • first() : 返回數據集的第一個元素(相似於take(1))
    • saveAsTextFile(path) : 將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每一個元素的toString方法,並將它轉換爲文件中的一行文本
    • saveAsSequenceFile(path) : 將數據集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支持的文件系統。RDD的元素必須由key-value對組成,並都實現了Hadoop的Writable接口,或隱式能夠轉換爲Writable(Spark包括了基本類型的轉換,例如Int,Double,String等等)
    • foreach(func) : 在數據集的每個元素上,運行函數func。這一般用於更新一個累加器變量,或者和外部存儲系統作交互

算子分類

大體能夠分爲三大類算子:

      1. Value數據類型的Transformation算子,這種變換並不觸發提交做業,針對處理的數據項是Value型的數據。
      2. Key-Value數據類型的Transfromation算子,這種變換並不觸發提交做業,針對處理的數據項是Key-Value型的數據對。
      3. Action算子,這類算子會觸發SparkContext提交Job做業。

 

      3.示例:

     

    4.Spark RDD cache/persist

      1.Spark RDD cache

        1.容許將RDD緩存到內存中或磁盤上,以便於重用

        2.提供了多種緩存級別,以便於用戶根據實際需求進行調整

          

        3.cache使用

        

       2.以前用MapReduce實現過WordCount,如今咱們用Scala實現下wordCount.是否是很簡潔呢?!

        Scala學習連接:https://yq.aliyun.com/topic/69

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount{
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: SparkWordCount <inputfile> <outputfile>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("SparkWordCount")
    val sc = new SparkContext(conf)

    val file=sc.textFile("file:///hadoopLearning/spark-1.5.1-bin-hadoop2.4/README.md")
    val counts=file.flatMap(line=>line.split(" "))
                   .map(word=>(word,1))
                   .reduceByKey(_+_)
    counts.saveAsTextFile("file:///hadoopLearning/spark-1.5.1-bin-hadoop2.4/countReslut.txt")

  }
}

 

      3.關於RDD的Transformation與Action的特色咱們介紹下;

        1.接口定義方式不一樣:

          Transformation: RDD[X]-->RDD[y]

          Action:RDD[x]-->Z (Z不是一個RDD,多是一個基本類型,數組等)

        2.惰性執行:

          Transformation:只會記錄RDD轉化關係,並不會觸發計算

          Action:是觸發程序執行(分佈式)的算子。

          

        程序的執行流程:

      

 

Spark運行模式:

1.Local(本地模式):

  1.單機運行,一般用於測試;

    1.local:只啓動一個executor

    2.local[k]:啓動k個executor

    3.local[*]:啓動跟cpu數目相同的executor

2.standalone(獨立模式)

  1.獨立運行在一個集羣中

  

3.Yarn/mesos

  1.運行在資源管理系統上,好比Yarn或mesos

  2.Spark On Yarn存在兩種模式

    1.yarn-client

    

    2.yanr-cluster

    

    3.比較兩種方式區別:

    

 

Spark在企業中的應用場景

1.基於日誌數據的快速查詢系統業務;

  1.構建於Spark之上的SparkSQL ,利用其快速以及內存表等優點,承擔了日誌數據的即席查詢工做。

2.典型算法的Spark實現

  1.預測用戶的廣告點擊機率;

  2.計算兩個好友間的共同好友數;

  3.用於ETL的SparkSQL和DAG任務;

相關文章
相關標籤/搜索