Spark 學習筆記 (一): 初探Spark 程序設計RDD

Spark 學習筆記 (一): 初探Spark 程序設計之RDD

本文主要介紹Spark基本數據結構RDD的原理和使用,以及搭建了基於Docker的Spark集羣開發測試環境,最後給出了幾個實際程序例子,算是Saprk入了門:)node

1、 RDD

RDD是Spark中最核心的概念算法

1.初識RDD --- Resilient Distributed Datasets 彈性分佈式數據集

  • 數據集:RDD是數據集合的抽象,分佈在集羣中的只讀對象的集合
    • 一個RDD由多個Partition構成,也就是一個RDD被分區成Partiton存在不一樣節點上
    • 一個Partition能夠存儲在此磁盤或內存中
    • 經過並行transform操做進行一個RDD到另外一個RDD的轉換
  • 分佈式:
    • Partition 是分佈式存儲的
    • 數據的計算是多個節點協同計算獲得的
  • 彈性:RDD 能夠在不改變內部存儲數據記錄的前提下,去調整並行計算計算單元的劃分結構,彈性這一特性,也是爲並行計算服務的docker

  • 容錯性:分佈式的通常問題是須要具備容錯性,那麼RDD自己是具備容錯性的,apache

RDD 內部的數據集合在邏輯上和物理上被劃分紅多個小子集合Partition,這樣的每個子集合咱們將其稱爲分區,分區的個數會決定並行計算的粒度,而每個分區數值的計算都是在一個單獨的任務Task中進行,所以並行任務的個數,也是由 RDD分區的個數決定的
Partition -> Task編程

2. Spark運行模式

Spark

先放個圖,看下Spark總體程序是怎麼執行的瀏覽器

  • 整個集羣分爲 Master 節點和 Worker 節點,至關於 Hadoop 的 Master 和 Slave 節點緩存

  • Master 節點上常駐 Master 守護進程,負責管理所有的 Worker 節點bash

  • Worker 節點上常駐 Worker 守護進程,負責與 Master 節點通訊並管理 executors數據結構

  • Driver 官方解釋是 「The process running the main() function of the application and creating the SparkContext」。 Application 就是用戶本身寫的 Spark 程序(driver program)併發

  • 每一個 Worker 上存在一個或者多個 ExecutorBackend 進程。每一個進程包含一個 Executor 對象,該對象持有一個線程池,每一個線程能夠執行一個 task。

3.Spark程序設計

  • Scala基礎
    • 用函數式編程的方式能夠很方便處理集合:
    var list = List(1, 2, 3)
    list.foreach(println)

Spark 的RDD,封裝了各類相似於Scala集合的算子map、filter、reduce等,且都是分佈式執行的

  • Spark程序設計基本流程
      1. 建立SparkContext對象:定義了Spark執行環境和配置參數;注意每一個Spark程序有且僅有一個SparkContext
      1. 建立RDD:從Scala集合或者在Hadoops數據集上建立
      • (1) 從Scala集合映射成RDD:sc.parallelize()建立,第二個參數是Partition數目
      val slices = 10   //Partition數目,即並行的task數目啓動10個map task進行處理
      val n = 100000 * slices 
      val count = sc.parallelize(1 to n, slices).map { i => 
          val x = random * 2 - 1 
          val y = random * 2 - 1 
          (x*x + y*y < 1) 1 else 0
      }.reduce(_ + _)
      • (2) 將本地文件/HDFS文件映射成RDD:
        • 文本文件:
        sc.textFile("tile.txt")   //將本地文本文件加載成RDD
        sc.textFile(「hdfs://nn:9000/path/file」)   //hdfs文件或目錄
        • sequenceFile文件:
        sc.sequenceFile(「file.txt」)  //將本地二進制文件加載成RDD
        • 使用任意自定義的Hadoop InputFormat
        sc.hadoopFile(path, inputFmt, keyClass, valClass)
        • 讀取HDFS建立RDD:
        inputRdd = sc.textFile(「hdfs:///data/input」)
        inputRdd = sc.textFile(「hdfs://namenode:8020/data/input」)

        HDFS的datanode的Block和Spark數據的partiton是一一映射的,也和task一一映射,也就是下圖所示的就會啓動5個task
        Spark2

        • 讀取HBase建立RDD:
        import org.apache.spark._ 
        import org.apache.hadoop.hbase.mapreduce.TableInputFormat
        
        //建立SparkContext 
        val sparkConf = new SparkConf()
        val sc = new SparkContext(conf )
        
        // 設置hbase configuration val hbaseConf = HBaseConfiguration.create() hbaseConf.addResource(new Path(「hbase-site.xml")) hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
        
        //建立hbase RDD 
        val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
        
        //獲取總行數 
        al count = hBaseRDD.count()
      1. 在RDD上進行轉換transformation和action
      • Transformation:將一個RDD經過一種規則,映射成另外一種RDD; Action: 返回結果或者保存結果,只有action纔會觸發程序的執行,注意Spark中遇到action的時候計算纔會去分佈式執行

        在 Spark 中,全部的轉換(transformations)都是惰性(lazy)的,它們不會立刻計算它們的結果。相反的,它們僅僅記錄轉換操做是應用到哪些基礎數據集(例如一個文件)上的。轉換僅僅在這個時候計算:當動做(action) 須要一個結果返回給驅動程序的時候。這個設計可以讓 Spark 運行得更加高效。例如,咱們能夠實現:經過 map 建立一個新數據集在 reduce 中使用,而且僅僅返回 reduce 的結果給 driver,而不是整個大的映射過的數據集。

        • 常見的操做集合:
          ac
      • Transformation函數例子:
      //建立RDD 
      val listRdd = sc.parallelize(List(1, 2, 3), 3)
      
      // 將RDD傳入函數,生成新的RDD 
      val squares = listRdd.map(x => x*x) // {1, 4, 9}
      
      // 對RDD中元素進行過濾,生成新的RDD 
      val even = squares.filter(_ % 2 == 0) // {4}
      
      // 將一個元素映射成多個,生成新的RDD 
      nums.flatMap(x => 1 to x) // => {1, 1, 2, 1, 2, 3}
      • Action函數例子:
      //建立新的RDD 
      val nums = sc.parallelize(List(1, 2, 3), 2) 
      
      // 將RDD保存爲本地集合(返回到driver端) 
      nums.collect() // => Array(1, 2, 3) 
      
      // 返回前K個元素 
      nums.take(2) // => Array(1, 2) 
      
      // 計算元素總數 
      nums.count() // => 3 
      
      // 合併集合元素 
      nums.reduce(_ + _) // => 6 
      
      // 將RDD寫到HDFS中 
      nums.saveAsTextFile(「hdfs://nn:8020/output」) 
      nums.saveAsSequenceFile(「hdfs://nn:8020/output」)
      • Key/Value類型的RDD的操做
      val pets = sc.parallelize( List((「cat」, 1), (「dog」, 1), (「cat」, 2))) 
      pets.reduceByKey(_ + _) // => {(cat, 3), (dog, 1)} 
      pets.groupByKey() // => {(cat, Seq(1, 2)), (dog, Seq(1)} 
      pets.sortByKey() // => {(cat, 1), (cat, 2), (dog, 1)}

      reduceByKey自動在map端進行本地combine

      • 控制ReduceTasks數目:有一個參數執行併發度
      words.reduceByKey(_ + _, 5)

      用戶也能夠經過修改spark.default.parallelism設置默認並行度 默認並行度爲最初的RDD partition數目

    留一個思考問題:那麼這些操做都是怎麼分佈式執行的呢?

      1. 返回結果:保存到HFDS或者Hive或者HBase
      • 將結果保存的HBase:

4. 其餘RDD操做

  • Sample()從數據集採樣

  • union() 合併多個RDD

  • cartesian 求笛卡爾積

  • 共享變量:Accumulators和Broadcast Variables

    通常來講上述的操做都是對數據在遠端worker node上拷貝的數據進行操做,對數據的效果並不會回傳

    • Accumulator(累加器,計數器)
      • 相似於MapReduce中的counter,將數據從一個節點發送到其餘各個節點上去;

      • 一般用於監控,調試,記錄符合某類特徵的數據數目等
      import SparkContext._ 
      
      val total_counter = sc.accumulator(0L, "total_counter") 
      val counter0 = sc.accumulator(0L, "counter0") 
      val counter1 = sc.accumulator(0L, "counter1")
      
      val count = sc.parallelize(1 to n, slices).map { i => 
          total_counter += 1 
          val x = random * 2 - 1 
          val y = random * 2 – 1 
          if (x*x + y*y < 1) { 
              counter1 += 1 
              } else { 
                  counter0 += 1 
                  } 
          if (x*x + y*y < 1) 1 else 0 
      }.reduce(_ + _)
    • 廣播變量
      • 廣播機制 : 高效分發大對象,好比字典(map),集合(set)等,每一個executor一份, 而不是每一個task一份; 引用自Spark doc裏的介紹:

        Spark 的 action(動做)操做是經過一系列的 stage(階段)進行執行的,這些 stage(階段)是經過分佈式的 「shuffle」 操做進行拆分的。Spark 會自動廣播出每一個 stage(階段)內任務所須要的公共數據。這種狀況下廣播的數據使用序列化的形式進行緩存,並在每一個任務運行前進行反序列化。這也就意味着,只有在跨越多個 stage(階段)的多個任務會使用相同的數據,或者在使用反序列化形式的數據特別重要的狀況下,使用廣播變量會有比較好的效果。

      • 包括HttpBroadcast和TorrentBroadcast兩種

      • HttpBroadcast與TorrentBroadcast
        • 廣播
          b
      • 總結:若是一個變量很是大,每個task計算邏輯都要用到這個變量,則應該將其廣播出去,更高效

5.Cache基本概念與使用

  • 容許將RDD緩存到內存中或磁盤上,以便於重用,若是想屢次使用某個 RDD,強烈推薦在該 RDD 上調用 persist 方法.

    Spark 中一個很重要的能力是將數據 persisting 持久化(或稱爲 caching 緩存),在多個操做間均可以訪問這些持久化的數據。當持久化一個 RDD 時,每一個節點的其它分區均可以使用 RDD 在內存中進行計算,在該數據上的其餘 action 操做將直接使用內存中的數據。這樣會讓之後的 action 操做計算速度加快(一般運行速度會加速 10 倍)。緩存是迭代算法和快速的交互式使用的重要工具。

  • Spark提供了多種緩存級別,以便於用戶根據實際需求進行調整
    • 存儲
  • 如何選擇存儲級別?
    • trade-off: Spark 的存儲級別的選擇,核心問題是在 memory 內存使用率和 CPU 效率之間進行權衡。

6.基於Docker的Spark開發測試環境搭建:

原項目能夠直接在本機跑,默認是Spark的單機模式;

  • (1)首先下載鏡像並啓動:
> docker pull sequenceiq/spark:1.5.1
> sudo docker run -it sequenceiq/spark:1.5.1 bash

遇到問題:在Docker中啓動master的時候,ip是Docker 的地址,個人宿主機訪問不到,緣由是啓動Docker 的時候沒有端口映射,因此從新run一遍鏡像:

docker run -p 127.0.0.1:8081:8080  -it sequenceiq/spark:1.5.1 bash

將宿主機的8081端口映射到Docker的8080端口

  • (2)
cd /usr/local/spark 
cp conf/spark-env.sh.template conf/spark-env.sh 
vi conf/spark-env.sh

加入兩行代碼:

 
  • (3) 啓動Master和Slave
./sbin/start-master.sh 
./sbin/start-slave.sh 172.17.0.109:7077

而後宿主機瀏覽器訪問http://localhost:8081就能夠訪問到Spark UI界面惹!

7.幾個實際例子:

    1. 分佈式估算Pi
/**
  * 並行估算pi
  * Area1 = x * x , Area2 = Pi * (x / 2) * (x / 2)
  * Area1 / Area2 = 4 / pi
  * 4 / pi = x / y  => pi = 4 * y / x
  */

object SparkPi {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("local[1]");
    val sc = new SparkContext(conf);
    val slices = if (args.length > 0) args(0).toInt else 2;
    val areaSqure = 100000 * slices;

    //並行估算areaCircle的值:也就是撒areaSqure這麼多個點,求落在圓內的多少個點,就近似等於圓的面積
    val areaCircle = sc.parallelize(1 to areaSqure, slices).map{i =>
      val x = new Random().nextInt() * 2 - 1
      val y = new Random().nextInt() * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)

    println("Pi is roughly " + 4.0 * areaCircle / areaSqure)
  }
}
    1. log query

      任務:如何統計每一個用戶在每臺機器(ip)上查詢(query)的次數和返回結果累積大小(byte)?

/**
  * 日誌查詢任務:統計每一個用戶在每臺機器(ip)上查詢(query)的次數和返回結果累積大小(byte)
  * 分析:key: 每一個用戶在每臺機器上的query ,value:次數和結果累積大小(byte)
  */

object LogQuery {

  val apacheLogRegex =
    """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-­‐]\d{4})\] "(.+?)" (\d{3}) ([\d\-­‐]+) "([^"]+)" "([^"]+)".*""".r

  def extractKey(line : String): (String, String, String) = {
    apacheLogRegex.findFirstIn(line) match {
      case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
        if (user != "\"-­‐\"") (ip, user, query)
        else (null, null, null)
      case _ => (null, null, null)
    }
  }

  def extractStats(line: String): Stats = {
    apacheLogRegex.findFirstIn(line) match {
      case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
        new Stats(1, bytes.toInt)
      case _ => new Stats(1, 0)
    }
  }

  class Stats(val count: Int, val numBytes: Int) extends Serializable {
    def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
    override def toString = "bytes=%s\tn=%s".format(numBytes, count)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LogQuery").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val dataset = sc.textFile(args(0))

    dataset.map(line => (extractKey(line), extractStats(line)))
      .reduceByKey((a, b) => a.merge(b))
        .collect().foreach {
      case (user, query) => println("%s\t%s".format(user, query))
    }
  }
}
    1. 邏輯迴歸 找出一條最優的線,將全部點分紅兩部分
相關文章
相關標籤/搜索