spark RDD原理:是一個容錯的、並行的(彈性分佈式)數據結構,能夠控制數據存儲至磁盤或者內存,可以獲取數據的分區。其具體特徵,以下:java
1)建立:rdd建立有2種方式,一種爲從穩定存儲中讀取建立;另外一種從父RDD轉換獲得新的RDD。sql
2)只讀:狀態不可變,不能修改。數據庫
3)分區:支持使 RDD 中的元素根據那個 key 來分區 ( partitioning ) ,保存到多個節點上。數組
4)容錯:在 RDD 中血統 ( lineage ) ,即 RDD 有充足的信息關於它是如何從其餘 RDD 產生而來的。緩存
5)持久化:支持將會被重用的 RDD 緩存 ( 如 in-memory 或溢出到磁盤 )。數據結構
6)延遲計算: transformation操做,RDD都不會真正執行運算(記錄lineage),只有當action操做被執行時,運算纔會觸發。。併發
7)操做:豐富的轉換(transformation)和動做 ( action ) , count/reduce/collect/save 等。app
窄依賴是指每一個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter、union等操做會產生窄依賴;寬依賴是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操做會產生寬依賴。如:spark map算子,其父RDD 的每一個partition只被子RDD的一個partition所使用,所以,map算子數據不需求重洗,即不須要進行shuffle操做。而groupByKey,其父RDD的partition被多個子RDD的partition使用,其過程須要進行shuffle操做。分佈式
從Spark 1.6版本推出之後,Spark採用了統一內存管理模型。經過spark.memory.useLegacyMode配置,能夠控制選擇的內存管理器模式。在統一內存管理下,Spark一個executor的JVM Heap內存主要分紅了三大部分:ReservedMemory、UserMemory、SparkMemory。post
Reserved Memory區域的內存是Spark內部保留內存,會存儲一些spark的內部對象等內容,也是咱們沒法使用的部分,默認大小是300MB。
User Memory區域的內存是用戶在程序中開的對象存儲等一系列非Spark管理的內存開銷所佔用的內存(默認值爲(JVM Heap Size - Reserved Memory) * (1-spark.memory.fraction))。
Spark Memory區域的內存是用於Spark管理的內存開銷。主要分紅了兩個部分,Execution Memory和Storage Memory,經過spark.memory.storageFraction來配置兩塊各佔的大小(默認值0.5)。1)Storage Memory。主要用來存儲咱們Cache的數據和臨時空間序列化時Unroll的數據,以及Broadcast變量Cache級別存儲的內容。2)Execution Memory。主要用來存儲Spark Task執行時使用的內存(好比Shuffle時排序所須要的臨時存儲空間)。
爲了提升內存利用率,Spark統一內存管理模型針對StorageMemory 和 Execution Memory有以下策略:1)一方空閒、一方內存不足狀況下,內存不足一方能夠向空閒一方借用內存。2)Execution Memory能夠強制拿回StorageMemory在Execution Memory空閒時,借用的Execution Memory的部份內存(強制取回,而Storage Memory數據丟失,從新計算便可)。3)Storage Memory只能等待Execution Memory主動釋放佔用的StorageMemory空閒時的內存(不強制取回,由於若是task執行,數據丟失就會致使task 失敗)。
Rdd以分區、只讀等特性的彈性分佈式數據集,並提供多種數據類型數據轉換操做;
Dataframe在spark RDD基礎上,按照行爲對象組織的結構化分佈式數據集,相似於傳統關係數據庫的表,同時,相比RDD優化部分性能。
dataframe爲DataSet的特列,即dataframe能夠採用該方式表示dataSet<Row>。
Spark job以spark action算子,觸發產生具體spark job;根據spark 寬窄依賴關係,肯定spark shuffle的過程,且shuffle之間造成不一樣的stage;不一樣stage下,spark 每一個RDD的分區,將造成spark task。所以,spark job包含多個stage,而stage又包含多個task。
spark shuffle是把一組無規則的數據儘可能轉換成一組具備必定規則的數據,其將上一個stage數據傳至下一個stage。Spark 1.2之前,默認採用hash shuffle
優缺點:hash shuffle優勢:小規模計算時,效率高;不須要過多內存用戶數據排序,且shuffle過程當中,數據讀寫各只有一次。缺點:大規模數據計算時,將產生大量的臨時文件和磁盤隨機IO,下降計算性能。
Sort shuffle在hash shuffle基礎上優化,其在map階段合併大量的臨時文件,大規模計算時,提高總體計算性能。缺點:涉及數據的排序,存在內存溢出的風險。
Spark選擇容錯性採用「記錄更新」的方式。RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。Lineage本質上很相似於數據庫中的重作日誌(Redo Log),只不過這個重作日誌粒度很大,是對全局數據作一樣的重作進而恢復數據。
Lineage根據spark RDD的依賴關係,當這個RDD的部分分區數據丟失時,它能夠經過Lineage獲取足夠的信息來從新運算和恢復丟失的數據分區。
Broadcast主要用於共享Spark在計算過程當中各個task都會用到的只讀變量,Broadcast變量只會在每臺計算機器上保存一份,而不會每一個task都傳遞一份,這樣就大大節省了空間,節省空間的同時意味着傳輸時間的減小,效率也高。
Broadcast(廣播)變量對數據傳輸進行優化,經過Broadcast變量將用到的大數據量數據進行廣播發送,能夠提高總體速度
任務調度模式有2種:FIFO、FAIR模式。其中,FIFO模式的基本原理:每一個Job被切分爲多個Stage。第一個Job優先獲取全部可用的資源,接下來第二個Job再獲取剩餘資源。以此類推,若是第一個Job並無佔用全部的資源,則第二個Job還能夠繼續獲取剩餘資源,這樣多個Job能夠並行運行。若是第一個Job很大,佔用全部資源,則第二個Job就須要等待第一個任務執行完,釋放空餘資源,再申請和分配Job。FAIR模式的基本原理:Spark在多Job之間以輪詢(round robin)方式爲任務分配資源,全部的任務擁有大體至關的優先級來共享集羣的資源。這就意味着當一個長任務正在執行時,短任務仍能夠分配到資源,提交併執行,而且得到不錯的響應時間。這樣就不用像之前同樣須要等待長任務執行完才能夠。這種調度模式很適合多用戶的場景。用戶能夠經過配置spark.scheduler.mode方式來讓應用以FAIR模式調度。FAIR調度器一樣支持將Job分組加入調度池中調度,用戶能夠同時針對不一樣優先級對每一個調度池配置不一樣的調度權重。這種方式容許更重要的Job配置在高優先級池中優先調度。
reduce是Action操做,reduceByKey是Transformation操做。
Reduce操做會將結果聚集的driver節點,當任務不少,任務的結果數據又比較大時Driver容易形成性能瓶頸。reduceByKey不是把數據聚集到Driver節點,是分佈式進行的,所以不會存在reduce那樣的性能瓶頸。
client模式的原理,Client提交任務給resourceManager,在提交任務的時候,在提交任務的那臺機器上面開啓一個driver服務進程。resourceManager在接收到client提交的任務之後,在集羣中隨機選擇一臺機器分配一個container,在該container裏面開啓一個applicationmaster服務進程。而後,driver去找applicationmaster,applicationmaster又找resourcemanager申請資源,resourcemanager會分配container,在其中開啓excuter,excuter會反向向driver註冊,driver把task放入到excuter裏面執行。Cluster模式的原理,Spark集羣會在集羣中開啓一個driver,此時開啓就是applicationmaster和driver合二爲一了。其餘的都相同。Yarn-client模式適合spark常駐應用,便於應用管理;yarn-cluster模式適合批量場景應用,減小client端壓力。
Diagnostic Messages for this Task: Container [pid=28020,containerID=container_1459428698943_31555_01_004570] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.0 GB of 16.8 GB virtual memory used. Killing container。 |
減小每一個cup核(線程)處理的數據量,可調整參數:spark.sql.shuffle.partitions、spark.default.parallelism ;增長每一個CPU核,可以使用的內存:增大executor-memory或者executor-cores減小。
Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler |
當消息隊列中的消息數超過其spark.scheduler.listenerbus.eventqueue.size設置的數量(若是沒有設置,默認爲10000)時,會將最新的消息移除,這些消息原本是通知任務運行狀態的,因爲你移除了,狀態沒法獲得更新,因此會出現上面描述的現象。
Spark監聽器模型由如下幾個部分組成:sparkListenerEvent、LiveListenerBus、SparkListener。SparklistenerEnent將事件發送至liveListerBus中,sparkListener從LiveListerBus消費事件。
現象:絕大多數task執行得很快,個別task執行很慢或處理數據大不少;歷史任務,某天忽然出現OOM。
緣由:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,好比按照key進行聚合或join等操做。此時若是某個key對應的數據量特別大的話,就會發生數據傾斜。
常看法決方法:從數據角度,將數據的key增長部分隨機,以免某一key的數據量較大;從參數調優角度,增長shuffle過程的分區數(spark.default.parallelism/ spark.sql.shuffle.partitions),減小單個task任務的數據量。減小executor的核數(executor-cores),以增長每一個task可用的內存。總體避免數據傾斜致使的OOM。
Dstream表示從數據源獲取持續性的數據流以及通過轉換後的數據流,且由持續的RDD序列組成。
從開發的角度,Dstream可認爲是對RDD的封裝。其按照時間分片,好比此採用1分鐘爲時間間隔,那麼在連續的1分鐘內收集到的數據做爲一個RDD處理單元,且隨着時間的推移,造成一系列離散的RDD處理單元。
Spark streaming從kafka讀取數據的方式有2種:基於Receiver的方式、Direct方式。其中,基於Receiver的方式利用接收器(Receiver)來接收kafka中的數據,其最基本是使用Kafka高階用戶API接口。對於全部的接收器,從kafka接收來的數據會存儲在spark的executor中,以後spark streaming提交的job會處理這些數據。Direct方式沒有receiver這一層,其會週期性的獲取Kafka中每一個topic的每一個partition中的最新offsets,以後根據設定的maxRatePerPartition來處理每一個batch。
Direct方式相較於Receiver方式的優點在於:
簡化的並行:在Receiver的方式中咱們提到建立多個Receiver以後利用union來合併成一個Dstream的方式提升數據傳輸並行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對應的並行讀取Kafka數據,這種映射關係也更利於理解和優化。
高效:在Receiver的方式中,爲了達到0數據丟失須要將數據存入Write Ahead Log中,這樣在Kafka和日誌中就保存了兩份數據,而第二種方式不存在這個問題,只要咱們Kafka的數據保留時間足夠長,咱們都可以從Kafka進行數據恢復。
精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統的從Kafka中讀取數據的方式,但因爲Spark Streaming消費的數據和Zookeeper中記錄的offset不一樣步,這種方式偶爾會形成數據重複消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了這種不一致性。
Spark thriftserver增量取數的基本原理:driver從executor,按照增量和順序的方式獲取每一個分區數據,併發送給client端。而非增量取數方式,採用collect算子,將全部數據聚集在driver進程下的數組,再發送給client端。增量取數方式效率至關低,但減小driver內存溢出的風險。
1)sql語句通過sqlParser解析成Unresolved LogicalPlan;
2)使用analyzer結合數據字典(catalog)進行綁定,生產Resloved LogicalPlan;
3)使用optimizer對Resloved LogicalPlan進行優化,生Optimized LogicalPlan;
4)使用SparkPlan將LogicalPlan轉換成PhysicalPlan;
5)使用prepareForExecution將physicalPlan轉換成可執行物理計劃;
6)使用execute()執行可執行物理計劃,生產schemaRDD。
緣由:spark shuffle過程當中,下游stage的task數量由partition參數決定,一個task操做最後在hive/hdfs落地爲一個文件。若是分區表下,hive表分區數*spark shuffle partition數。
調優方式:減小partition數值、spark post-shuffle機制,以文件大小決定落地的hive/hdfs、spark應用程序採用reparation算子,重定義分區。
Case1:executor lost(executor oom),一般因爲executor內存配置內存有限,executor oom或觸發rdd 自動清理,致使lost task。解決方法:適當增大executor memory和spark.default.parallelism。
Case2:Kryo serialization failed: Buffer overflow,調大spark.kryoserializer.buffer.max配置參數
Case3:Total size of serialized results of 251 tasks (1504.0 MB) is bigger than spark.driver.maxResultSize (1500.0 MB),調大spark.driver.maxResultSize配置參數。
Case4:spark thriftserver增量數據,配置參數spark.sql.thriftServer.incrementalCollect,減小driver端壓力。