Spark和Storm
Spark基於MapReduce算法實現的分佈式計算,不一樣於MapReduce的是,做業中間結果能夠保存在內存中,而不要再讀寫HDFS,
Spark適用於數據挖掘和機器學習等須要迭代的MapReduce算法
Spark Streaming是創建在Spark上的實時計算框架,能夠結合流式、批處理和交互式進行查詢和實時計算,
基本原理是將Stream數據分紅小的時間片斷,以相似batch批量處理的方式來處理這些小部分數據
Spark Streaming相比於基於Record的其餘處理框架(Storm),彈性分佈式數據集更容易實現高效的容錯處理;
此外小批量處理的方式使得它能夠同時兼容批量和實時數據處理的邏輯和算法,方便了一些須要歷史數據和實時數據聯合分析的特定應用場合
Spark Streaming和Storm兩個框架都提供了可擴展性和容錯性,根本區別在於處理模型,Storm處理的是每次傳入的一個事件,
而Spark Streaming是處理某個時間段窗口內的事件流。所以,Storm處理一個時間能夠達到極低的延遲。java
Hadoop和Storm
Topology=spout+Bolt
Hadoop上運行的是Job(Mapper/Reducer),Storm上運行的是Topology(Spout/Bolt),Job會運行結束,Topology會一直運行下去
Hadoop集羣包含(Master Node/Worker Node),對應到Storm集羣上的(主節點Nimbus/工做節點Supervisor)
Hadoop集羣上的(JobTracker/TaskTracker)對應到Storm集羣上的(Nimbus/Supervisor)算法
Storm架構
Supervisor-->Worker(n個Executor(n個Task))-->Topology
Nimbus和Supervisor經過Zookeeper通訊,而且這兩個進程都是無狀態和快速失敗的,全部狀態只存在於Zookeeper和本地磁盤上
Spout獲取數據源的數據,調用nextTuple函數,發射數據供Bolt消費,發射的數據單元叫Tuple(消息傳遞的基本單元),源源不斷的Tuple組成了Stream
客戶端提交Topology代碼到Nimbus,Nimbus針對該Topology創建本地的目錄,Nimbus中的調度器根據Topology的配置計算Task,並把Task分配到不一樣的Worker上,調度的結果
寫入ZooKeeper中,ZooKeeper上創建assignments節點,存儲Task和Supervisor中Worker的對應關係。在ZooKeeper上建立workerbeats節點來監控Worker的心跳。
Supervisor去ZooKeeper上獲取分配的Tasks信息,啓動一個或者多個Worker來執行,每一個Worker上運行多個Task,Task由Executor來具體執行。
Worker根據Topology信息初始化創建Task之間的連接,相同Worker類的Task經過DisrupterQueue來通訊,不一樣Worker間默認採用Netty來通訊,而後整個Topology就運行起來了
topologies包含全部Topology的靜態信息,而cluster中包含了Topology的運行態信息,根據topologies和cluster中的信息,就能夠進行真正的調度分配
服務器
在worker中,線程間通訊使用的是Disruptor,而進程間的通訊也就是Worker跟Worker之間的通訊使用的是IContext接口實現,也多是Netty和ZMQ,默認使用Netty架構
在storm中的backtype.storm.task包中含有若干上下文(GeneralTopologyContext\WorkerTopologyContext\TopologyContext),用於記錄Topology或者Storm中信息
StormTopology類中定義了不少能夠操做讀取內部信息的方法app
Task是在Executor中,經過調用mk-task方法來建立一個新的task,並經過調用mk-task-data函數爲該Task建立對應的數據框架
Topology、work、Executor、task以及組件關係運維
一個組件(spout/bolt)包含的Executor數量是由在提交Topology時設置的並行度決定的
Topology最終會調度成一個或多個worker,每一個worker即爲一個真正的操做系統執行進程,
每一個worker又能夠有多個task,每一個task是storm中進行計算的最小的運行單位,也就是spout或者bolt實例,dom
spout 的nextTuple()會在同一個循環內被ack()和fail()週期性的調用。沒有任務時它必須釋放對線程的控制,其它方法纔有機會得以執行。機器學習
executor是worker生成的線程,該線程運行着相同的組件(spout或bolt)的一個或多個task,一個task執行着實際的數據處理
1個組件的task數量老是同樣的,可是1個組件的executor的數量能夠改變,thread的數量<=task的數量
parallelism_hint參數指定的是bolt的初始的executor的數量分佈式
eg
1)
builder.setSpout("id",new Spout(),2);//兩個線程執行spout
builder.setBolt("id",new Bolt(),2);//兩個線程執行bolt
stormConf.setNumworkers(3);//work數
由於每個worker默認都會佔用一個executor(每一個executor會啓動一個acker任務(task)),7個executor,7個task
能夠在topology中取消acker任務,這樣的話就不會多出來一個executor和任務了
加上stormConf.setNumAckers(0);2個executor,2個task
2) 一個線程executor 執行多個 任務task(默認一個executor對應一個task)
int worknum = 3;
builder.setSpout("spout", new RandomSpout(),worknum).setNumTasks(worknum*2); 3個executor,6個task
builder.setBolt("bolt", new SenqueceBolt(),2*worknum).shuffleGrouping("spout").setNumTasks(worknum*2); 6個executor,6個task
conf.setNumWorkers(worknum);
conf.setNumAckers(0);
9個executor,12個task
Stream分組,即消息的分區方法,共7種內置分組方式,也能夠經過CustomStreamGrouping接口來定義本身的分組
1 shuffle分組 保證同一級Bolt上的每一個Task處理的Tuple數量一致
2 Fields分組 根據Tuple中的某一個Field或者多個Field的值來劃分,具備相同Field的會被分發到同一個Task上
3 All分組 全部的Tuple都會分發到全部的Task上,爲每一個接收數據的實例複製一份元組副本。這種分組方式用於向bolts發送信號。
4 Global分組 整個Stream會選擇一個Task做爲分發的目的地,一般是具備最新ID的Task
5 None分組 等同於shuffle分組
6 Direct分組 產生數據的Spout/Botl本身明確決定這個Tuple被Bolt的哪些Task所消費,須要使用OutputCollector的emitDirect方法來實現
7 Local or shuffle分組 若是目標Bolt中的一個或多個Task和當前產生數據的Task在同一個Worker進程中,那麼就走內部的線程間通訊,
將Tuple直接發給在當前Worker進程中的目的Task。不然,同Shuffle分組
可靠性
Spout中發射一個新的Tuple時爲其制定一個MessageId,這個MessageId能夠是任意的Object對象,多個Stream Tuple能夠共用一個MessageId,
Storm會告知用戶每個消息單元是否在一個制定的時間內被徹底處理,在Storm中,使用了Acker來解決消息處理的可靠性
消息樹
不論是在spout裏emit仍是bolt裏emit的消息,框架都會給這個消息加一個64位的隨機數當作id,
實際上是<root_id,randomID>這樣的一個結構,即每一個tuple除了上游給他建立了一個隨機64位id外
還帶有一個不變的root_id,來自spout task同一個消息樹的ack信息都發給同一個acker bolt
jstorm使用的是取模hash算法,只須要對spout的tuple 64位id取模就好了。這樣基本上能夠知足上面2點要求,
由於spout tuple的id會透傳給下游的所有消息樹節點,所以,bolt也會正確路由到那個acker bolt
acker bolt弄了一個map來作這件事情,key就是spout tuple id即消息樹的root id(64)位的,
一個root id表明一個消息樹;value則是一個value對,第一個value是task id(spout),
這些都屬於元數據,第二個value是一個64位的數字,這個64位的數字表明瞭一棵消息樹的狀態,
當整個value變成0了,說明,消息樹被「徹底處理」了,就找這個pair的第一個value,發消息就好了。
slot概念
一個節點的slot的數量用來表示某個節點的資源的容量或者說是能力的大小,於是slot是 Hadoop的資源單位
supervisor.slots.ports Storm的slot最好設置成OS核數的整數倍,同時因爲storm是基於內存的實時計算,
slot數不要大於每臺物理機可運行slot個數:(物理內存-虛擬內存)/單個java進程最大可佔用內存數
worker.childopts storm的worker進程的java限制,有效地設置該參數可以在Topology異常時進行緣由分析
-XX:+HeapDumpOnOutOfMemoryError 當內存使用量超過Xmx時,java進程將被JVM殺掉同時會生產java_pidxxx.hprof文件,便於使用MemoryAnalyzer分析內存使用狀況
schedule-topology方法和DefaultScheduler的default-schedule的有一些類似的邏輯,主要根據當前的可用資源完成對Topology的任務分配,包括得到當前的可用Slot資源,計算當前
Topology所能使用的所有Slot數目、對Slot從新分配和進行排序以及獲得最後的分配信息等
Storm運維監控
Ganglia是一個跨平臺可擴展的、高性能計算系統下的分佈式監控系統,監控Storm主機的信息
zabbix來監控 Nimbus和Supervisor進程,當發現進程掛掉後能夠重啓並報警
經過Storm UI來監控和調試相關應用,以Storm Metric、ZooKeeper目錄以及Hook等方式幫助完成一些深刻的調試和監控
鑑於Storm做爲一個平臺提供給不一樣的業務共用,進行資源隔離室必須的,開源資源隔離方案有CGoup(其餘封裝方案)、
YARN及StormOnYarn(可讓Storm、Hadoop、Spark等共同運行在同一套集羣上)
storm文件結構
service cgconfig start
在/cgroup目錄下會生成 blkio cpu cpuacct cpuset devices freezer memory net_cls子目錄,每一個子目錄對應一個控制項,每一個子目錄下都會存在如下配置文件
cgroup.procs 文件內容爲受控制的進程ID
notify_no_release 文件內容設置爲1時,當沒有可控制進程是,出發release_agent指定的內容
release_agent 文件內容爲可執行文件、命令
tasks 文件內容爲受控制的線程ID
內存設置memory.oom_disable爲0或1,能夠控制使用的內存超過限制的內存時時殺死仍是進入休眠
cpu是基於CPU時間片進行的資源帶哦度,cpuset是基於CPU核心進行的資源調度
service cgconfig status
Storm開發
Trident是基於Storm的高級抽象,除了提供實時流聚合、分組、過濾等功能,還提供了對數據持久化和事務性操做,保證了Tuple只能被處理一次而且不丟失
每次發送數據,Tuple被分紅一組組的batch,每個batch分配一個惟一的事務ID,batch之間的更新嚴格有序
分佈式RPC(DRPC)用於對Storm上大量的函數調用進行並行計算過程,分佈式RPC經過DRPC服務器協調接收一個RPC請求,發送請求到Storm Topology,並從Storm Topology接收結果;
一般應用分佈式RPC對Trident存儲的各類數據源進行並行查詢
用戶畫像建模 比較流行成熟的SQL-ON-Hadoop是Spark SQL,Mesos Spark或者Yarn Spark