轉載請標明出處http://www.cnblogs.com/haozhengfei/p/e353daff460b01a5be13688fe1f8c952.html html
Spark_總結五
1.Storm 和 SparkStreaming區別
Storm | 純實時的流式處理,來一條數據就當即進行處理 |
SparkStreaming | 微批處理,每次處理的都是一批很是小的數據 |
Storm支持動態調整並行度(動態的資源分配),SparkStreaming(粗粒度, 比較消耗資源) |
Storm 優勢 || 缺點
Storm 流式計算(扶梯)
優勢:
數據延遲度很低,
Storm的事務機制要比SparkStreaming的事務機制要完善(
什麼是事務機制?對於一條數據,很少處理也很多處理,對於一條數據剛好處理一次,好比金融,股票等要求實時性比較高,那麼就須要選Storm
)
缺點:
一直持有着資源,
每一條數據都要在集羣中某一臺節點處理,要計算的數據會進行網絡傳輸,吞吐量小,另外Storm不適合作複雜的業務邏輯(適合彙總)
SparkStreaming 優勢 || 缺點
SparkStreaming 微批處理(相似於電梯),
它並非純的批處理
優勢:
吞吐量大,能夠
作複雜的業務邏輯(保證每一個job的處理小於batch interval)
缺點:
數據延遲度較高
公司中爲何選用SparkStreaming要多一些?
1.秒級別延遲,一般應用程序是能夠接受的,
2.能夠應用機器學習,SparkSQL...可擴展性比較好,數據吞吐量較高
2.SparkStreaming
2.1什麼是SparkStreaming?
SparkStreaming是一個流式處理框架,處理的模式是微批處理(微批有多大?經過時間來設置這個批有多大
[For example:Batch Interval 5s])
SparkStreaming
基於DStream(Discretized Streams:離散的數據流
)來進行編程,處理的是一個流,這個流何時切成一個rdd-->根據batchinterval來決定什麼時候切割成一個RDD。
SparkStreaming 架構圖
job的個數是由output operator決定的,
StreamContext底層封裝了SparkContext
2.2圖解SparkStreaming || SparkStreaming執行流程
從圖上能夠看到,Batch Interval的間隔是5s,也就是說每通過5s,SparkStreaming會將這5s內的信息封裝成一個DStream,而後提交到Spark集羣進行計算
執行流程
第一個 DStream 裏面是 0-5s 的數據,在第6s的時候會觸發 DStream 的job執行,這時會另啓動一個線程執行這個job
(假設這個job只須要3s)
,同時在6-10s期間繼續接受數據,在第11s的時候會觸發 DStream 的job的執行,
這時會另啓動一個線程執行這個job
(假設這個job只須要3s)
,
同時在11-15s期間繼續接受數據...
注意!
若是這個job執行的時間大於5s會有什麼問題?
數據在5s內處理不完,又啓動一個job,致使數據越積越多,
從而致使
SparkStreaming down
2.3SparkStreaming代碼TransformOperator
案例:過濾黑名單
這裏模擬了一份黑名單,SparkStreaming監控服務器中指定端口,時間設定爲每5秒處理一次數據。每當job觸發時,用戶輸入的數據與黑名單中的數據進行左外鏈接,而後過濾
node1 建立一個Socket Server
nc -lk 8888 (頁面停下了,開始輸入數據進入8888端口,此時SparkStreaming監聽這個端口)
hello world
hello jack
hello tom(
過濾tom)
result:
注意事項!
1.爲何會沒有數據?
由於只開啓了一條線程(這裏只有接收數據的線程),因此local的模擬SparkStreaming必須至少設置兩個線程,
new SparkConf().setMaster("local[2]").setAppName("TransformBlacklist");
2.Durations時間的設置
--接收數據的延遲時間,多久觸發一次job
final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
3.建立JavaStreamingContext有兩種方式(sparkconf、sparkcontext)
4.業務邏輯完成後,須要有一個output operator,將SparkStreaming處理後的數據輸出(HDFS,DBMS)
5.關於
JavaStreamingContext 的
start() || stop()
JavaStreamingContext.start() //straming框架啓動以後是不能再添加業務邏輯
JavaStreamingContext.stop() //無
參的stop方法會將sparkContext一同關閉,解決辦法:stop(false)
JavaStreamingContext.stop() //
中止以後是不能在調用start()
6.
DStreams(Discretized Streams--離散的流),
應用在每一個DStream的算子操做
,應用在RDD,應用在Partition,應用在Partition中的一條條數據,
因此最終應用到每一條記錄上
2.4Window窗口操做
window operation
普通的
每隔
多長時間切割RDD
基於窗口的操做:
每隔
多長時間切割RDD,
每隔
多長時間計算一次,每次計算的量是多少
爲何須要有窗口操做?
好比
別人要求可以實時看到此刻以前一段時間的數據狀況,若是使用批處理的話,那麼咱們只能固定一個整段時間而後對這個整段時間進行spark core的計算,可是別人的要求是每個時刻都須要有結果,那麼就須要窗口操做?可是窗口操做確定會有不少的
重複計算,這裏有一個優化的地方(
這個優化也不是必須的,
視具體狀況而定,好比說咱們要查看最近30分鐘最熱門的頭條,咱們在設計的時候不可能每隔30分鐘計算一次,這裏定義了滑動窗口時間是1分鐘,然而計算量是30分鐘內的數據,那麼確定會有29分鐘重複的數據計算);可是
優化的話就會有一個前提,必需要checkpoint
每次計算都是最近15s的數據,基於這個特性(微博熱點:最近30分鐘內最熱門的頭條)
問題一:batch interval 5s,窗口大小能夠是8s麼?
不行,有的batch就不能被窗口所包含,必須是batch interval的整數倍
問題二:滑動窗口時間 8s 能夠麼?
必須是batch interval的整數倍
優化:如何避免time3被重複計算(圖中time3在兩個window中都被計算了),能夠沒有,可是有的話,就須要這種優化
Batch Interval 1s
||
窗口大小 5s
||
滑動窗口 1s
思考:計算一個趨勢的時候,須要基於滑動窗口的操做,是否必需要優化,避免重複計算?(未必)
For example:
1.查看微博中每小時的熱門微博,每隔1分鐘計算一次,至關於重複計算了59分鐘的內容
2.商家想看前5分鐘的銷售額,每隔30秒看一次,也須要基於窗口的操做
2.5UpdateStateByKey
updateStateByKey
的使用須要
checkpoint
,隔幾回記錄一次到磁盤中
UpdateStateByKey的主要功能
一、Spark Streaming中爲
每個Key維護一份state狀態,這個
state類型能夠是任意類型的的, 能夠是一個自定義的對象,那麼更新函數也能夠是任意類型的。
二、經過更新函數對該key的狀態不斷更新,
對於每一個新的batch而言,Spark Streaming會在使用updateStateByKey的時候爲已經存在的key進行state的狀態更新
(對於每一個新出現的key,會一樣的執行state的更新函數操做)
三、
若是要不斷的更新每一個key的state,就涉及到了狀態的保存和容錯,這個時候就須要開啓checkpoint機制和功能 (
one.checkpoint(Durations.seconds(10)) //容錯更好就須要犧牲性能,容錯不須要過高,時間能夠設置的長一些,(好比這裏將checkpoint設置爲10s,也就是每隔10s會將標記有checkpoint的RDD計算的結果持久化到磁盤,若是咱們設置的Batch Interval = 5s, 在第15s時觸發job進行計算, 可是在第17s時down, 這時候只能恢復到10s的數據,那麼10s至17s的數據就丟失了,具體設置多少,視具體狀況而定)
)
UpdateStateByKey用處:
統計廣告點擊流量,統計這一天的車流量...
案例:全面的廣告點擊分析
UpdateStateByKeyOperator
這裏作了
checkpoint操做,
jsc.checkpoint("hdfs://ndoe1:8020/user/sscheckpoint");
node1建立一個Socket Server,指定8888端口,SparkStreaming與服務器這個端口創建通訊,那麼用戶的數據從這裏流向SparkStreaming進行計算。
在這個案例中,
用以空格分割的單詞來模擬用戶點擊的廣告,每一個單詞表明一個廣告,
統計每個廣告(單詞)出現的次數(就是WordCount)
最後的
conunts.print() //output operator類型的算子
result: 利用SparkStreaming作到了微批處理,近似實時計算
查看hdfs,發現設置checkpoint會將SparkStreaming的處理結果進行了持久化
2.6reduceByKeyAndWindow
基於滑動窗口的熱點搜索詞實時統計--WindowOperator
1.未優化的
2.優化的必需要設置checkpoint的目錄
如下是優化的過的reduceByKeyAndWindow
補充:
1.Spark master 8080端口 監控資源
Drive
4040端口 監控任務,能夠看到有一個Streaming job(它裏面有一個線程,是一直運行的,負責接收咱們的數據)
2.transform 和
foreachRDD 的區別?
transform Transformation類型算子,
transform很是厲害,能夠拿到每個DStream的rdd;這樣就能夠利用每個rdd的全部算子來轉換;甚至在裏面應用spark core,或者將rdd轉換成DataFrame來使用SparkSQL操做
foreachRDD Action類型算子,對於每一個RDD進行操做,
何時會用?最後存結果的時候會用
3.
transform取出DStream中的RDD
使用transform將DStream中的RDD抽取出來,調用了RDD的Action類的算子(是能夠執行的)
是在Driver端執行的,若是不在Driver端執行,調用Action類的算子就不會觸發一個job了
對RDD的操做纔會發送到Executor端執行,
transform是對
DStream進行操做(transform中抽取RDD,對這個RDD執行collect類型的數據,在job Generator時執行的,生成了多個job,以
jobSet的形式發送給
jobSecheduler),這樣的話就能夠
預警:對數據的預警,與標準進行比較,若是超過了這個標準就進行報警(一旦發現某個黑名單就當即進行報警,),總體的代碼是在Driver端執行的,可是部分代碼對RDD的操做是在Executor段執行的
SparkContext sc = userClickRDD.context();
Object obj = "能夠來源於數據庫,動態的更改廣播變量"
sc.broadCast(obj)
2.6SparkStreaming--Driver HA
2.6.1Driver也有可能掛掉,如何實現它的高可用?
當一個Driver掛掉後,(回憶:當初的Master是由zookeeper進行託管),另外啓動一個Driver,它就須要從上一個Driver中得到相關的信息(包括batch的進度,data的位置,job執行進度,
DStream的Graph(基於DStream的業務邏輯))
如何實現Driver的高可用-->基於HDFS上面的元數據(Driver的信息)進行恢復,
注意!不會從新new SparkContext,由於這樣至關於又建立了一個全新的Driver
2.6.2Driver HA的代碼套路
1.指定了去哪個目錄下面尋找Driver的元數據信息
2.
提交Application到集羣中執行的時候,必須使用cluster模式,同時必須指定一個參數 --supervise(當某一個Driver掛掉,新的Driver須要另外一個Driver中的信息來繼續job的執行)
2.6.3監控HDFS上指定目錄下文件數量的變化
示例代碼
SparkStreamingOnHDFS
1.爲了狀態的保存和容錯,開啓了checkpoint機制,Driver HA
2.
ssc.textFileStream("hdfs://node1:8020/userhdfs/")
//監控hdfs上/user/hdfs的變化
命令:hadoop fs -put wc /user/hdfs
2.6.4SparkStreaming 監控 HDFS 上文件數量的變化,並將變化寫入到MySql中
示例代碼
SparkStreamingOnHDFSToMySQL
1.爲了狀態的保存和容錯,開啓了checkpoint機制,Driver HA
2.
ssc.textFileStream("hdfs://node1:8020/userhdfs/")
//監控hdfs上/user/hdfs的變化
3.Kafka
3.1Kafka定義
Apache Kafka是一個高吞吐的集發佈與訂閱與一體的分佈式消息系統
流式處理的數據源是kafka,批處理的數據源是hive,也就是hdfs;
3.2消息隊列常見的場景
1.系統之間的解耦合
-queue模型
-publish-subscribe模型
2.峯值壓力緩衝,若是高峯期日誌大量到SparkSreaming,那麼會形成計算時間超過BatchInterval),能夠在日誌服務器和SparkStreaming中間加入Kafka,起到緩衝的做用
3.異步通訊
3.3Kafka的架構
消費者的
消費偏移量存儲在
zookeeper中,生產者生產數據,消費者消費數據,kafka並不會生產數據,但kafka默認一週刪除一次數據。
broker就是代理,在kafka cluster這一層這裏,其實裏面是有不少個broker
topic就至關於
Queue,
Queue裏面有生產者消費者模型
3.4Kafka的消息存儲和生產消費模型
topic:一個kafka集羣中能夠劃分n多的topic,一個topic能夠分紅多個partition(
這個是爲了作並行的
),
每一個partition內部消息強有序,其
中的每一個消息都有一個序號叫offset,一個partition對應一個broker,一個broker能夠管多個partition,
這個partition能夠很簡單想象爲一個文件,當數據發過來的時候它就往這個
partition上面append,追加就行,kafka和不少消息系統不同,不少消息系統是消費完了我就把它刪掉,而kafka是根據時間策略刪除,而不是消費完就刪除,在
kafka裏面沒有一個消費完這麼個概念,只有過時這樣一個概念
生產者本身決定往哪一個partition寫消息
(輪循的負載均衡或者是基於hash的partition策略)
消費者能夠訂閱某一個topic,這個topic一旦有數據,會將數據推送給消費者
3.5kafka 組內queue消費模型 || 組間publish-subscribe消費模型
3.6kafka有哪些特色
3.7爲何Kafka的吞吐量高?
3.7.1 什麼是Zero Copy?
零拷貝」是指計算機操做的過程當中,CPU不須要爲數
據在內存之間的拷貝消耗資源。而
它一般是指計算機在網絡上發送文件時,不須要
將文件內容拷貝到用戶空間(User Space)而直接在內核空間(Kernel Space)中
傳輸到網絡的方式
3.7.2
kafka採用零拷貝Zero Copy的方式
從上圖中能夠清楚的看到,
Zero Copy的模式中,避免了數據在內存空間和用戶空間
之間的拷貝,從而提升了系統的總體性能。
Linux中的sendfile()以及
Java NIO中
的FileChannel.transferTo()方法都實現了零拷貝的功能,而在Netty中也經過在
FileRegion中包裝了NIO的FileChannel.transferTo()方法實現了零拷貝
3.8搭建Kafka集羣--leader的均衡機制
Kafka中leader的均衡機制
Kafka中一個topic有多個partition,如上圖,kfk有0,1,2共三個partition,每一個partition都有對應的leader來進行管理,對於leader1來講它來管理partition0,當leader1掛掉以後,由於partition0配置了副本數(在broker0,broker2還存在partition0的副本),那麼此時會在broker0,broker2上選出一臺當作leader繼續管理partition0(好比說選取了broker2當作partition0的leader),這時候若是咱們配置了leader均衡機制,從新恢復了broker1,那麼partition0的leader就會從broker2轉移到broker1,減輕了broker2的讀取壓力,實現了負載均衡。固然若是不開啓leader均衡機制的話,從新恢復broker1,那麼partition0的leader仍就是broker2。
Kafka中leader的均衡機制在哪裏配置?
在server.properties添加以下一句話
auto.leader.rebalance.enable=true
3.9Kafka_code注意事項
注意一:
向kafka中寫數據的時候咱們
必需要指定所配置kafka的全部brokers節點,而不能只配置一個節點,由於咱們寫的話,是不知道這個topic最終存放在什麼地方,因此必須指定全,
讀取Kafka中的數據的時候是須要指定zk的節點,只須要指定一個節點就能夠了;目前咱們使用的在代碼中直接寫上這些節點,之後所有要寫到配置文件中
注意二:
kafka中存儲的是鍵值對,即便咱們沒有明確些出來key,
獲取的時候也是須要利用tuple的方式獲取值的;而對於放到一個kafka中的數據,
這個數據到底存放到那個partition中呢?這個就須要使用hashPartition方式或者普通的輪詢方式存放;對於沒有明確指定key的發往kafka的數據,使用的就是輪詢方式;
4.SparkStreaming + Kafka 兩種模式--Receive模式 || Direct模式
Receive模式--SparkStreaming + Kafka 總體架構
注意!每一步都是阻塞的,上一步完成以後才能進行下一步
流程:
1.接收數據(
SparkStreaming做爲
消費者,若是訂閱了一個topic,那麼topic一有數據就會主動推送給SparkSreaming)
2.Executor將接收來的數據備份到其餘Executor中(Executor中執行的job做爲一個receiver,裏面的task一直在接收kafka推送來的數據,而後將接收來的數據進行持久化,
默認的持久化級別是
MEMORY_DISK_SER_2)
3.Executor備份完成以後,向
Driver中的
ReceiverTracker彙報數據存儲在哪一些的
Executor中(
Driver{ReceiverTracker,DAGScheduler,TaskScheduler})
4.在zookeeper中更新消費偏移量
5.Driver負責分發
task到數據所在的
Executor中執行(達到移動計算,而不是移動數據)
注意!
1.在SparkStreaming中Driver一旦掛掉,它下面的Executor也會掛掉
若是在第四步完成後,Driver掛掉了,會有什麼問題?
其實數據並無被處理,數據就丟了,所以kafka的事務機制並不完善
所以對於如上這種狀況,提供了一個解決方案,就是WAL機制(
WriteAheadLog--預寫機制)
可是WAL機制有什麼問題?(每一次接收來的數據,都要往HDFS上寫一份,性能會有所降低)
代碼示例
SparkStreamingOnKafkaReceiver
SparkStreamingDataManuallyProducerForKafka
須要啓動HDFS
Direct模式
SparkStreaming和
Kafka直接鏈接,SparkStreaming去Kafka去
pull數據,這個
消費偏移量由SparkStreaming本身來維護(實際上經過checkpoint來管理的,checkpoint和job是異步的,總的來時SparkStreaming的事務機制並非很完善),避免了數據的丟失(相對而言,不是絕對的)
並行度:
1.
linesDStream裏面封裝的是RDD,RDD裏面有partition,RDD裏面的partition與這個topic的partition是一一對應的
2.從kafka中讀來的數據,封裝到一個DStream中,能夠
對這個DStream重分區,lines.repartition(10),增長partition的數量,提升並行度。
並行度:
batch->rdd->DStream
batchInterval 5s
blockInterval = 200ms
batch = 25block
將一個
blockInterval設置的小一些,有更多的block,對應更多的split,也就有更多的partition,從而提升並行度
官方建議:blockInterval不要低於50ms,不然
batchInterval/
blockInterval 獲得的block過多,partition就過多,啓動多個線程並行計算,影響執行job的性能
Receive模式 || Direct模式
最大的不一樣:消費偏移量誰來管理
附件列表
©All rights reservednode