- 場景
伴隨着信息科技突飛猛進的發展,信息呈現出爆發式的膨脹,人們獲取信息的途徑也更加多樣、更加便捷,同時對於信息的時效性要求也愈來愈高。舉個搜索場景中的例子,當一個賣家發佈了一條寶貝信息時,他但願的固然是這個寶貝立刻就能夠被賣家搜索出來、點擊、購買啦,相反,若是這個寶貝要等到次日或者更久才能夠被搜出來,估計這個大哥就要罵娘了。再舉一個推薦的例子,若是用戶昨天在淘寶上買了一雙襪子,今天想買一副泳鏡去游泳,可是卻發現系統在竭盡全力地給他推薦襪子、鞋子,根本對他今天尋找泳鏡的行爲視而不見,估計這哥們內心就會想推薦你妹呀。其實稍微瞭解點背景知識的碼農們都知道,這是由於後臺系統作的是天天一次的全量處理,並且大可能是在夜深人靜之時作的,那麼你今天白天作的事情固然要明天才能反映出來啦。java
- 實現一個實時計算系統
全量數據處理使用的大可能是鼎鼎大名的hadoop或者hive,做爲一個批處理系統,hadoop以其吞吐量大、自動容錯等優勢,在海量數據處理上獲得了普遍的使用。可是,hadoop不擅長實時計算,由於它自然就是爲批處理而生的,這也是業界一致的共識。不然最近這兩年也不會有s4,storm,puma這些實時計算系統如雨後春筍般冒出來啦。先拋開s4,storm,puma這些系統不談,咱們首先來看一下,若是讓咱們本身設計一個實時計算系統,咱們要解決哪些問題。git
- 低延遲。都說了是實時計算系統了,延遲是必定要低的。
- 高性能。性能不高就是浪費機器,浪費機器是要受批評的哦。
- 分佈式。系統都是爲應用場景而生的,若是你的應用場景、你的數據和計算單機就能搞定,那麼不用考慮這些複雜的問題了。咱們所說的是單機搞不定的狀況。
- 可擴展。伴隨着業務的發展,咱們的數據量、計算量可能會愈來愈大,因此但願這個系統是可擴展的。
- 容錯。這是分佈式系統中通用問題。一個節點掛了不能影響個人應用。
好,若是僅僅須要解決這5個問題,可能會有無數種方案,並且各有千秋,隨便舉一種方案,使用消息隊列+分佈在各個機器上的工做進程就ok啦。咱們再繼續往下看。github
- 容易在上面開發應用程序。親,你設計的系統須要應用程序開發人員考慮各個處理組件的分佈、消息的傳遞嗎?若是是,那有點麻煩啊,開發人員可能會用很差,也不會想去用。
- 消息不丟失。用戶發佈的一個寶貝消息不能在實時處理的時候給丟了,對吧?更嚴格一點,若是是一個精確數據統計的應用,那麼它處理的消息要很少很多才行。這個要求有點高哦。
- 消息嚴格有序。有些消息之間是有強相關性的,好比同一個寶貝的更新和刪除操做消息,若是處理時搞亂順序徹底是不同的效果了。
不知道你們對這些問題是否都有了本身的答案,下面讓咱們帶着這些問題,一塊兒來看一看storm的解決方案吧。數據庫
- Storm是什麼
若是隻用一句話來描述storm的話,可能會是這樣:分佈式實時計算系統。按照storm做者的說法,storm對於實時計算的意義相似於hadoop對於批處理的意義。咱們都知道,根據google mapreduce來實現的hadoop爲咱們提供了map, reduce原語,使咱們的批處理程序變得很是地簡單和優美。一樣,storm也爲實時計算提供了一些簡單優美的原語。咱們會在第三節中詳細介紹。編程
咱們來看一下storm的適用場景。架構
- 流數據處理。Storm能夠用來處理源源不斷流進來的消息,處理以後將結果寫入到某個存儲中去。
- 分佈式rpc。因爲storm的處理組件是分佈式的,並且處理延遲極低,因此能夠做爲一個通用的分佈式rpc框架來使用。固然,其實咱們的搜索引擎自己也是一個分佈式rpc系統。
說了半天,好像都是很玄乎的東西,下面咱們開始具體講解storm的基本概念和它內部的一些實現原理吧。app
- Storm的基本概念
首先咱們經過一個 storm 和hadoop的對比來了解storm中的基本概念。框架
Hadoop | Storm | |
系統角色 | JobTracker | Nimbus |
TaskTracker | Supervisor | |
Child | Worker | |
應用名稱 | Job | Topology |
組件接口 | Mapper/Reducer | Spout/Bolt |
表3-1分佈式
接下來咱們再來具體看一下這些概念。ide
- Nimbus:負責資源分配和任務調度。
- Supervisor:負責接受nimbus分配的任務,啓動和中止屬於本身管理的worker進程。
- Worker:運行具體處理組件邏輯的進程。
- Task:worker中每個spout/bolt的線程稱爲一個task. 在storm0.8以後,task再也不與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱爲executor。
- Topology:storm中運行的一個實時應用程序,由於各個組件間的消息流動造成邏輯上的一個拓撲結構。
- Spout:在一個topology中產生源數據流的組件。一般狀況下spout會從外部數據源中讀取數據,而後轉換爲topology內部的源數據。Spout是一個主動的角色,其接口中有個nextTuple()函數,storm框架會不停地調用此函數,用戶只要在其中生成源數據便可。
- Bolt:在一個topology中接受數據而後執行處理的組件。Bolt能夠執行過濾、函數操做、合併、寫數據庫等任何操做。Bolt是一個被動的角色,其接口中有個execute(Tuple input)函數,在接受到消息後會調用此函數,用戶能夠在其中執行本身想要的操做。
- Tuple:一次消息傳遞的基本單元。原本應該是一個key-value的map,可是因爲各個組件間傳遞的tuple的字段名稱已經事先定義好,因此tuple中只要按序填入各個value就好了,因此就是一個value list.
- Stream:源源不斷傳遞的tuple就組成了stream。
10. stream grouping:即消息的partition方法。Storm中提供若干種實用的grouping方式,包括shuffle, fields hash, all, global, none, direct和localOrShuffle等
相比於s4, puma等其餘實時計算系統,storm最大的亮點在於其記錄級容錯和可以保證消息精確處理的事務功能。下面就重點來看一下這兩個亮點的實現原理。
- Storm記錄級容錯的基本原理
首先來看一下什麼叫作記錄級容錯?storm容許用戶在spout中發射一個新的源tuple時爲其指定一個message id, 這個message id能夠是任意的object對象。多個源tuple能夠共用一個message id,表示這多個源 tuple對用戶來講是同一個消息單元。storm中記錄級容錯的意思是說,storm會告知用戶每個消息單元是否在指定時間內被徹底處理了。那什麼叫作徹底處理呢,就是該message id綁定的源tuple及由該源tuple後續生成的tuple通過了topology中每個應該到達的bolt的處理。舉個例子。在圖4-1中,在spout由message 1綁定的tuple1和tuple2通過了bolt1和bolt2的處理生成兩個新的tuple,並最終都流向了bolt3。當這個過程完成處理完時,稱message 1被徹底處理了。
圖4-1
在storm的topology中有一個系統級組件,叫作acker。這個acker的任務就是追蹤從spout中流出來的每個message id綁定的若干tuple的處理路徑,若是在用戶設置的最大超時時間內這些tuple沒有被徹底處理,那麼acker就會告知spout該消息處理失敗了,相反則會告知spout該消息處理成功了。在剛纔的描述中,咱們提到了」記錄tuple的處理路徑」,若是曾經嘗試過這麼作的同窗能夠仔細地思考一下這件事的複雜程度。可是storm中倒是使用了一種很是巧妙的方法作到了。在說明這個方法以前,咱們來複習一個數學定理。
A xor A = 0.
A xor B…xor B xor A = 0,其中每個操做數出現且僅出現兩次。
storm中使用的巧妙方法就是基於這個定理。具體過程是這樣的:在spout中系統會爲用戶指定的message id生成一個對應的64位整數,做爲一個root id。root id會傳遞給acker及後續的bolt做爲該消息單元的惟一標識。同時不管是spout仍是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple以後,會告知acker本身發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完以後,也會告知acker本身處理的輸入tuple的id及新生成的那些tuple的id。Acker只須要對這些id作一個簡單的異或運算,就能判斷出該root id對應的消息單元是否處理完成了。下面經過一個圖示來講明這個過程。
圖4-1 spout中綁定message 1生成了兩個源tuple,id分別是0010和1011.
圖4-2 bolt1處理tuple 0010時生成了一個新的tuple,id爲0110.
圖4-3 bolt2處理tuple 1011時生成了一個新的tuple,id爲0111.
圖4-4 bolt3中接收到tuple 0110和tuple 0111,沒有生成新的tuple.
可能有些細心的同窗會發現,容錯過程存在一個可能出錯的地方,那就是,若是生成的tuple id並非徹底各異的,acker可能會在消息單元徹底處理完成以前就錯誤的計算爲0。這個錯誤在理論上的確是存在的,可是在實際中其機率是極低極低的,徹底能夠忽略。
- Storm的事務拓撲
事務拓撲(transactional topology)是storm0.7引入的特性,在最近發佈的0.8版本中已經被封裝爲Trident,提供了更加便利和直觀的接口。由於篇幅所限,在此對事務拓撲作一個簡單的介紹。
事務拓撲的目的是爲了知足對消息處理有着極其嚴格要求的場景,例如實時計算某個用戶的成交筆數,要求結果徹底精確,不能多也不能少。Storm的事務拓撲是徹底基於它底層的spout/bolt/acker原語實現的,經過一層巧妙的封裝得出一個優雅的實現。我的以爲這也是storm最大的魅力之一。
事務拓撲簡單來講就是將消息分爲一個個的批(batch),同一批內的消息以及批與批之間的消息能夠並行處理,另外一方面,用戶能夠設置某些bolt爲committer,storm能夠保證committer的finishBatch()操做是按嚴格不降序的順序執行的。用戶能夠利用這個特性經過簡單的編程技巧實現消息處理的精確。
- Storm在淘寶
因爲storm的內核是clojure編寫的(不過大部分的拓展工做都是java編寫的),爲咱們理解它的實現帶來了必定的困難,好在大部分狀況下storm都比較穩定,固然咱們也在盡力熟悉clojure的世界。咱們在使用storm時一般都是選擇java語言開發應用程序。
在淘寶,storm被普遍用來進行實時日誌處理,出如今實時統計、實時風控、實時推薦等場景中。通常來講,咱們從類kafka的metaQ或者基於hbase的timetunnel中讀取實時日誌消息,通過一系列處理,最終將處理結果寫入到一個分佈式存儲中,提供給應用程序訪問。咱們天天的實時消息量從幾百萬到幾十億不等,數據總量達到TB級。對於咱們來講,storm每每會配合分佈式存儲服務一塊兒使用。在咱們正在進行的個性化搜索實時分析項目中,就使用了timetunnel + hbase + storm + ups的架構,天天處理幾十億的用戶日誌信息,從用戶行爲發生到完成分析延遲在秒級。
- Storm的將來
Storm0.7系列的版本已經在各大公司獲得了普遍使用,最近發佈的0.8版本中引入了State,使得其從一個純計算框架演變成了一個包含存儲和計算的實時計算新利器,還有剛纔提到的Trident,提供更加友好的接口,同時可定製scheduler的特性也爲其針對不一樣的應用場景作優化提供了更便利的手段,也有人已經在基於storm的實時ql(query language)上邁出了腳本。在服務化方面,storm一直在朝着融入mesos框架的方向努力。同時,storm也在實現細節上不斷地優化,使用不少優秀的開源產品,包括kryo, Disruptor, curator等等。能夠想象,當storm發展到1.0版本時,必定是一款無比傑出的產品,讓咱們拭目以待,固然,最好仍是參與到其中去吧,同窗們。
- 參考文獻
[1]storm官方wiki及code. https://github.com/nathanmarz/storm