本文主要介紹Spark基本數據結構RDD的原理和使用,以及搭建了基於Docker的Spark集羣開發測試環境,最後給出了幾個實際程序例子,算是Saprk入了門:)node
RDD是Spark中最核心的概念算法
彈性:RDD 能夠在不改變內部存儲數據記錄的前提下,去調整並行計算計算單元的劃分結構,彈性這一特性,也是爲並行計算服務的docker
容錯性:分佈式的通常問題是須要具備容錯性,那麼RDD自己是具備容錯性的,apache
RDD 內部的數據集合在邏輯上和物理上被劃分紅多個小子集合Partition,這樣的每個子集合咱們將其稱爲分區,分區的個數會決定並行計算的粒度,而每個分區數值的計算都是在一個單獨的任務Task中進行,所以並行任務的個數,也是由 RDD分區的個數決定的
Partition -> Task編程
先放個圖,看下Spark總體程序是怎麼執行的瀏覽器
整個集羣分爲 Master 節點和 Worker 節點,至關於 Hadoop 的 Master 和 Slave 節點緩存
Master 節點上常駐 Master 守護進程,負責管理所有的 Worker 節點bash
Worker 節點上常駐 Worker 守護進程,負責與 Master 節點通訊並管理 executors數據結構
Driver 官方解釋是 「The process running the main() function of the application and creating the SparkContext」。 Application 就是用戶本身寫的 Spark 程序(driver program)併發
每一個 Worker 上存在一個或者多個 ExecutorBackend 進程。每一個進程包含一個 Executor 對象,該對象持有一個線程池,每一個線程能夠執行一個 task。
var list = List(1, 2, 3) list.foreach(println)
Spark 的RDD,封裝了各類相似於Scala集合的算子
map、filter、reduce
等,且都是分佈式執行的
sc.parallelize()
建立,第二個參數是Partition數目val slices = 10 //Partition數目,即並行的task數目啓動10個map task進行處理 val n = 100000 * slices val count = sc.parallelize(1 to n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 (x*x + y*y < 1) 1 else 0 }.reduce(_ + _)
sc.textFile("tile.txt") //將本地文本文件加載成RDD sc.textFile(「hdfs://nn:9000/path/file」) //hdfs文件或目錄
sc.sequenceFile(「file.txt」) //將本地二進制文件加載成RDD
sc.hadoopFile(path, inputFmt, keyClass, valClass)
inputRdd = sc.textFile(「hdfs:///data/input」) inputRdd = sc.textFile(「hdfs://namenode:8020/data/input」)
HDFS的datanode的Block和Spark數據的partiton是一一映射的,也和task一一映射,也就是下圖所示的就會啓動5個task
import org.apache.spark._ import org.apache.hadoop.hbase.mapreduce.TableInputFormat //建立SparkContext val sparkConf = new SparkConf() val sc = new SparkContext(conf ) // 設置hbase configuration val hbaseConf = HBaseConfiguration.create() hbaseConf.addResource(new Path(「hbase-site.xml")) hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName) //建立hbase RDD val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result]) //獲取總行數 al count = hBaseRDD.count()
Transformation:將一個RDD經過一種規則,映射成另外一種RDD; Action: 返回結果或者保存結果,只有action纔會觸發程序的執行,注意Spark中遇到action的時候計算纔會去分佈式執行
在 Spark 中,全部的轉換(transformations)都是惰性(lazy)的,它們不會立刻計算它們的結果。相反的,它們僅僅記錄轉換操做是應用到哪些基礎數據集(例如一個文件)上的。轉換僅僅在這個時候計算:當動做(action) 須要一個結果返回給驅動程序的時候。這個設計可以讓 Spark 運行得更加高效。例如,咱們能夠實現:經過 map 建立一個新數據集在 reduce 中使用,而且僅僅返回 reduce 的結果給 driver,而不是整個大的映射過的數據集。
//建立RDD val listRdd = sc.parallelize(List(1, 2, 3), 3) // 將RDD傳入函數,生成新的RDD val squares = listRdd.map(x => x*x) // {1, 4, 9} // 對RDD中元素進行過濾,生成新的RDD val even = squares.filter(_ % 2 == 0) // {4} // 將一個元素映射成多個,生成新的RDD nums.flatMap(x => 1 to x) // => {1, 1, 2, 1, 2, 3}
//建立新的RDD val nums = sc.parallelize(List(1, 2, 3), 2) // 將RDD保存爲本地集合(返回到driver端) nums.collect() // => Array(1, 2, 3) // 返回前K個元素 nums.take(2) // => Array(1, 2) // 計算元素總數 nums.count() // => 3 // 合併集合元素 nums.reduce(_ + _) // => 6 // 將RDD寫到HDFS中 nums.saveAsTextFile(「hdfs://nn:8020/output」) nums.saveAsSequenceFile(「hdfs://nn:8020/output」)
val pets = sc.parallelize( List((「cat」, 1), (「dog」, 1), (「cat」, 2))) pets.reduceByKey(_ + _) // => {(cat, 3), (dog, 1)} pets.groupByKey() // => {(cat, Seq(1, 2)), (dog, Seq(1)} pets.sortByKey() // => {(cat, 1), (cat, 2), (dog, 1)}
reduceByKey自動在map端進行本地combine
words.reduceByKey(_ + _, 5)
用戶也能夠經過修改spark.default.parallelism設置默認並行度 默認並行度爲最初的RDD partition數目
留一個思考問題:那麼這些操做都是怎麼分佈式執行的呢?
Sample()
從數據集採樣
union()
合併多個RDD
cartesian
求笛卡爾積
共享變量:Accumulators和Broadcast Variables
通常來講上述的操做都是對數據在遠端worker node上拷貝的數據進行操做,對數據的效果並不會回傳
Accumulator
(累加器,計數器)
相似於MapReduce中的counter,將數據從一個節點發送到其餘各個節點上去;
import SparkContext._ val total_counter = sc.accumulator(0L, "total_counter") val counter0 = sc.accumulator(0L, "counter0") val counter1 = sc.accumulator(0L, "counter1") val count = sc.parallelize(1 to n, slices).map { i => total_counter += 1 val x = random * 2 - 1 val y = random * 2 – 1 if (x*x + y*y < 1) { counter1 += 1 } else { counter0 += 1 } if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _)
廣播機制 : 高效分發大對象,好比字典(map),集合(set)等,每一個executor一份, 而不是每一個task一份; 引用自Spark doc裏的介紹:
Spark 的 action(動做)操做是經過一系列的 stage(階段)進行執行的,這些 stage(階段)是經過分佈式的 「shuffle」 操做進行拆分的。Spark 會自動廣播出每一個 stage(階段)內任務所須要的公共數據。這種狀況下廣播的數據使用序列化的形式進行緩存,並在每一個任務運行前進行反序列化。這也就意味着,只有在跨越多個 stage(階段)的多個任務會使用相同的數據,或者在使用反序列化形式的數據特別重要的狀況下,使用廣播變量會有比較好的效果。
包括HttpBroadcast和TorrentBroadcast兩種
總結:若是一個變量很是大,每個task計算邏輯都要用到這個變量,則應該將其廣播出去,更高效
容許將RDD緩存到內存中或磁盤上,以便於重用,若是想屢次使用某個 RDD,強烈推薦在該 RDD 上調用 persist 方法.
Spark 中一個很重要的能力是將數據 persisting 持久化(或稱爲 caching 緩存),在多個操做間均可以訪問這些持久化的數據。當持久化一個 RDD 時,每一個節點的其它分區均可以使用 RDD 在內存中進行計算,在該數據上的其餘 action 操做將直接使用內存中的數據。這樣會讓之後的 action 操做計算速度加快(一般運行速度會加速 10 倍)。緩存是迭代算法和快速的交互式使用的重要工具。
trade-off: Spark 的存儲級別的選擇,核心問題是在 memory 內存使用率和 CPU 效率之間進行權衡。
原項目能夠直接在本機跑,默認是Spark的單機模式;
> docker pull sequenceiq/spark:1.5.1 > sudo docker run -it sequenceiq/spark:1.5.1 bash
遇到問題:在Docker中啓動master的時候,ip是Docker 的地址,個人宿主機訪問不到,緣由是啓動Docker 的時候沒有端口映射,因此從新run一遍鏡像:
docker run -p 127.0.0.1:8081:8080 -it sequenceiq/spark:1.5.1 bash
將宿主機的8081端口映射到Docker的8080端口
cd /usr/local/spark cp conf/spark-env.sh.template conf/spark-env.sh vi conf/spark-env.sh
加入兩行代碼:
./sbin/start-master.sh ./sbin/start-slave.sh 172.17.0.109:7077
而後宿主機瀏覽器訪問http://localhost:8081
就能夠訪問到Spark UI界面惹!
/** * 並行估算pi * Area1 = x * x , Area2 = Pi * (x / 2) * (x / 2) * Area1 / Area2 = 4 / pi * 4 / pi = x / y => pi = 4 * y / x */ object SparkPi { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Spark Pi").setMaster("local[1]"); val sc = new SparkContext(conf); val slices = if (args.length > 0) args(0).toInt else 2; val areaSqure = 100000 * slices; //並行估算areaCircle的值:也就是撒areaSqure這麼多個點,求落在圓內的多少個點,就近似等於圓的面積 val areaCircle = sc.parallelize(1 to areaSqure, slices).map{i => val x = new Random().nextInt() * 2 - 1 val y = new Random().nextInt() * 2 - 1 if (x * x + y * y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * areaCircle / areaSqure) } }
log query
任務:如何統計每一個用戶在每臺機器(ip)上查詢(query)的次數和返回結果累積大小(byte)?
/** * 日誌查詢任務:統計每一個用戶在每臺機器(ip)上查詢(query)的次數和返回結果累積大小(byte) * 分析:key: 每一個用戶在每臺機器上的query ,value:次數和結果累積大小(byte) */ object LogQuery { val apacheLogRegex = """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-‐]\d{4})\] "(.+?)" (\d{3}) ([\d\-‐]+) "([^"]+)" "([^"]+)".*""".r def extractKey(line : String): (String, String, String) = { apacheLogRegex.findFirstIn(line) match { case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) => if (user != "\"-‐\"") (ip, user, query) else (null, null, null) case _ => (null, null, null) } } def extractStats(line: String): Stats = { apacheLogRegex.findFirstIn(line) match { case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) => new Stats(1, bytes.toInt) case _ => new Stats(1, 0) } } class Stats(val count: Int, val numBytes: Int) extends Serializable { def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes) override def toString = "bytes=%s\tn=%s".format(numBytes, count) } def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("LogQuery").setMaster("local[1]") val sc = new SparkContext(conf) val dataset = sc.textFile(args(0)) dataset.map(line => (extractKey(line), extractStats(line))) .reduceByKey((a, b) => a.merge(b)) .collect().foreach { case (user, query) => println("%s\t%s".format(user, query)) } } }