原文:『 Spark 』2. spark 基本概念解析 html
本系列是綜合了本身在學習spark過程當中的理解記錄 + 對參考文章中的一些理解 + 我的實踐spark過程當中的一些心得而來。寫這樣一個系列僅僅是爲了梳理我的學習spark的筆記記錄,並不是爲了作什麼教程,因此一切以我的理解梳理爲主,沒有必要的細節就不會記錄了。若想深刻了解,最好閱讀參考文章和官方文檔。python
其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本好仍是必要的。
最後,若是各位以爲內容有誤,歡迎留言備註,全部留言 24 小時內一定回覆,很是感謝。
Tips: 若是插圖看起來不明顯,能夠:1. 放大網頁;2. 新標籤中打開圖片,查看原圖哦。git
用戶在 spark 上構建的程序,包含了 driver 程序以及在集羣上運行的程序代碼,物理機器上涉及了 driver,master,worker 三個節點.github
建立 sc ,定義 udf 函數,定義一個 spark 應用程序所須要的三大步驟的邏輯:加載數據集,處理數據,結果展現。web
集羣的資源管理器,在集羣上獲取資源的外部服務。
拿 Yarn 舉例,客戶端程序會向 Yarn 申請計算我這個任務須要多少的 memory,多少 CPU,etc。
而後 Cluster Manager 會經過調度告訴客戶端可使用,而後客戶端就能夠把程序送到每一個 Worker Node 上面去執行了。數據庫
集羣中任何一個能夠運行spark應用代碼的節點。Worker Node就是物理節點,能夠在上面啓動Executor進程。express
在每一個 Worker Node 上爲某應用啓動的一個進程,該進程負責運行任務,而且負責將數據存在內存或者磁盤上,每一個任務都有各自獨立的 Executor。
Executor 是一個執行 Task 的容器。它的主要職責是:apache
初始化程序要執行的上下文 SparkEnv,解決應用程序須要運行時的 jar 包的依賴,加載類。編程
同時還有一個 ExecutorBackend 向 cluster manager 彙報當前的任務狀態,這一方面有點相似 hadoop的 tasktracker 和 task。api
總結:Executor 是一個應用程序運行的監控和執行容器。
包含不少 task 的並行計算,能夠認爲是 Spark RDD 裏面的 action,每一個 action 的觸發會生成一個job。
用戶提交的 Job 會提交給 DAGScheduler,Job 會被分解成 Stage,Stage 會被細化成 Task,Task 簡單的說就是在一個數據 partition 上的單個數據處理流程。關於 job,stage,task,詳細能夠參考這篇文章:『 Spark 』6. 深刻研究 spark 運行原理之 job, stage, task
A job is triggered by an action
, like count()
or saveAsTextFile()
, click on a job to see info about the stages
of tasks
inside it.
一個 Job 會被拆分爲多組 Task,每組任務被稱爲一個 Stage 就像 Map Stage, Reduce Stage。
Stage 的劃分在 RDD 的論文中有詳細的介紹,簡單的說是以 shuffle 和 result 這兩種類型來劃分。
在 Spark 中有兩類 task:
shuffleMapTask
輸出是shuffle所需數據, stage的劃分也以此爲依據,shuffle以前的全部變換是一個stage,shuffle以後的操做是另外一個stage。
resultTask,
輸出是result,好比 rdd.parallize(1 to 10).foreach(println) 這個操做沒有shuffle,直接就輸出了,那麼只有它的task是resultTask,stage也只有一個;若是是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 這個job由於有reduce,因此有一個shuffle過程,那麼reduceByKey以前的是一個stage,執行shuffleMapTask,輸出shuffle所需的數據,reduceByKey到最後是一個stage,直接就輸出結果了。若是job中有屢次shuffle,那麼每一個shuffle以前都是一個stage。
被送到 executor 上的工做單元。
Partition 相似 hadoop 的 Split,計算是以 partition 爲單位進行的,固然 partition 的劃分依據有不少,這是能夠本身定義的,像 HDFS 文件,劃分的方式就和 MapReduce 同樣,以文件的 block 來劃分不一樣的 partition。總而言之,Spark 的 partition 在概念上與 hadoop 中的 split 是類似的,提供了一種劃分數據的方式。
先看看原文 [Resilient Distributed Datasets: A Fault-Tolerant Abstraction for
In-Memory Cluster Computing](http://litaotao.github.io/files/spark-rd... 是怎麼介紹 RDD 的。
a distributed memory abstraction
that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.
RDDs are motivated by two types of applications that current computing frameworks handle inefficiently:
iterative algorithms;
interactive data mining tools;
In both cases, keeping data in memory can improve performance by an order of magnitude.
To achieve fault tolerance efficiently, RDDs provide a restricted form of shared memory, based on coarsegrained transformations rather than fine-grained updates to shared state. However, we show that RDDs are expressive enough to capture a wide class of computations, including recent specialized programming models for iterative jobs, such as Pregel, and new applications that these models do not capture. We have implemented RDDs in a system called Spark, which we evaluate through a variety of user applications and benchmarks.
每一個RDD有5個主要的屬性:
一組分片(partition),即數據集的基本組成單位
一個計算每一個分片的函數
對parent RDD的依賴,這個依賴描述了RDD之間的lineage
對於key-value的RDD,一個Partitioner,這是可選擇的
一個列表,存儲存取每一個partition的preferred位置。對於一個HDFS文件來講,存儲每一個partition所在的塊的位置。這也是可選擇的
把上面這5個主要的屬性總結一下,能夠得出RDD的大體概念。首先要知道,RDD大概是這樣一種表示數據集的東西,它具備以上列出的一些屬性。是spark項目組設計用來表示數據集的一種數據結構。而spark項目組爲了讓RDD能handle更多的問題,又規定RDD應該是隻讀的,分區記錄的一種數據集合中。能夠經過兩種方式來建立RDD:一種是基於物理存儲中的數據,好比說磁盤上的文件;另外一種,也是大多數建立RDD的方式,即經過其餘RDD來建立【之後叫作轉換】而成。而正由於RDD知足了這麼多特性,因此spark把RDD叫作Resilient Distributed Datasets,中文叫作彈性分佈式數據集。不少文章都是先講RDD的定義,概念,再來講RDD的特性。我以爲其實也能夠倒過來,經過RDD的特性反過來理解RDD的定義和概念,經過這種由果溯因的方式來理解RDD也何嘗不可。反正對我我的而言這種方式是挺好的。
RDD是Spark的核心,也是整個Spark的架構基礎,能夠總下出幾個它的特性來:
它是不變的數據結構存儲
它是支持跨集羣的分佈式數據結構
能夠根據數據記錄的key對結構進行分區
提供了粗粒度的操做,且這些操做都支持分區
它將數據存儲在內存中,從而提供了低延遲性
關於 rdd 的更多詳情,能夠參考這篇文章:『 Spark 』4. spark 之 RDD
先看看 api 文檔裏是怎麼說的:parallelize
parallelize(c, numSlices=None)
Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range for performance.
簡單的說,parallelize 就是把 driver 端定義的一個數據集,或者一個獲取數據集的生成器,分發到 worker 上的 executor 中,以供後續分析。這種方式在測試代碼邏輯時常常用到,但在構建真正的 spark 應用程序時不多會用到,通常都是從 hdfs 或者數據庫去讀取數據。
提交 spark 應用時,spark 會把應用代碼分發到全部的 worker 上面,應用依賴的包須要在全部的worker上都存在,有兩種解決 worker 上相關包依賴的問題:
選用一些工具統一部署 spark cluster;
在提交 spark 應用的時候,指定應用依賴的相關包,把 應用代碼,應用依賴包 一塊兒分發到 worker;
cache 是否支持 priority,目前不支持,並且 spark 裏面對 rdd 的 cache 和咱們常見的緩存系統是不同的。細節能夠找我討論。
The number of cores to use on each executor. For YARN and standalone mode only. In standalone mode, setting this parameter allows an application to run multiple executors on the same worker, provided that there are enough cores on that worker. Otherwise, only one executor per application will run on each worker.
每個 core,至關於一個 worker 上的進程,這些進程會同時執行分配到這個 worker 上的任務。簡單的說,就是 spark manager 把一個 job 切分幾個 task 分發到 worker 上同步執行,而每一個 worker 把分配給本身的 task 再切分紅幾個 subtask,分配給當前 worker 上的不一樣進程。
分配給 spark 應用的內存是僅僅給 cache 數據用嗎?
不是,分配給 spark 應用的內存有三個方面的應用:
spark 自己
spark 應用
spark 應用過程當中 runtime 使用,好比 UDF 函數
spark 應用中的 cache
RDD 之間的依賴類別[ 或者,建立一個 RDD 的不一樣方法 ]
所謂本地內存,是指在 driver 端的程序所須要的內存,由 driver 機器提供,通常用來生成測試數據,接受運算結果等;
所謂集羣內存,是指提交到集羣的做業可以向集羣申請的最多內存使用量,通常用來存儲關鍵數據;
能夠在啓動 spark 應用的時候申請;徹底可控。
FIFO 資源分配方式。
spark app 的起點和入口,通常用來加載數據集,生成第一個 rdd。
不會,只會cache一次。stackoverflow
下一篇,經過幾個簡單的例子來介紹 spark 的基本編程模式。