在平常的工做中,無論是生產環境仍是實驗環境,咱們運行Spark任務都是基於Spark集羣環境,雖然有時候咱們能夠在本地使用Maven來搭建spark的開發環境來作一些測試,來完成代碼的編寫。(上家公司是一個例外,在本地IDEA寫完程序,就能夠直接鏈接到大數據平臺HDP直接運行)可是正規的場景下,辦公網絡和集羣網絡是隔離的,因此咱們編寫的spark任務,都要依賴於各類數據源,e.g.HDFS、Kafka、RDBMS,因此都要打成Jar包放到集羣環境來運行,將數據寫到設計好的目的地。java
Spark cluster is just some computers running Spark and working together.node
cluster consist on:
Master: is one of the computers that orchestrate(安排協調) how everything works. It distributes the work and take care of everything.
Slaves: these are the computers that get the job done. They process chunks(大塊的) of your massive (大量的)datasets following the Map Reduce paradigm(範式樣式). A computer can be master and slave at the same time.spring
It just mean that Spark is installed in every computer involved in the cluster. The cluster manager in use is provided by Spark. (‘standalone’是spark自身內建的cluster manager(集羣管理器 ))There are other cluster managers like Apache Mesos and Hadoop YARN.sql
![圖片上傳中...]shell
簡單的介紹完Spark Cluster後,下面給出Spark 2.x版本的安裝指南,爲何沒有具體的寫一遍安裝教程呢,由於Spark集羣環境的搭建相對於Hadoop的安裝來講,簡單的多,只要是按照步驟,一步一步的配置,就會成功。
安裝指南:
https://www.davidadrian.cc/po...apache
Spark 應用在集羣上做爲獨立的進程組來運行,在您的 main 程序中經過 SparkContext 來協調(稱之爲 driver 程序)。具體的說,爲了運行在集羣上,SparkContext 能夠鏈接至幾種類型的 Cluster Manager(既能夠用 Spark 內置的 Standlone Cluster Manager,也可使用外部的Mesos和 YARN),它們會分配應用的資源。一旦鏈接上,Spark 得到集羣中節點上的 Executor,這些進程能夠運行計算而且爲您的應用存儲數據。接下來,它將發送您的應用代碼(經過 JAR 或者 Python 文件定義傳遞給 SparkContext)至 Executor。最終,SparkContext 將發送 Task 到 Executor 以運行。編程
每一個應用獲取到它本身的 Executor 進程,它們會保持在整個應用的生命週期中而且在多個線程中運行 Task(任務)。這樣作的優勢是把應用互相隔離,在調度方面(每一個 driver 調度它本身的 task)和 Executor 方面(來自不一樣應用的 task 運行在不一樣的 JVM 中)。然而,這也意味着如果不把數據寫到外部的存儲系統中的話,數據就不可以被不一樣的 Spark 應用(SparkContext 的實例)之間共享。Spark 是不知道底層的 Cluster Manager 究竟是什麼類型的。只要它可以得到 Executor 進程,而且它們能夠和彼此之間通訊,那麼即便是在一個也支持其它應用的 Cluster Manager(例如,Mesos / YARN)上來運行它也是相對簡單的。Driver 程序必須在本身的生命週期內(例如,請參閱 在網絡配置章節中的 spark.driver.port 章節。 監聽和接受來自它的 Executor 的鏈接請求。一樣的,driver 程序必須能夠從 worker 節點上網絡尋址(就是網絡沒問題)。由於 driver 調度了集羣上的 task(任務),更好的方式應該是在相同的局域網中靠近 worker 的節點上運行。若是您不喜歡發送請求到遠程的集羣,倒不如打開一個 RPC 至 driver 並讓它就近提交操做而不是從很遠的 worker 節點上運行一個 driver。緩存
Application:基於Spark的用戶程序,包含了一個driver program和集羣中多個executor
Driver Program:運行Application的main()函數並建立SparkContext。一般SparkContext表明driver program
Executor:爲某Application運行在worker node上的一個進程。該進程負責運行Task,並負責將數據存在內存或者磁盤上。每一個Application都有本身獨立的executors
Cluster Manager: 在集羣上得到資源的外部服務(例如 Spark Standalon,Mesos、Yarn)
Worker Node: 集羣中任何可運行Application代碼的節點
Task:被送到executor上執行的工做單元。
Job:能夠被拆分紅Task並行計算的工做單元,通常由Spark Action觸發的一次執行做業。
Stage:每一個Job會被拆分紅不少組Task,每組任務被稱爲stage,也可稱TaskSet。該術語能夠常常在日誌中看打。
RDD:Spark的基本計算單元,經過Scala集合轉化、讀取數據集生成或者由其餘RDD通過算子操做獲得。安全
系統目前支持三種 Cluster Manager:
Standalone – 包含在 Spark 中使得它更容易來安裝集羣的一個簡單的 Cluster Manager。
Apache Mesos – 一個通用的 Cluster Manager,它也能夠運行 Hadoop MapReduce 和其它服務應用。
Hadoop YARN –Hadoop 2 中的 resource manager(資源管理器)。Kubernetes (experimental) – 除了上述以外,還有 Kubernetes 的實驗支持。 Kubernetes 提供以容器爲中心的基礎設施的開源平臺。 Kubernetes 的支持正在 apache-spark-on-k8s Github 組織中積極開發。有關文檔,請參閱該項目的 README。服務器
Spark 有好幾計算資源調度的方式. 首先,回憶一下 集羣模式概述, 每一個Spark 應用(包含一個SparkContext實例)中運行了一些其獨佔的執行器(executor)進程. 集羣管理器提供了Spark 應用與應用之間的資源調度scheduling across applications.其次, 在各個Spark應用內部,各個線程可能併發地經過action算子提交多個Spark做業(job).若是你的應用服務於網絡請求,那這種狀況是很常見的.在Spark應用內部(對應同一個SparkContext)各個做業之間,Spark默認FIFO調度,同時也能夠支持公平調度 fair scheduler.
若是在集羣上運行,每一個Spark應用都會SparkContext得到一批獨佔的執行器JVM,來運行其任務並存儲數據. 若是有多個用戶共享集羣,那麼會有不少資源分配相關的選項,如何設計還取覺於具體的集羣管理器.
對Spark所支持的各個集羣管理器而言,最簡單的的資源分配,就是靜態劃分.這種方式就意味着,每一個Spark應用都是設定一個最大可用資源總量,而且該應用在整個生命週期內都會佔住這個資源.這種方式在 Spark’s獨立部署 standalone 和 YARN調度,以及Mesos粗粒度模式下均可用.coarse-grained Mesos mode .
Standalone mode: 默認狀況下,Spark應用在獨立部署的集羣中都會以FIFO(first-in-first-out)模式順序提交運行,而且每一個spark應用都會佔用集羣中全部可用節點.不過你能夠經過設置spark.cores.max或者spark.deploy.defaultCores來限制單個應用所佔用的節點個數.最後,除了能夠控制對CPU的使用數量以外,還能夠經過spark.executor.memory來控制各個應用的內存佔用量.
Mesos: 在Mesos中要使用靜態劃分的話,須要將spark.mesos.coarse設爲true,一樣,你也須要配置spark.cores.max來控制各個應用的CPU總數,以及spark.executor.memory來控制各個應用的內存佔用.Mesos上還有一種動態共享CPU的方式。在這種模式下,每一個Spark應用的內存佔用仍然是固定且獨佔的(仍由spark.exexcutor.memory決定),可是若是該Spark應用沒有在某個機器上執行任務的話,那麼其它應用能夠佔用該機器上的CPU。這種模式對集羣中有大量不是很活躍應用的場景很是有效,例如:集羣中有不少不一樣用戶的Spark shell session.但這種模式不適用於低延時的場景,由於當Spark應用須要使用CPU的時候,可能須要等待一段時間才能取得對CPU的使用權。要使用這種模式,只須要在mesos://URL上設置spark.mesos.coarse屬性爲false便可。
YARN: 在YARN中須要使用 –num-executors 選項來控制Spark應用在集羣中分配的執行器的個數.對於單個執行器(executor)所佔用的資源,可使用 –executor-memory和–executor-cores來控制.
值得注意的是,目前尚未任何一種資源分配模式支持跨Spark應用的內存共享。若是你想經過這種方式共享數據,咱們建議你能夠單獨使用一個服務(例如:alluxio),這樣就能實現多應用訪問同一個RDD的數據。
動態資源分配
Spark 提供了一種基於負載來動態調節Spark應用資源佔用的機制。這意味着,你的應用會在資源空閒的時間將其釋放給集羣,須要時再從新申請。這一特性在多個應用Spark集羣資源的狀況下特別有用.
這個特性默認是禁止的,可是在全部的粗粒度集羣管理器上都是可用的,如:i.e. 獨立部署模式standalone mode, YARN mode, and 粗粒度模式Mesos coarse-grained mode.
整體上來講,Spark應該在executors 空閒時將其關閉,而在後續要用時再申請。由於沒有一個固定的方法,能夠預測一個executors 在後續是否立刻會被分配去執行任務,或者一個新分配的executors 其實是空閒的,因此咱們須要一個試探性的方法,來決定是否申請或是移除一個executors 。
一個啓用了動態分配的Spark應用會有等待任務須要調度的時候,申請額外的executors 。在這種狀況下,一定意味着已有的executors 已經不足以同時執行全部未完成的任務。Spark會分輪次來申請executors 。實際的資源申請,會在任務掛起spark.dynamicAllocation.schedulerBacklogTimeout秒後首次觸發,其後若是等待隊列中仍有掛起的任務,則每過spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒後觸發一次資源申請。另外,每一輪申請的executors 個數以指數形式增加。例如:一個Spark應用可能在首輪申請1個執行器,後續的輪次申請個數多是2個、4個、8個….。
採用指數級增加策略的緣由有兩個:第一,對於任何一個Spark應用若是隻須要多申請少數幾個executors 的話,那麼必須很是謹慎的啓動資源申請,這和TCP慢啓動有些相似;第二,若是一旦Spark應用確實須要申請多個執行器的話,那麼能夠確保其所需的計算資源及時增加。
移除executors 的策略就簡單得多了。Spark應用會在某個executors 空閒超過spark.dynamicAllocation.executorIdleTimeout秒後將其刪除,在大多數狀況下,executors 的移除條件和申請條件都是互斥的,也就是說,executors 在有等待執行任務掛起時,不該該空閒。
非動態分配模式下,executor可能的退出緣由有執行失敗或是相關Spark應用已經退出。無論是哪一種緣由,執行器的全部狀態都已經再也不須要,能夠丟棄掉。可是在動態分配的狀況下,executor有可能在Spark應用運行期間被移除。這時候,若是Spark應用嘗試去訪問該executor存儲的狀態,就必須重算這一部分數據。所以,Spark須要一種機制,可以優雅的關閉executor,同時還保留其狀態數據。
這種需求對於shuffle操做尤爲重要。shuffle過程當中,Spark 執行器首先將 map 輸出寫到本地磁盤,同時executor自己又是一個文件服務器,這樣其餘executor就可以經過該執行器得到對應的 map 結果數據。一旦有某些任務執行時間過長,動態分配有可能在shuffle結束前移除任務異常的執行器,而這些被移除的執行器對應的數據將會被從新計算,但這些重算實際上是沒必要要的。
要解決這一問題,就須要用到 external shuffle service ,該服務在 Spark 1.2 引入。該服務在每一個節點上都會啓動一個不依賴於任何 Spark 應用或executor的獨立進程。一旦該服務啓用,Spark 執行器再也不從各個執行器上獲取 shuffle 文件,轉而從這個 service 獲取。這意味着,任何執行器輸出的shuffle狀態數據均可能存留時間比對應的執行器進程還長。
除了shuffle文件以外,executor也會在磁盤或者內存中緩存數。一旦executor被移除,其緩存數據將沒法訪問。這個問題目前尚未解決。或許在將來的版本中,可能會採用外部shuffle服務相似的方法,將緩存數據保存在堆外存儲中以解決這一問題。
在指定的 Spark 應用內部(對應同一 SparkContext 實例),多個線程可能併發地提交 Spark 做業(job)。在本節中,做業(job)是指,由 Spark action 算子(如 : collect)觸發的一系列計算任務的集合。Spark 調度器是徹底線程安全的,而且可以支持 Spark 應用同時處理多個請求(好比 : 來自不一樣用戶的查詢)。
默認,Spark 應用內部使用 FIFO 調度策略。每一個做業被劃分爲多個階段(stage)(例如 : map 階段和 reduce 階段),第一個做業在其啓動後會優先獲取全部的可用資源,而後是第二個做業再申請,再第三個……。若是前面的做業沒有把集羣資源佔滿,則後續的做業能夠當即啓動運行,不然,後提交的做業會有明顯的延遲等待。
不過從 Spark 0.8 開始,Spark 也能支持各個做業間的公平(Fair)調度。公平調度時,Spark 以輪詢的方式給每一個做業分配資源,所以全部的做業得到的資源大致上是平均分配。這意味着,即便有大做業在運行,小的做業再提交也能當即得到計算資源而不是等待前面的做業結束,大大減小了延遲時間。這種模式特別適合於多用戶配置。 要啓用公平調度器,只需設置一下 SparkContext 中 spark.scheduler.mode 屬性爲 FAIR 便可 :
val conf = new SparkConf().setMaster(...).setAppName(...) conf.set("spark.scheduler.mode", "FAIR") val sc = new SparkContext(conf)
以上就是Spark集羣環境的做業調度和資源調度的介紹,更加詳細的講解,能夠自行Google
Spark開發環境的搭建,我就拿我本身日常常用的IDEA+Maven來舉例,固然如今不少人在用sbt,這只是我的習慣問題。
注: 這種開發環境不依賴與Spark集羣環境,直接打開IDEA進行下面的操做就能夠~~~~
1.打開IDEA,建立Maven項目
(臨時掉鏈子,archetype刷新不出來,選擇maven-archetype-quickstart就ok)
2.隨後打開pom.xml文件,加入下面內容
<build> <defaultGoal>package</defaultGoal> <finalName>sparkprocess</finalName> <outputDirectory>target/classes</outputDirectory> <resources> <resource> <directory>${project.basedir}/src/main/scala</directory> <excludes> <exclude>**/*.java</exclude> </excludes> </resource> <resource> <targetPath>config/</targetPath> <directory>${project.basedir}/src/main/resources</directory> <includes> <include>*.*</include> </includes> </resource> <resource> <directory>${project.basedir}/src/main/resources</directory> <includes> <include>*.*</include> </includes> </resource> </resources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> <sourceDirectory>src/main/scala</sourceDirectory> </build>
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.0</version> </dependency> <!--若是Spark streaming對接Kafka,就要使用這個依賴--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.1.0</version> </dependency> </dependencies>
以上就是要在pom.xml文件中加入的依賴
其中:
在項目工程中咱們要引入JDK 1.8以上
若是咱們使用scala來開發Spark,咱們還要引入Scala環境。其中scala環境的選擇有兩種方式:
1.咱們能夠在window本機上,像安裝java環境同樣,安裝scala,再在IDEA中引入。
2.在pom.xml文件中引入scala的依賴,本教程的方式就是第二種方式,引入了scala 2.11.8
在上面的依賴中,咱們看到了3個spark的依賴,其中Spark Core是必須引入的,而其餘則按照咱們須要進行的spark任務須要用到的Spark組件來進行選擇性的引入,具體的緣由,我會在接下來每一個Spark組件的分享中來介紹。
下一篇分享就會進入到Spark的編程實踐了,會從Spark Core開始,開啓Spark開發之路