Spark_總結五

 轉載請標明出處http://www.cnblogs.com/haozhengfei/p/e353daff460b01a5be13688fe1f8c952.html html


Spark_總結五

1.Storm 和 SparkStreaming區別

Storm                      純實時的流式處理,來一條數據就當即進行處理
SparkStreaming 微批處理,每次處理的都是一批很是小的數據
Storm支持動態調整並行度(動態的資源分配),SparkStreaming(粗粒度, 比較消耗資源)
 
Storm 優勢 || 缺點
Storm 流式計算(扶梯)
    優勢: 數據延遲度很低Storm的事務機制要比SparkStreaming的事務機制要完善 什麼是事務機制?對於一條數據,很少處理也很多處理,對於一條數據剛好處理一次,好比金融,股票等要求實時性比較高,那麼就須要選Storm
    缺點: 一直持有着資源, 每一條數據都要在集羣中某一臺節點處理,要計算的數據會進行網絡傳輸,吞吐量小,另外Storm不適合作複雜的業務邏輯(適合彙總)

SparkStreaming 優勢 || 缺點

SparkStreaming 微批處理(相似於電梯), 它並非純的批處理
    優勢: 吞吐量大,能夠 作複雜的業務邏輯(保證每一個job的處理小於batch interval)
    缺點: 數據延遲度較高
 
公司中爲何選用SparkStreaming要多一些?
    1.秒級別延遲,一般應用程序是能夠接受的,
    2.能夠應用機器學習,SparkSQL...可擴展性比較好,數據吞吐量較高

2.SparkStreaming

2.1什麼是SparkStreaming?

     SparkStreaming是一個流式處理框架,處理的模式是微批處理(微批有多大?經過時間來設置這個批有多大 [For example:Batch Interval 5s]
     SparkStreaming 基於DStream(Discretized Streams:離散的數據流 )來進行編程,處理的是一個流,這個流何時切成一個rdd-->根據batchinterval來決定什麼時候切割成一個RDD。

SparkStreaming 架構圖

    job的個數是由output operator決定的, StreamContext底層封裝了SparkContext
 

2.2圖解SparkStreaming   ||   SparkStreaming執行流程

   從圖上能夠看到,Batch Interval的間隔是5s,也就是說每通過5s,SparkStreaming會將這5s內的信息封裝成一個DStream,而後提交到Spark集羣進行計算
執行流程
    第一個 DStream 裏面是 0-5s 的數據,在第6s的時候會觸發 DStream 的job執行,這時會另啓動一個線程執行這個job (假設這個job只須要3s) ,同時在6-10s期間繼續接受數據,在第11s的時候會觸發 DStream 的job的執行, 這時會另啓動一個線程執行這個job (假設這個job只須要3s) 同時在11-15s期間繼續接受數據...
 
注意!
     若是這個job執行的時間大於5s會有什麼問題?
    數據在5s內處理不完,又啓動一個job,致使數據越積越多, 從而致使  SparkStreaming down

2.3SparkStreaming代碼TransformOperator

案例:過濾黑名單
    這裏模擬了一份黑名單,SparkStreaming監控服務器中指定端口,時間設定爲每5秒處理一次數據。每當job觸發時,用戶輸入的數據與黑名單中的數據進行左外鏈接,而後過濾
 
node1 建立一個Socket Server
    nc -lk 8888 (頁面停下了,開始輸入數據進入8888端口,此時SparkStreaming監聽這個端口)
    hello world
    hello jack
    hello tom( 過濾tom)
result:
 
注意事項!
1.爲何會沒有數據?
    由於只開啓了一條線程(這裏只有接收數據的線程),因此local的模擬SparkStreaming必須至少設置兩個線程, new SparkConf().setMaster("local[2]").setAppName("TransformBlacklist");
 
2.Durations時間的設置 --接收數據的延遲時間,多久觸發一次job
final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
 
3.建立JavaStreamingContext有兩種方式(sparkconf、sparkcontext)
 
4.業務邏輯完成後,須要有一個output operator,將SparkStreaming處理後的數據輸出(HDFS,DBMS)
 
5.關於  JavaStreamingContext 的  start()   ||   stop()
        JavaStreamingContext.start() //straming框架啓動以後是不能再添加業務邏輯
        JavaStreamingContext.stop() //參的stop方法會將sparkContext一同關閉,解決辦法:stop(false)
        JavaStreamingContext.stop()  // 中止以後是不能在調用start()
 
6. DStreams(Discretized Streams--離散的流), 應用在每一個DStream的算子操做 ,應用在RDD,應用在Partition,應用在Partition中的一條條數據, 因此最終應用到每一條記錄上
 

2.4Window窗口操做

window operation
普通的  每隔 多長時間切割RDD
基於窗口的操做: 每隔 多長時間切割RDD, 每隔 多長時間計算一次,每次計算的量是多少
 
 
爲何須要有窗口操做?
   好比 別人要求可以實時看到此刻以前一段時間的數據狀況,若是使用批處理的話,那麼咱們只能固定一個整段時間而後對這個整段時間進行spark core的計算,可是別人的要求是每個時刻都須要有結果,那麼就須要窗口操做?可是窗口操做確定會有不少的 重複計算,這裏有一個優化的地方這個優化也不是必須的視具體狀況而定,好比說咱們要查看最近30分鐘最熱門的頭條,咱們在設計的時候不可能每隔30分鐘計算一次,這裏定義了滑動窗口時間是1分鐘,然而計算量是30分鐘內的數據,那麼確定會有29分鐘重複的數據計算);可是 優化的話就會有一個前提,必需要checkpoint
 
                                               每次計算都是最近15s的數據,基於這個特性(微博熱點:最近30分鐘內最熱門的頭條)
問題一:batch interval 5s,窗口大小能夠是8s麼?
    不行,有的batch就不能被窗口所包含,必須是batch interval的整數倍
問題二:滑動窗口時間 8s 能夠麼?
    必須是batch interval的整數倍
 
優化:如何避免time3被重複計算(圖中time3在兩個window中都被計算了),能夠沒有,可是有的話,就須要這種優化
                                                            Batch Interval 1s    ||     窗口大小 5s    ||     滑動窗口 1s
 
思考:計算一個趨勢的時候,須要基於滑動窗口的操做,是否必需要優化,避免重複計算?(未必
For example:
    1.查看微博中每小時的熱門微博,每隔1分鐘計算一次,至關於重複計算了59分鐘的內容
    2.商家想看前5分鐘的銷售額,每隔30秒看一次,也須要基於窗口的操做

2.5UpdateStateByKey

updateStateByKey 的使用須要 checkpoint ,隔幾回記錄一次到磁盤中
UpdateStateByKey的主要功能
   一、Spark Streaming中爲 每個Key維護一份state狀態,這個 state類型能夠是任意類型的的, 能夠是一個自定義的對象,那麼更新函數也能夠是任意類型的。
    二、經過更新函數對該key的狀態不斷更新對於每一個新的batch而言,Spark Streaming會在使用updateStateByKey的時候爲已經存在的key進行state的狀態更新 (對於每一個新出現的key,會一樣的執行state的更新函數操做)
    三、 若是要不斷的更新每一個key的state,就涉及到了狀態的保存和容錯,這個時候就須要開啓checkpoint機制和功能 (   one.checkpoint(Durations.seconds(10)) //容錯更好就須要犧牲性能,容錯不須要過高,時間能夠設置的長一些,(好比這裏將checkpoint設置爲10s,也就是每隔10s會將標記有checkpoint的RDD計算的結果持久化到磁盤,若是咱們設置的Batch Interval = 5s, 在第15s時觸發job進行計算, 可是在第17s時down, 這時候只能恢復到10s的數據,那麼10s至17s的數據就丟失了,具體設置多少,視具體狀況而定) )
 
UpdateStateByKey用處: 統計廣告點擊流量,統計這一天的車流量...
 
案例:全面的廣告點擊分析 UpdateStateByKeyOperator
    這裏作了 checkpoint操做, jsc.checkpoint("hdfs://ndoe1:8020/user/sscheckpoint");
     node1建立一個Socket Server,指定8888端口,SparkStreaming與服務器這個端口創建通訊,那麼用戶的數據從這裏流向SparkStreaming進行計算。
    在這個案例中, 用以空格分割的單詞來模擬用戶點擊的廣告,每一個單詞表明一個廣告, 統計每個廣告(單詞)出現的次數(就是WordCount)
 
    最後的 conunts.print()    //output operator類型的算子
 
result:   利用SparkStreaming作到了微批處理,近似實時計算

查看hdfs,發現設置checkpoint會將SparkStreaming的處理結果進行了持久化
 

2.6reduceByKeyAndWindow

基於滑動窗口的熱點搜索詞實時統計--WindowOperator
    1.未優化的
    2.優化的必需要設置checkpoint的目錄
如下是優化的過的reduceByKeyAndWindow
 
補充:
1.Spark master 8080端口 監控資源
              Drive  4040端口 監控任務,能夠看到有一個Streaming job(它裏面有一個線程,是一直運行的,負責接收咱們的數據)
 
2.transform 和  foreachRDD 的區別?
    transform Transformation類型算子, transform很是厲害,能夠拿到每個DStream的rdd;這樣就能夠利用每個rdd的全部算子來轉換甚至在裏面應用spark core,或者將rdd轉換成DataFrame來使用SparkSQL操做
    foreachRDD Action類型算子,對於每一個RDD進行操做, 何時會用?最後存結果的時候會用
 
3. transform取出DStream中的RDD
    使用transform將DStream中的RDD抽取出來,調用了RDD的Action類的算子(是能夠執行的) 是在Driver端執行的,若是不在Driver端執行,調用Action類的算子就不會觸發一個job了
 
    對RDD的操做纔會發送到Executor端執行transform是對 DStream進行操做(transform中抽取RDD,對這個RDD執行collect類型的數據,在job Generator時執行的,生成了多個job,以 jobSet的形式發送給 jobSecheduler),這樣的話就能夠 預警:對數據的預警,與標準進行比較,若是超過了這個標準就進行報警(一旦發現某個黑名單就當即進行報警,),總體的代碼是在Driver端執行的,可是部分代碼對RDD的操做是在Executor段執行的
    SparkContext sc = userClickRDD.context();
    Object obj = "能夠來源於數據庫,動態的更改廣播變量"
    sc.broadCast(obj)

2.6SparkStreaming--Driver HA

2.6.1Driver也有可能掛掉,如何實現它的高可用?

 
        當一個Driver掛掉後,(回憶:當初的Master是由zookeeper進行託管),另外啓動一個Driver,它就須要從上一個Driver中得到相關的信息(包括batch的進度,data的位置,job執行進度, DStream的Graph(基於DStream的業務邏輯))
 
       如何實現Driver的高可用-->基於HDFS上面的元數據(Driver的信息)進行恢復, 注意!不會從新new SparkContext,由於這樣至關於又建立了一個全新的Driver
2.6.2Driver HA的代碼套路
    1.指定了去哪個目錄下面尋找Driver的元數據信息
    2. 提交Application到集羣中執行的時候,必須使用cluster模式,同時必須指定一個參數 --supervise(當某一個Driver掛掉,新的Driver須要另外一個Driver中的信息來繼續job的執行)
2.6.3監控HDFS上指定目錄下文件數量的變化
示例代碼 SparkStreamingOnHDFS
    1.爲了狀態的保存和容錯,開啓了checkpoint機制,Driver HA
    2. ssc.textFileStream("hdfs://node1:8020/userhdfs/")    //監控hdfs上/user/hdfs的變化
 
命令:hadoop fs -put wc /user/hdfs
2.6.4SparkStreaming 監控 HDFS 上文件數量的變化,並將變化寫入到MySql中
示例代碼 SparkStreamingOnHDFSToMySQL
    1.爲了狀態的保存和容錯,開啓了checkpoint機制,Driver HA
    2. ssc.textFileStream("hdfs://node1:8020/userhdfs/")    //監控hdfs上/user/hdfs的變化

3.Kafka

3.1Kafka定義

  Apache Kafka是一個高吞吐的集發佈與訂閱與一體的分佈式消息系統
 
     流式處理的數據源是kafka,批處理的數據源是hive,也就是hdfs;

3.2消息隊列常見的場景

    1.系統之間的解耦合
        -queue模型
        -publish-subscribe模型
 
    2.峯值壓力緩衝,若是高峯期日誌大量到SparkSreaming,那麼會形成計算時間超過BatchInterval),能夠在日誌服務器和SparkStreaming中間加入Kafka,起到緩衝的做用
    3.異步通訊
 
 

3.3Kafka的架構

    消費者的 消費偏移量存儲在 zookeeper中,生產者生產數據,消費者消費數據,kafka並不會生產數據,但kafka默認一週刪除一次數據。
     broker就是代理,在kafka cluster這一層這裏,其實裏面是有不少個broker
     topic就至關於 QueueQueue裏面有生產者消費者模型

3.4Kafka的消息存儲和生產消費模型

 
   topic:一個kafka集羣中能夠劃分n多的topic,一個topic能夠分紅多個partition( 這個是爲了作並行的 ,  每一個partition內部消息強有序,其 中的每一個消息都有一個序號叫offset,一個partition對應一個broker,一個broker能夠管多個partition, 這個partition能夠很簡單想象爲一個文件,當數據發過來的時候它就往這個 partition上面append,追加就行,kafka和不少消息系統不同,不少消息系統是消費完了我就把它刪掉,而kafka是根據時間策略刪除,而不是消費完就刪除,在 kafka裏面沒有一個消費完這麼個概念,只有過時這樣一個概念
   生產者本身決定往哪一個partition寫消息 (輪循的負載均衡或者是基於hash的partition策略)
    消費者能夠訂閱某一個topic,這個topic一旦有數據,會將數據推送給消費者

3.5kafka   組內queue消費模型   ||   組間publish-subscribe消費模型

 

3.6kafka有哪些特色

3.7爲何Kafka的吞吐量高?

3.7.1 什麼是Zero Copy?
   零拷貝」是指計算機操做的過程當中,CPU不須要爲數 據在內存之間的拷貝消耗資源。而 它一般是指計算機在網絡上發送文件時,不須要 將文件內容拷貝到用戶空間(User Space)而直接在內核空間(Kernel Space)中 傳輸到網絡的方式
 
3.7.2  kafka採用零拷貝Zero Copy的方式
 
 
   從上圖中能夠清楚的看到, Zero Copy的模式中,避免了數據在內存空間和用戶空間 之間的拷貝,從而提升了系統的總體性能。 Linux中的sendfile()以及 Java NIO中
的FileChannel.transferTo()方法都實現了零拷貝的功能,而在Netty中也經過在
FileRegion中包裝了NIO的FileChannel.transferTo()方法實現了零拷貝

3.8搭建Kafka集羣--leader的均衡機制

 
 
Kafka中leader的均衡機制
     Kafka中一個topic有多個partition,如上圖,kfk有0,1,2共三個partition,每一個partition都有對應的leader來進行管理,對於leader1來講它來管理partition0,當leader1掛掉以後,由於partition0配置了副本數(在broker0,broker2還存在partition0的副本),那麼此時會在broker0,broker2上選出一臺當作leader繼續管理partition0(好比說選取了broker2當作partition0的leader),這時候若是咱們配置了leader均衡機制,從新恢復了broker1,那麼partition0的leader就會從broker2轉移到broker1,減輕了broker2的讀取壓力,實現了負載均衡。固然若是不開啓leader均衡機制的話,從新恢復broker1,那麼partition0的leader仍就是broker2。
 
Kafka中leader的均衡機制在哪裏配置?
在server.properties添加以下一句話
auto.leader.rebalance.enable=true

3.9Kafka_code注意事項

注意一:
     向kafka中寫數據的時候咱們 必需要指定所配置kafka的全部brokers節點,而不能只配置一個節點,由於咱們寫的話,是不知道這個topic最終存放在什麼地方,因此必須指定全,
    讀取Kafka中的數據的時候是須要指定zk的節點,只須要指定一個節點就能夠了;目前咱們使用的在代碼中直接寫上這些節點,之後所有要寫到配置文件中
注意二:
      kafka中存儲的是鍵值對,即便咱們沒有明確些出來key, 獲取的時候也是須要利用tuple的方式獲取值的;而對於放到一個kafka中的數據, 這個數據到底存放到那個partition中呢?這個就須要使用hashPartition方式或者普通的輪詢方式存放;對於沒有明確指定key的發往kafka的數據,使用的就是輪詢方式;
 

4.SparkStreaming + Kafka    兩種模式--Receive模式    ||    Direct模式

Receive模式--SparkStreaming + Kafka 總體架構

 
                                                注意!每一步都是阻塞的,上一步完成以後才能進行下一步
流程:
1.接收數據( SparkStreaming做爲 消費者,若是訂閱了一個topic,那麼topic一有數據就會主動推送給SparkSreaming)
 
2.Executor將接收來的數據備份到其餘Executor中(Executor中執行的job做爲一個receiver,裏面的task一直在接收kafka推送來的數據,而後將接收來的數據進行持久化, 默認的持久化級別MEMORY_DISK_SER_2
 
3.Executor備份完成以後,Driver中的 ReceiverTracker彙報數據存儲在哪一些的 Executor中( Driver{ReceiverTracker,DAGScheduler,TaskScheduler}
 
4.在zookeeper中更新消費偏移量
 
5.Driver負責分發 task到數據所在的 Executor中執行(達到移動計算,而不是移動數據)
 
注意!
    1.在SparkStreaming中Driver一旦掛掉,它下面的Executor也會掛掉
        若是在第四步完成後,Driver掛掉了,會有什麼問題?
            其實數據並無被處理,數據就丟了,所以kafka的事務機制並不完善
       所以對於如上這種狀況,提供了一個解決方案,就是WAL機制( WriteAheadLog--預寫機制 
       可是WAL機制有什麼問題?(每一次接收來的數據,都要往HDFS上寫一份,性能會有所降低)
 
代碼示例
SparkStreamingOnKafkaReceiver
SparkStreamingDataManuallyProducerForKafka
    須要啓動HDFS

Direct模式

    SparkStreamingKafka直接鏈接,SparkStreaming去Kafka去 pull數據,這個 消費偏移量由SparkStreaming本身來維護(實際上經過checkpoint來管理的,checkpoint和job是異步的,總的來時SparkStreaming的事務機制並非很完善),避免了數據的丟失(相對而言,不是絕對的)
 
並行度:
      1. linesDStream裏面封裝的是RDD,RDD裏面有partition,RDD裏面的partition與這個topic的partition是一一對應的
      2.從kafka中讀來的數據,封裝到一個DStream中,能夠 對這個DStream重分區,lines.repartition(10),增長partition的數量,提升並行度。
 
並行度:
    batch->rdd->DStream
    batchInterval 5s
    blockInterval = 200ms
    
    batch = 25block
    將一個 blockInterval設置的小一些,有更多的block,對應更多的split,也就有更多的partition,從而提升並行度
官方建議:blockInterval不要低於50ms,不然 batchInterval/ blockInterval 獲得的block過多,partition就過多,啓動多個線程並行計算,影響執行job的性能
 
Receive模式    ||    Direct模式     最大的不一樣:消費偏移量誰來管理
 

 ©All rights reservednode

超越永無止境
分類: Spark, Kafka
標籤: Spark總結, Kafka
3
0
« 上一篇: Flume介紹
» 下一篇: Spark性能調優之Shuffle調優
posted on 2017-03-12 15:26 日月的彎刀 閱讀( 30594) 評論( 1) 編輯 收藏
評論:
  • #1樓    &程序猿   Posted @ 2018-03-23 14:52
    你好,driver的HA具體實現方式是什麼,沒有看的太明白,樓主可否給解釋清楚一點
      

相關文章
相關標籤/搜索