海量數據處理使用的大可能是鼎鼎大名的hadoop或者hive,做爲一個批處理系統,hadoop以其吞吐量大、自動容錯等優勢,在海量數據處理上獲得了普遍的使用。可是,hadoop不擅長實時計算,由於它自然就是爲批處理而生的,這也是業界一致的共識。不然最近這兩年也不會有s4,storm,puma這些實時計算系統如雨後春筍般冒出來。先拋開s4,storm,puma這些系統不談,咱們首先來看一下,若是讓咱們本身設計一個實時計算系統,咱們要解決哪些問題:java
一、低延遲。都說了是實時計算系統了,延遲是必定要低的。
二、高性能。性能不高就是浪費機器,浪費機器是要受批評的哦。
三、分佈式。系統都是爲應用場景而生的,若是你的應用場景、你的數據和計算單機就能搞定,那麼不用考慮這些複雜的問題了,這裏講的是單機搞不定的狀況。
四、可擴展。伴隨着業務的發展,咱們的數據量、計算量可能會愈來愈大,因此但願這個系統是可擴展的。
五、容錯性。這是分佈式系統中通用問題。一個節點掛了不能影響個人應用。node
下面咱們來看看什麼是Stormgit
Storm是什麼
若是隻用一句話來描述storm的話,可能會是這樣:分佈式實時計算系統。按照storm做者的說法,storm對於實時計算的意義相似於hadoop對於批處理的意義。咱們都知道,根據google mapreduce來實現的hadoop爲咱們提供了map, reduce原語,使咱們的批處理程序變得很是地簡單和優美。一樣,storm也爲實時計算提供了一些簡單優美的原語,後面進行詳細介紹。github
咱們來看一下storm的適用場景。數據庫
一、流數據處理。Storm能夠用來處理源源不斷流進來的消息,處理以後將結果寫入到某個存儲中去。
二、分佈式RPC。因爲storm的處理組件是分佈式的,並且處理延遲極低,因此能夠做爲一個通用的分佈式rpc框架來使用。固然,其實咱們的搜索引擎自己也是一個分佈式rpc系統。編程
Storm的基本概念
首先,咱們經過一個storm和hadoop的對比來了解storm中的基本概念。架構
Hadoopapp |
Storm框架 |
|
系統角色分佈式 |
JobTracker |
Nimbus |
TaskTracker |
Supervisor |
|
Child |
Worker |
|
應用名稱 |
Job |
Topology |
組件接口 |
Mapper/Reducer |
Spout/Bolt |
其中:
一、Nimbus:負責資源分配和任務調度。
二、Supervisor:負責接受nimbus分配的任務,啓動和中止屬於本身管理的worker進程。
三、Worker:運行具體處理組件邏輯的進程。
四、Task:worker中每個spout/bolt的線程稱爲一個task. 在storm0.8以後,task再也不與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱爲executor。
Storm組件
Storm集羣主要由一個主節點和一羣工做節點(worker node)組成,經過 Zookeeper進行協調。
主節點:
主節點一般運行一個後臺程序 —— Nimbus,用於響應分佈在集羣中的節點,分配任務和監測故障。這個很相似於Hadoop中的Job Tracker。
工做節點:
工做節點一樣會運行一個後臺程序 —— Supervisor,用於收聽工做指派並基於要求運行工做進程。每一個工做節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則經過Zookeeper系統或者集羣。
Zookeeper
Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝進Storm中的「topology」。topology則是一組由Spouts(數據源)和Bolts(數據操做)經過Stream Groupings進行鏈接的圖。下面對出現的術語進行更深入的解析。
Spout:
簡而言之,Spout歷來源處讀取數據並放入topology。Spout分紅可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,若是沒有新tuple發射則會簡單的返回。
Bolt:
Topology中全部的處理都由Bolt完成。Bolt能夠完成任何事,好比:鏈接的過濾、聚合、訪問文件/數據庫、等等。Bolt從Spout中接收數據並進行處理,若是遇到複雜流的處理也可能將tuple發送給另外一個Bolt進行處理。而Bolt中最重要的方法是execute(),以新的tuple做爲參數接收。無論是Spout仍是Bolt,若是將tuple發射成多個流,這些流均可以經過declareStream()來聲明。
Stream Groupings:
Stream Grouping定義了一個流在Bolt任務間該如何被切分。這裏有Storm提供的6個Stream Grouping類型:
1. 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每一個任務得到相等數量的tuple。
2. 字段分組(Fields grouping):根據指定字段分割數據流,並分組。例如,根據「user-id」字段,相同「user-id」的元組老是分發到同一個任務,不一樣「user-id」的元組可能分發到不一樣的任務。
3. 所有分組(All grouping):tuple被複制到bolt的全部任務。這種類型須要謹慎使用。
4. 全局分組(Global grouping):所有流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。
5. 無分組(None grouping):你不須要關心流是如何分組。目前,無分組等效於隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(若是可能)。
6. 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪一個元組處理者任務接收。
固然還能夠實現CustomStreamGroupimg接口來定製本身須要的分組。
Storm記錄級容錯的基本原理
首先來看一下什麼叫作記錄級容錯?storm容許用戶在spout中發射一個新的源tuple時爲其指定一個message id, 這個message id能夠是任意的object對象。多個源tuple能夠共用一個message id,表示這多個源 tuple對用戶來講是同一個消息單元。storm中記錄級容錯的意思是說,storm會告知用戶每個消息單元是否在指定時間內被徹底處理了。那什麼叫作徹底處理呢,就是該message id綁定的源tuple及由該源tuple後續生成的tuple通過了topology中每個應該到達的bolt的處理。舉個例子。在圖4-1中,在spout由message 1綁定的tuple1和tuple2通過了bolt1和bolt2的處理生成兩個新的tuple,並最終都流向了bolt3。當這個過程完成處理完時,稱message 1被徹底處理了。
圖4-1
在storm的topology中有一個系統級組件,叫作acker。這個acker的任務就是追蹤從spout中流出來的每個message id綁定的若干tuple的處理路徑,若是在用戶設置的最大超時時間內這些tuple沒有被徹底處理,那麼acker就會告知spout該消息處理失敗了,相反則會告知spout該消息處理成功了。在剛纔的描述中,咱們提到了」記錄tuple的處理路徑」,若是曾經嘗試過這麼作的同窗能夠仔細地思考一下這件事的複雜程度。可是storm中倒是使用了一種很是巧妙的方法作到了。在說明這個方法以前,咱們來複習一個數學定理。
A xor A = 0.
A xor B…xor B xor A = 0,其中每個操做數出現且僅出現兩次。
storm中使用的巧妙方法就是基於這個定理。具體過程是這樣的:在spout中系統會爲用戶指定的message id生成一個對應的64位整數,做爲一個root id。root id會傳遞給acker及後續的bolt做爲該消息單元的惟一標識。同時不管是spout仍是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple以後,會告知acker本身發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完以後,也會告知acker本身處理的輸入tuple的id及新生成的那些tuple的id。Acker只須要對這些id作一個簡單的異或運算,就能判斷出該root id對應的消息單元是否處理完成了。下面經過一個圖示來講明這個過程。
圖4-1 spout中綁定message 1生成了兩個源tuple,id分別是0010和1011.
圖4-2 bolt1處理tuple 0010時生成了一個新的tuple,id爲0110.
圖4-3 bolt2處理tuple 1011時生成了一個新的tuple,id爲0111.
圖4-4 bolt3中接收到tuple 0110和tuple 0111,沒有生成新的tuple.
可能有些細心的同窗會發現,容錯過程存在一個可能出錯的地方,那就是,若是生成的tuple id並非徹底各異的,acker可能會在消息單元徹底處理完成以前就錯誤的計算爲0。這個錯誤在理論上的確是存在的,可是在實際中其機率是極低極低的,徹底能夠忽略。
Storm的事務拓撲
事務拓撲(transactional topology)是storm0.7引入的特性,在最近發佈的0.8版本中已經被封裝爲Trident,提供了更加便利和直觀的接口。由於篇幅所限,在此對事務拓撲作一個簡單的介紹。
事務拓撲的目的是爲了知足對消息處理有着極其嚴格要求的場景,例如實時計算某個用戶的成交筆數,要求結果徹底精確,不能多也不能少。Storm的事務拓撲是徹底基於它底層的spout/bolt/acker原語實現的,經過一層巧妙的封裝得出一個優雅的實現。我的以爲這也是storm最大的魅力之一。
事務拓撲簡單來講就是將消息分爲一個個的批(batch),同一批內的消息以及批與批之間的消息能夠並行處理,另外一方面,用戶能夠設置某些bolt爲committer,storm能夠保證committer的finishBatch()操做是按嚴格不降序的順序執行的。用戶能夠利用這個特性經過簡單的編程技巧實現消息處理的精確。
Storm在淘寶
因爲storm的內核是clojure編寫的(不過大部分的拓展工做都是java編寫的),爲咱們理解它的實現帶來了必定的困難,好在大部分狀況下storm都比較穩定,固然咱們也在盡力熟悉clojure的世界。咱們在使用storm時一般都是選擇java語言開發應用程序。
在淘寶,storm被普遍用來進行實時日誌處理,出如今實時統計、實時風控、實時推薦等場景中。通常來講,咱們從類kafka的metaQ或者基於hbase的timetunnel中讀取實時日誌消息,通過一系列處理,最終將處理結果寫入到一個分佈式存儲中,提供給應用程序訪問。咱們天天的實時消息量從幾百萬到幾十億不等,數據總量達到TB級。對於咱們來講,storm每每會配合分佈式存儲服務一塊兒使用。在咱們正在進行的個性化搜索實時分析項目中,就使用了timetunnel + hbase + storm + ups的架構,天天處理幾十億的用戶日誌信息,從用戶行爲發生到完成分析延遲在秒級。