背景:去年10月,咱們實現了Spark 1.5.2版本運行在Mesos這個資源管理框架上。隨後Spark出了新版本咱們又對Spark進行了小升級,升級並無什麼太大的難度,沿用以前的修改過的代碼從新編譯,替換一下包,把歷史任務所有發一遍就能很好的升級到1.6.1也就是如今集羣的版本,1.6.2並無升級由於感受改動不是很大。到如今正好一年的時間,線上已經註冊了44 個Spark任務,其中28個爲Streaming任務,在運行這些任務的過程當中,咱們遇到了不少問題,其中最大的問題是動態擴容問題,即當業務線增長更復雜的代碼邏輯或者業務的增加致使處理量上升的時候會使Spark因計算資源不足,這時候若是沒有作流量控制則Spark任務會因內存承受不了而失敗,若是作了流量控制則Kafka的lag會有堆積,這時候通常就須要增長更多的executor來處理,可是增長多少合適通常不太好判斷,因而要反覆地修改配置從新發布來找到一個合理的配置。git
咱們在Marathon上使用Logstash的時候也有相似的問題,當因爲接入一個比較大的日誌致使流量忽然增長使得Logstash處理不了時,Kafka的Lag產生堆積,這時咱們只需直接上Marathon的界面上點Scale而後填入更大的實例數字就能啓動了一些Logstash實例自動平衡地去處理了。當發現某個結點是慢結點不幹活的時候,只須要在Marathon上將對應的任務Kill掉就會自動再發一個任務替補他的位置,那麼Logstash既然均可以作到爲何Spark不能夠?所以咱們決定在Spark 2.0版本的時候來實現這個功能,同時咱們也會改進其它的一些問題,另外Spark2.0是一個比較大的版本升級,配置與以前的1.6.1不一樣,不能作到直接所有重發一遍任務來作到所有升級。
swift
( 圖1)使用Logstash的管理架構架構
在這裏咱們首先介紹一些Mesos的一些相關概念,Mesos的Framework是資源分配與調度的發起者,Spark自帶了一個spark-mesos-dispacher的Framework用來管理Spark的資源調度。而Marathon也是一個Framework他的本質和mesos-dispacher或spark schedular相同。app
(圖2)Mesos-dispacher架構框架
在圖2在這個架構中,你首先得向mesos註冊一個mesos-dispacher的Framework,而後,經過spark-sumbit腳原本向mesos-dispacher發佈任務,mesos-dispacher接到任務之後開始調度他負責發一個Spark Driver,而後driver在mesos模式下,他會再次向mesos註冊這個任務的Framework也就是咱們看到的Spark UI,也能夠理解爲他本身也是個調度器,而後這個Framework根據配置來向Mesos申請資源來發一些Spark Executor。
tcp
(圖3)Mesos-dispacher功能截圖性能
從圖3能夠看出,mesos-dispacher只提供了下功能:測試
因此mesos-dispacher並非一個完備的Framework,在咱們使用的過程當中發現了存在如下的問題:優化
沒有動態擴容功能。咱們但願作到的就是可讓Spark能夠在運行時增長實例或減小,可是受於架構限制mesos-dispacher只能管理driver,若是改mesos-dispacher的代碼的話只能實現動態擴driver沒有意義。編碼
此外也有另外一種方案就是幫助Spark改進他的Framework使他更強大,可是咱們發現只須要Marathon這一個優秀的Framework就能夠了,重複造輪子的成本比較大。同時也不但願對Spark代碼有過多的修改,這樣不利於升級。
因爲mesos發佈有不少種模式,咱們在作這個時候主要考察了2種模式。
該模式須要啓動一個master做爲發佈的入口,再對每一個實例分別啓動slave。這時候每一個slave在啓動的時候資源已經固定了。再增長資源的時候須要啓動新的slave而後中止以前的任務修改資源配置數重發,這種模式的好處是有一個單獨的界面,你能夠直接給業務線這個獨立集羣模式的界面來用,界面上他們能夠根據本身固定的資源發多個任務,而且在SparkUI上能夠直接看到日誌。另外它是預先佔資源模式,在發佈時不會有資源爭搶致使資源不夠的狀況,可是缺點就是作不到運行時的動態擴容。
該模式下,咱們使用Marathon這個framework來模仿mesos-dispacher所作的事,就是先發一個driver而後再發executor掛載到driver來執行任務。關於日誌,咱們仍是使用以前的方式調用Mesos的接口來得到日誌。當須要增長資源的時候直接往結點繼續掛executor就能夠,當須要刪除結點的時候直接中止executor便可。
(圖4)仿mesos-dispacher模式
咱們要作的事其實是把圖2的架構圖變成圖4的模式,其中Step 1和Step 2須要模仿,而Step 0則不須要,由於Step 0只是啓動Framework的。咱們經過觀察meos-dispacher發現Step 1所作的其實是調用Spark Submit向Mesos註冊一個Framework而後再由driver來負責調度,咱們利用mesos的constraints的特性,設置一個不存在的不可調度的策略,例如:colo:none,這樣一來driver就沒法管理資源,而咱們使用Marathon本身來發布Spark Executor來掛到driver上來實現Marathon控制Spark的資源調度策略。因爲Mesos他是把Offer推送給Framework的這一特性,咱們使用的這種方式也不會有性能問題。
(圖5)主要代碼
那麼圖2中的Step 2是如何作到的呢?咱們經過分析Spark源代碼發現,Spark 2.0.2在Executor掛到drvier上是經過圖5的命令來作到的。因此經過Marathon發佈Spark Executor的基本原理就是模仿上面的圖5代碼。
從圖6能夠看出Marathon發佈的時候先發Spark Driver拿到mesos分配的Spark Driver的IP和PORT填入腳本,這個參數是Driver與Executor之間通訊的通道,在發Spark Executor的時候須要提供,這個Driver的IP咱們經過Mesos接口能夠拿到,由於Driver會向Mesos註冊一個Framework,咱們拿到Framework的信息就拿到了IP和PORT,同時咱們還能夠拿到FrameworkID那這個PORT是在製做Docker鏡像的時候隨機分配的一個PORT0的一個環境變量,而後經過spark.driver.port指定,這樣Executor這端就能夠調用Marathon的REST API來拿到driver的Port。
而參數executor-id是Spark Driver調度時按順序分配的ID,從0開始每次遞增1,如何生成executor-id呢?這個由Spark Executor本身生成一個不超過int的範圍的不重複的隨機數便可,這個的ID的不會影響其它行爲。hostname能夠直接經過命令獲取。cores是咱們經過用戶提交的配置來計算出來的,這個Core須要填spark.executor.cores也就是每一個Spark Executor的正常使用的Core與spark.mesos.extra.cores分配給每一個Spark Executor之和。
(圖6)Executor發佈示意圖
最後一項目app-id經過研究發如今Mesos上實際上就是Framework ID直接經過Mesos接口就能夠得到。這樣咱們就完成了Executor的發佈,經過拼上述的命令來把Spark Executor掛到了Driver上,可是實際生產應用中,咱們發現了,他還存在Driver和Executor的同步問題。
這裏介紹一下在Kafka使用了高階API時,影響Spark性能的Receiver平衡問題,使用低階API則不會有這個問題。若是使用Spark提供的Kafka高階API,你會在代碼裏預先指定好Receiver的數量,而後再作一個Union,在Spark代碼中他其實是這樣作的,他會先等待Executor連上Driver,默認是30s若是超過了調度的時間則開始進行Receiver的調度,而調度策略是ReceiverPolicy類裏寫死的,ReceiverPolicy的調度策略能夠歸納爲,儘可能保證均勻的分配給每一個Host必定量的Recevier。
(圖7)啓動3個Spark Executor 示例
舉個例子來講,如圖7當你啓動了3個Spark Executor時,若是代碼裏指定了啓動1個Executor,若是每一個Executor啓動在了不一樣的Host下,Spark在Receiver調度開始時隨機地指定一個Executor啓動Receiver並分配1個Core給這個Task。可是若是代碼裏指定爲2個Receiver而2個Executor啓動在了同1個Host1上,另外一個啓動在了Host2上,也就是Receiver的數量等於Host Unique數量,則他會在Host1中保證其中的一個Executor啓動1個Receiver,Host2中啓動一個Receiver。若是Receiver的數量,大於了Host Unique的數量如第三張圖,則他會在隨機地在Host1或者Host2中開Receiver,這就帶來了一個問題。分析Spark源代碼可知Spark Driver和Spark Executor之間經過運行一個DummyJob,也就是一個MapReduce任務來保證他們之間的同步的,可是他這種作法只能保證一個Spark Executor掛在了Spark Driver上,而不可以保證全部的Executor好比當只有一個Spark Executor掛在Spark Driver上的時候,這時候開始Receiver開始調度。
讀過Spark官方文檔的朋友都知道,Spark提供了2個參數去解決這個問題,他們分別是spark.scheduler.maxRegisteredResourcesWaitingTime用來設置一個等待Executor掛上的時間和spark.scheduler.minRegisteredResourcesRatio用來檢查資源分配的比例,可是使用咱們這種方式這兩個參數都不起做用了,由於Spark在實現的過程當中是經過一個運行一個DummyJob來保證這種掛載的方式同步的,這也是爲何第一個任務必定是70個Task的緣由,可是他這種方式只能保證一個Executor掛上去了之後纔開始調度Recevier,所以咱們對源代碼進行了修改,主要是ReceiverTracker那部分經過咱們自定義的一個配置,讓Executor數量達到指定的個數之後纔開始發佈,這樣在Receiver調度的時候纔會保證可以均勻地分配在各個結點,從而實現最好的性能。另外對於業務線寫的jar包,咱們是要求打成assembly包而後提交到咱們的發佈系統,發佈系統會上傳到swift上,在發佈的時候,咱們會先在容器裏把包下載下來,而後啓動Spark Driver,而當Spark Executor掛在Spark Driver上的時候,他們會自動從Spark Driver獲取對應的jar包。
以前在部署1.6.1的mesos-dispacher架構的時候,咱們就已經發現, Spark打出的中文日誌會產生亂碼,而後咱們作了各類實驗發現,不管如何設置JVM參數,或是使用代碼進行內部的轉換都解決不了亂碼問題,在新架構的Docker環境中也不例外,不過最終仍是讓咱們解決了,咱們發現經過設置JAVA_TOOL_OPTIONS這個環境變量,JAVA虛擬機的參數才真正的修改生效,因而咱們在容器啓動的時候配置了file.encoding=UTF-8,亂碼問題得才以解決,除此以外在Docker鏡像中系統的時間也是不許確的,默認是UTC時間,而系統時間對代碼的影響也很大,有可能寫入到HDFS的文件是以時間戳生成的,咱們一開始解決這個問題的方法是經過以只讀的方式在Docker中掛載宿主機上的/etc/localtime來修正時間,可是發現時間仍是不正確,這時由於Spark內部還會根據時區自動修正時間爲UTC,因此還須要給JVM加一個環境變量設置user.timezone=PRC 這樣時間才能夠保證時間是對的,另外使用這種架構的時候spark.driver.extraJavaOptions和spark.executor.extraJavaOptions這兩個參數也不會生效,須要用戶經過發佈配置傳過來,而後在容器中追加到JAVA_TOOL_OPTIONS。另外值得注意的是SPARK_EXECUTOR_MEMORY也不會同步,須要手動來進行設置。
雖然咱們以前解決了marathon發佈driver和executor之間的鏈接問題,可是因爲mesos接口慢,在咱們實際測試中,發30個executor就能夠把mesos打掛,所以,咱們想了另外一個辦法來解決這個問題,咱們首先修改了Spark代碼,讓他的Spark Driver在不依賴mesos-dispacher的狀況下實現driver的HA,HA的實現原理大概就是每次在Spark Driver啓動註冊Framework的時候,把Framework ID存到zk裏,而後在程序掛掉了之後保持Framework與Mesos的鏈接,在下次啓動的時候從新註冊這個Framework,這樣的話,Framework ID能夠基本保持不變,在發佈Spark Executor的時候就能夠固定住這個Framework ID在Executor掛掉的時候marathon拉起來也能保證重連,而driver若是掛掉的話,他會從新註冊,得到的Framework ID不變,又能夠繼續運行,這樣作只須要在Spark Driver發佈完成之後調用一次Mesos接口拿到Framework ID分發給Spark Executor就能夠了。順便說一下Spark Executor拿Spark Driver的ip和port是經過調Marathon接口實現了,而Marathon接口速度很快,不會有這個問題。
對於業務線的任務來講升級Spark是一件比較麻煩的事,主要緣由是須要他們改代碼,不過從改代碼的角度來講,變化也不算大,也就是Spark版本和Scala版本變一下,另外就是有些API也須要作一點調整,另外就是升級麻煩的另外一個緣由也是由於以前沒有使用Marathon+Docker的模式,若是以前就使用了這種模式,那隻須要把鏡像給修改了,願意升級的升級,不肯意升級的可使用原來的鏡像跑,在之後的升級中,咱們只須要製做新鏡像就能夠了,很是方便遷移,可讓他跑在任何集羣。那如今爲了過渡到這種模式,再結合以前發佈的經驗,咱們使用的模式是舊的有一套配置,新的也有一套配置,而後經過在git上打tag的方式,在舊的配置里加入升級信息,而後發佈邏輯改成優先讀取是否要升級,若是須要升級則發在新集羣上,若是不須要則保持原來不變,咱們會先讓業務線進行測試,同時保持舊的任務在線,當他們測試經過了之後,再中止舊的做務,把改好的新版本發到新集羣上,當發現有問題的時候能夠用原來的tag進行回滾,由於原來的tag裏的配置會先判斷是否須要升級,而以前的配置確定沒有須要升級的選項。
Spark自身有一套metric監控,這個在新版本也不例外,在咱們集羣中惟一的變動就是把不靠譜的udp改爲了tcp,另外咱們由於使用的是Docker容器,這樣咱們就還有另外一套監控,這個監控是分析cgroup裏的數據,使用的是咱們開源的pyadvisor來作的,咱們能夠經過監控來觀察CPU和內存的使用狀況,很好的提出優化改進資源使用的建議,另外,對於業務線們,咱們推薦他們使用的是Spark裏自帶的Accumulator,先在Spark Driver上作一個聚合1分鐘的指標,而後再往watcher上打他們的業務指標,這樣即不會有以前不一樣host之間的聚合指標的問題,同時也給watcher減輕了壓力。
以上就是咱們所作的新的Spark架構,綜合看來有如下的優勢: