Storm技術加強
注:學習本課程,請先學習Storm基礎
課程目標:
經過本模塊的學習,可以掌握Storm底層的通訊機制、消息容錯機制、storm目錄樹及任務提交流程。
課程大綱:
一、 Storm程序的併發機制
二、 Storm框架通訊機制(worker內部通訊與外部通訊)
三、 Storm組件本地目錄樹
四、 Storm zookeeper目錄樹
五、 Storm 任務提交的過程
一、Storm程序的併發機制
1.一、概念
Workers (JVMs): 在一個物理節點上能夠運行一個或多個獨立的JVM 進程。一個Topology能夠包含一個或多個worker(並行的跑在不一樣的物理機上), 因此worker process就是執行一個topology的子集, 而且worker只能對應於一個topology
Executors (threads): 在一個worker JVM進程中運行着多個Java線程。一個executor線程能夠執行一個或多個tasks。但通常默認每一個executor只執行一個task。一個worker能夠包含一個或多個executor, 每一個component (spout或bolt)至少對應於一個executor, 因此能夠說executor執行一個compenent的子集, 同時一個executor只能對應於一個component。
Tasks(bolt/spout instances):Task就是具體的處理邏輯對象,每個Spout和Bolt會被看成不少task在整個集羣裏面執行。每個task對應到一個線程,而stream grouping則是定義怎麼從一堆task發射tuple到另一堆task。你能夠調用TopologyBuilder.setSpout和TopBuilder.setBolt來設置並行度 — 也就是有多少個task。
1.二、配置並行度
對於併發度的配置, 在storm裏面能夠在多個地方進行配置, 優先級爲:
defaults.yaml < storm.yaml < topology-specific configuration
< internal component-specific configuration < external component-specific configuration
worker processes的數目, 能夠經過配置文件和代碼中配置, worker就是執行進程, 因此考慮併發的效果, 數目至少應該大亍machines的數目
executor的數目, component的併發線程數,只能在代碼中配置(經過setBolt和setSpout的參數), 例如, setBolt(「green-bolt」, new GreenBolt(), 2)
tasks的數目, 能夠不配置, 默認和executor1:1, 也能夠經過setNumTasks()配置
Topology的worker數經過config設置,即執行該topology的worker(java)進程數。它能夠經過 storm rebalance 命令任意調整。
Config conf = newConfig();
conf.setNumWorkers(2); //用2個worker
topologyBuilder.setSpout(「blue-spout」, newBlueSpout(), 2); //設置2個併發度
topologyBuilder.setBolt(「green-bolt」, newGreenBolt(), 2).setNumTasks(4).shuffleGrouping(「blue-spout」); //設置2個併發度,4個任務
topologyBuilder.setBolt(「yellow-bolt」, newYellowBolt(), 6).shuffleGrouping(「green-bolt」); //設置6個併發度
StormSubmitter.submitTopology(「mytopology」, conf, topologyBuilder.createTopology());java
3個組件的併發度加起來是10,就是說拓撲一共有10個executor,一共有2個worker,每一個worker產生10 / 2 = 5條線程。
綠色的bolt配置成2個executor和4個task。爲此每一個executor爲這個bolt運行2個task。node
動態的改變並行度
Storm支持在不 restart topology 的狀況下, 動態的改變(增減) worker processes 的數目和 executors 的數目, 稱爲rebalancing. 經過Storm web UI,或者經過storm rebalance命令實現:
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10web
二、Storm通訊機制
Worker間的通訊常常須要經過網絡跨節點進行,Storm使用ZeroMQ或Netty(0.9之後默認使用)做爲進程間通訊的消息框架。
Worker進程內部通訊:不一樣worker的thread通訊使用LMAX Disruptor來完成。
不一樣topologey之間的通訊,Storm不負責,須要本身想辦法實現,例如使用kafka等;
2.一、Worker進程間通訊
worker進程間消息傳遞機制,消息的接收和處理的大概流程見下圖編程
對於worker進程來講,爲了管理流入和傳出的消息,每一個worker進程有一個獨立的接收線程(對配置的TCP端口supervisor.slots.ports進行監聽);
對應Worker接收線程,每一個worker存在一個獨立的發送線程,它負責從worker的transfer-queue中讀取消息,並經過網絡發送給其餘worker
每一個executor有本身的incoming-queue和outgoing-queue。
Worker接收線程將收到的消息經過task編號傳遞給對應的executor(一個或多個)的incoming-queues;
每一個executor有單獨的線程分別來處理spout/bolt的業務邏輯,業務邏輯輸出的中間數據會存放在outgoing-queue中,當executor的outgoing-queue中的tuple達到必定的閥值,executor的發送線程將批量獲取outgoing-queue中的tuple,併發送到transfer-queue中。
每一個worker進程控制一個或多個executor線程,用戶可在代碼中進行配置。其實就是咱們在代碼中設置的併發度個數。
2.二、Worker進程間通訊分析api
一、 Worker接受線程經過網絡接受數據,並根據Tuple中包含的taskId,匹配到對應的executor;而後根據executor找到對應的incoming-queue,將數據存發送到incoming-queue隊列中。
二、 業務邏輯執行現成消費incoming-queue的數據,經過調用Bolt的execute(xxxx)方法,將Tuple做爲參數傳輸給用戶自定義的方法
三、 業務邏輯執行完畢以後,將計算的中間數據發送給outgoing-queue隊列,當outgoing-queue中的tuple達到必定的閥值,executor的發送線程將批量獲取outgoing-queue中的tuple,併發送到Worker的transfer-queue中
四、 Worker發送線程消費transfer-queue中數據,計算Tuple的目的地,鏈接不一樣的node+port將數據經過網絡傳輸的方式傳送給另外一個的Worker。
五、 另外一個worker執行以上步驟1的操做。
2.三、Worker進程間技術(Netty、ZeroMQ)
2.3.一、Netty
Netty是一個NIO client-server(客戶端服務器)框架,使用Netty能夠快速開發網絡應用,例如服務器和客戶端協議。Netty提供了一種新的方式來使開發網絡應用程序,這種新的方式使得它很容易使用和有很強的擴展性。Netty的內部實現時很複雜的,可是Netty提供了簡單易用的api從網絡處理代碼中解耦業務邏輯。Netty是徹底基於NIO實現的,因此整個Netty都是異步的。
書籍:Netty權威指南
2.3.二、ZeroMQ
ZeroMQ是一種基於消息隊列的多線程網絡庫,其對套接字類型、鏈接處理、幀、甚至路由的底層細節進行抽象,提供跨越多種傳輸協議的套接字。ZeroMQ是網絡通訊中新的一層,介於應用層和傳輸層之間(按照TCP/IP劃分),其是一個可伸縮層,可並行運行,分散在分佈式系統間。
ZeroMQ定位爲:一個簡單好用的傳輸層,像框架同樣的一個socket library,他使得Socket編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ的明確目標是「成爲標準網絡協議棧的一部分,以後進入Linux內核」。
2.四、Worker 內部通訊技術(Disruptor)
2.4.一、 Disruptor的來歷
一個公司的業務與技術的關係,通常能夠分爲三個階段。第一個階段就是跟着業務跑。第二個階段是經歷了幾年的時間,才達到的驅動業務階段。第三個階段,技術引領業務的發展乃至企業的發展。因此咱們在學習Disruptor這個技術時,不得不提LMAX這個機構,由於Disruptor這門技術就是由LMAX公司開發並開源的。
LMAX是在英國註冊並受到FSA監管(監管號碼爲509778)的外匯黃金交易所。LMAX也是歐洲第一家也是惟一一家採用多邊交易設施Multilateral Trading Facility(MTF)擁有交易所牌照和經紀商牌照的歐洲頂級金融公司
LAMX擁有最迅捷的交易平臺,頂級技術支持。LMAX交易所使用「(MTF)分裂器Disruptor」技術,能夠在極短期內(通常在3百萬秒之一內)處理訂單,在一個線程裏每秒處理6百萬訂單。全部訂單均爲撮合成交形式,無一例外。多邊交易設施(MTF)曾經用來設計倫敦證券交易 所(london Stock Exchange)、德國證券及衍生工具交易所(Deutsche Borse)和歐洲證券交易所(Euronext)。
2011年LMAX憑藉該技術得到了金融行業技術評選大賽的最佳交易系統獎和甲骨文「公爵杯」創新編程框架獎。
2.4.二、Disruptor是什麼
一、 簡單理解:Disruptor是一個Queue。Disruptor是實現了「隊列」的功能,並且是一個有界隊列。而隊列的應用場景天然就是「生產者-消費者」模型。
二、 在JDK中Queue有不少實現類,包括不限於ArrayBlockingQueue、LinkBlockingQueue,這兩個底層的數據結構分別是數組和鏈表。數組查詢快,鏈表增刪快,可以適應大多數應用場景。
三、 可是ArrayBlockingQueue、LinkBlockingQueue都是線程安全的。涉及到線程安全,就會有synchronized、lock等關鍵字,這就意味着CPU會打架。
四、 Disruptor一種線程之間信息無鎖的交換方式(使用CAS(Compare And Swap/Set)操做)。
2.4.二、Disruptor主要特色
一、 沒有競爭=沒有鎖=很是快。
二、 全部訪問者都記錄本身的序號的實現方式,容許多個生產者與多個消費者共享相同的數據結構。
三、 在每一個對象中都能跟蹤序列號(ring buffer,claim Strategy,生產者和消費者),加上神奇的cache line padding,就意味着沒有爲僞共享和非預期的競爭。
2.4.二、 Disruptor 核心技術點
Disruptor能夠當作一個事件監聽或消息機制,在隊列中一邊生產者放入消息,另一邊消費者並行取出處理.
底層是單個數據結構:一個ring buffer。
每一個生產者和消費者都有一個次序計算器,以顯示當前緩衝工做方式。
每一個生產者消費者可以操做本身的次序計數器的可以讀取對方的計數器,生產者可以讀取消費者的計算器確保其在沒有鎖的狀況下是可寫的。數組
核心組件
Ring Buffer 環形的緩衝區,負責對經過 Disruptor 進行交換的數據(事件)進行存儲和更新。
Sequence 經過順序遞增的序號來編號管理經過其進行交換的數據(事件),對數據(事件)的處理過程老是沿着序號逐個遞增處理。
RingBuffer底層是個數組,次序計算器是一個64bit long 整數型,平滑增加。緩存
一、 接受數據並寫入到腳標31的位置,以後會沿着序號一直寫入,可是不會繞過消費者所在的腳標。
二、 Joumaler和replicator同時讀到24的位置,他們能夠批量讀取數據到30
三、消費邏輯線程讀到了14的位置,可是無法繼續讀下去,由於他的sequence暫停在15的位置上,須要等到他的sequence給他序號。若是sequence能正常工做,就能讀取到30的數據。安全
三、Storm組件本地目錄樹服務器
四、Storm zookeeper目錄樹網絡
五、Storm 任務提交的過程
TopologyMetricsRunnable.TaskStartEvent[oldAssignment=,newAssignment=Assignment[masterCodeDir=C:\Users\MAOXIA~1\AppData\Local\Temp\e73862a8-f7e7-41f3-883d-af494618bc9f\nimbus\stormdist\double11-1-1458909887,nodeHost={61ce10a7-1e78-4c47-9fb3-c21f43a331ba=192.168.1.106},taskStartTimeSecs={1=1458909910, 2=1458909910, 3=1458909910, 4=1458909910, 5=1458909910, 6=1458909910, 7=1458909910, 8=1458909910},workers=[ResourceWorkerSlot[hostname=192.168.1.106,memSize=0,cpu=0,tasks=[1, 2, 3, 4, 5, 6, 7, 8],jvm=,nodeId=61ce10a7-1e78-4c47-9fb3-c21f43a331ba,port=6900]],timeStamp=1458909910633,type=Assign],task2Component=,clusterName=,topologyId=double11-1-1458909887,timestamp=0]
六、Storm 消息容錯機制
6.一、整體介紹
在storm中,可靠的信息處理機制是從spout開始的。
一個提供了可靠的處理機制的spout須要記錄他發射出去的tuple,當下遊bolt處理tuple或者子tuple失敗時spout可以從新發射。
Storm經過調用Spout的nextTuple()發送一個tuple。爲實現可靠的消息處理,首先要給每一個發出的tuple帶上惟一的ID,而且將ID做爲參數傳遞給SoputOutputCollector的emit()方法:collector.emit(new Values(「value1」,」value2」), msgId); messageid就是用來標示惟一的tupke的,而rootid是隨機生成的
給每一個tuple指定ID告訴Storm系統,不管處理成功仍是失敗,spout都要接收tuple樹上全部節點返回的通知。若是處理成功,spout的ack()方法將會對編號是msgId的消息應答確認;若是處理失敗或者超時,會調用fail()方法。
6.二、基本實現
Storm 系統中有一組叫作」acker」的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每一個消息。
acker任務保存了spout id到一對值的映射。第一個值就是spout的任務id,經過這個id,acker就知道消息處理完成時該通知哪一個spout任務。第二個值是一個64bit的數字,咱們稱之爲」ack val」, 它是樹中全部消息的隨機id的異或計算結果。
ack val表示了整棵樹的的狀態,不管這棵樹多大,只須要這個固定大小的數字就能夠跟蹤整棵樹。當消息被建立和被應答的時候都會有相同的消息id發送過來作異或。 每當acker發現一棵樹的ack val值爲0的時候,它就知道這棵樹已經被徹底處理了
總結:
Storm爲了保證每條數據成功被處理,實現至少一次語義,經過Storm的ACK機制能夠對spout產生的每個tuple進行跟蹤;
tuple處理成功是指這個Tuple以及這個Tuple產生的全部子Tuple都被成功處理, 由每個處理bolt經過OutputCollector的方法ack(tuple)來告知storm當前bolt處理成功,最終調用spout的ack方法;
處理失敗是指這個Tuple或這個Tuple產生的全部Tuple中的任意一個tuple處理失敗或者超時(超時時間由Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS指定), 處理失敗bolt調用OutputCollector的方法fail(tuple),來告知storm當前bolt處理失敗,最終調用spout的fail方法從新發送失敗的tuple,失敗時storm不會自動重發失敗的tuple,須要咱們在spout中從新獲取發送失敗數據,手動從新發送一次。
Ack原理
Storm中有個特殊的task名叫acker,他們負責跟蹤spout發出的每個Tuple的Tuple樹(由於一個tuple經過spout發出了,通過每個bolt處理後,會生成一個新的tuple發送出去)。當acker(框架自啓動的task)發現一個Tuple樹已經處理完成了,它會發送一個消息給產生這個Tuple的那個task。對任意大的一個Tuple樹,storm只須要恆定的20字節就能夠進行跟蹤。acker對於每一個spout-tuple保存一個ack-val的校驗值,它的初始值是0,而後每發射一個Tuple或Ack一個Tuple時,這個Tuple的id就要跟這個校驗值異或一下,
而且把獲得的值更新爲ack-val的新值。那麼假設每一個發射出去的Tuple都被ack了,那麼最後ack-val的值就必定是0。Acker就根據ack-val是否爲0來判斷是否徹底處理,若是爲0則認爲已徹底處理。
例以下圖是一個簡單的Topology:
開啓Act機制 1.spout發射tuple的時候指定messageId 2.spout對發射的tuple進行緩存,不然spout沒法獲取發送失敗的數據進行重發, (這裏到底系統裏有沒有緩存沒有成功處理的tuple,好比接口conf.setMaxSpoutPending()是否只緩存了條數仍是原始數據還要去查證一下) 3.spout要重寫BaseRichSpout的fail和ack方法,spout根據messageId對於成功處理的tuple從緩存隊列中刪除,對於失敗的tuple選擇重發或作其它處理; 4.若是使用BasicBolt,BasicOutputCollector在emit新的tuple時自動與源tuple錨定,execute方法結束時源tuple會被自動ack或fail; 使用RichBolt在emit數據的時需顯示指定該數據的源tuple,以保持tracker鏈路,即collector.emit(oldTuple, newTuple); 而且須要在execute執行成功後調用OutputCollector.ack(tuple), 當失敗處理時,執行OutputCollector.fail(tuple); 5.設置acker數大於0,conf.setNumAckers(>0); 關閉Ack機制 1.在Tuple層面去掉可靠性。在發射Tuple的時候不指定MessageID來達到不不跟蹤這個Tuple的目的 2.若是對於一個Tuple樹裏面的某一部分到底成不成功不是很關心,那麼能夠在Bolt發射這些Tuple的時候不錨定它們。 這樣這些Tuple就不在Tuple樹裏面,也就不會被跟蹤了。 3.把Config.TOPOLOGY_ACKERS設置成0。在這種狀況下,Storm會在Spout發射一個Tuple以後立刻調用Spout的ack方法, 也就是說這個Tuple樹不會被跟蹤。 例子程序: public class RandomSentenceSpout extends BaseRichSpout { private SpoutOutputCollector _collector; private Random _rand; private ConcurrentHashMap