Storm筆記

用了一段時間Storm後的筆記。發現能夠記的東西很少,證實Storm挺簡單的,你只要遵循一些簡單的接口與原則,就能寫出大規模實時消息處理的程序。git

不斷更新中,請儘可能訪問博客原文。github

爲何用Storm 沒接觸前把Storm想象得很強大,接觸後以爲它就那樣無關緊要,再後來又以爲沒有了所有本身作也麻煩。算法

集羣管理:支持應用的部署,工做節點的管理(任務分配、HA、Scalable等),Metrics的收集。數據庫

數據流的傳輸與路由:支持多種數據在各處理節點間自由流動(是同類方案裏DAG拓撲最靈活的),基於Netty的高效傳輸機制,支持輪詢、廣播、按值分組的路由。apache

數據高可靠性的保證:支持數據流動了多個節點後,在某個節點的處理失敗,能夠引起數據從源頭開始重傳。數組

按Storm的官方說法,你也能夠本身搭建許多消息隊列和worker組成的網絡來實現實時處理,可是:網絡

乏味:你大部份開發時間花費在部署worker,部署中間隊列,配置消息發送到哪裏。你所關心的實時處理邏輯只佔了你的代碼不多的比例 。架構

脆弱:你要本身負責保持每一個worker和隊列正常工做。併發

伸縮時痛苦:當worker或隊列的消息吞吐量過高時,你須要從新分區,從新配置其它worker,讓它們發送消息到新位置。框架

缺點 核心代碼是用Clojure寫成,翻看代碼很是不便。其實,它如今不少新的外部模塊都用Java來寫了,另外阿里同窗翻寫了一個JStorm。

其餘流處理方案 Spark-Streaming:老是有人問爲何不用Spark Stream,首先它是Micro-Batch風格的準實時方案,間隔通常設到500ms。另外,它的消息流拓撲好像沒Storm那樣能夠隨便亂入,有時候必須弄個DB或MQ來作中間傳輸。

Samza: Linkedin的產品,與Storm比,傳輸基於Apache Kafka,集羣管理基於YARN,它只作處理的那塊,但多了基於RocksDB的狀態管理。但據《大數據日知錄》說,受限於Kafka和YARN,它的拓撲也不夠靈活。

自定義Spout Storm對可靠消息傳輸的支持程度,很大程度上依賴於Spout的實現。

並不默認就支持高可靠性的,collector emit的時候要傳輸msgId,要本身處理ack(msgId)和fail(msgId)函數。而不少spout其實沒有這樣作,只有Kafka Spout作的比較正規。

默認的,若是三十秒,消息流經的全部下游節點沒有都ack完畢,或者有一個節點報fail,則觸發fail(msgId)函數。

由於ack/fail的參數只有msgId,這就要Spout想在ack/fail時對上有消息源如Kafka/JMS進行ack/fail,又或者fail時想重發消息,若是須要完整的消息體而不僅是msgId才能完成時,要本身把msgId對應的消息存起來(會撐爆內存麼,好在Kafka不須要)。

另外,由於每一個Spout 是單線程的,會循環的調用nextTuple()的同時,調用ack()或fail()處理結果。因此nextTuple()若是沒消息時不要長期阻塞,避免把ack()也阻塞了。同時,Storm本身有個機制在nextTuple老是沒消息時Sleep一下避免空循環耗CPU,但參考storm-starter裏的spout,仍是直接內部等個50ms好了。在JStorm裏,就改成了兩條分開的線程。

另外,spout有時是每次被調用nextTuple()時主動去pull消息的,有時是被動接收push消息後存放在LinkedBlockingQueue裏,netxtTuple()時從Queue裏取消息的。Spout忽然crash的話,存在Queue裏的消息也會丟失,除非上游消息源有ack機制來保障。

Spout還有個Max Pending的配置,若是有太多消息沒有ack,它就不會再調nextTuple()。但若是上游消息源是主動Push的,消息仍是會源源不斷的來,累積在queue裏。

RichBolt vs BasicBolt 直接用BasicBolt,會在execute()後自動ack/fail Tuple,而RichBolt則須要自行調用ack/fail。

那何時使用RichBolt? Bolt不是在每次execute()時馬上產生新消息,須要異步的發送新消息(好比聚合一段時間的數據再發送)時,又或者想異步的ack/fail原消息時就須要。

BasicBolt的prepare()裏並無collector參數,只在每次execute()時傳入collector。而RichBolt恰好相反,你能夠在初始化時就把collector保存起來,用它在任意時候發送消息。

另外,若是用RichBolt的collector,還要考慮在發送消息時是否帶上傳入的Tuple,若是不帶,則下游的處理節點出錯也不會回溯到Spout重發。用BasicBolt則已默認帶上。

Ack機制 做者是一拍腦殼想到了用20個字節來追蹤每條Spout出來的消息被處理的狀況,原理是XOR的時候,N XOR N=0,同一個值以任意次序XOR兩次會歸0,如A XOR B XOR B XOR C XOR A XOR C =0, 在發出Tuple時,就用隨機產生的Tuple Id XOR一下。等接收的Bolt ack時,再XOR一下,就會歸0。因此當消息以任意的順序會流經不少節點,產生不少新Tuple,若是都被成功處理,即全部Tuple id都被以任意順序執行了兩次XOR,則這20個字節最後應該從新歸0,就可判斷所有ack完畢。

另外,重發是從最上游的Spout開始,若是某個bolt的操做是非冪等的,還要想一想怎麼本身去實現去重。

異常處理 若是但願上游的Spout重發消息,則在BasicBolt中拋出FailedException 或在RichBolt中直接fail掉Tuple。 其餘狀況下,execute()方法不該該拋出任何異常,或者你是故意拋出異常使得Topology停轉。

狀態管理 狀態數據分兩種,一種是本地歷史數據,不如使用路由規則,使相同用戶的數據老是路由到同一個Bolt。 一種是全局的數據。Storm徹底無論數據的持久化(Trident那塊沒用到不算), 《Storm 並不是徹底適合全部實時應用》 就是吐槽Storm的狀態數據管理的。

不像Linkedin的Samza,Bolt若是須要歷史數據,通常本身在內存裏管理數據(Crash掉或節點的變化致使路由變化就沒了哈),或者在本地起一個Redis/Memcached(不能與Bolt一塊兒管理,路由變化的數據遷移,性能也會削弱)

對於全局數據,一樣須要Cassandra之類高可擴展的NOSQL來幫忙,但此時延時會更厲害,性能瓶頸也極可能壓到了Cassandra上。

定時任務 定時聚合數據之類的需求,除了本身在bolt裏開定時器,還能夠用以下設置,全部Bolt都定時收到一條Tick消息:

conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 300);

以下函數用於判斷是Tick仍是正常業務消息:

protected static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); }

拓撲的定義 除了使用Java代碼,還可使用Yaml來動態定義拓撲,見 https://github.com/ptgoetz/flux

併發度的定義及基於命令行的動態擴容見官方文檔,另對於worker進程數的建議是Use one worker per topology per machine。

序列化 Tuple除了傳基本類型與數組,AraayList,HashMap外,也能夠傳一下Java對象的。Storm使用Kyro做爲序列化框架,據測比Hessian什麼的都要快和小。但必定註冊這些額外的Java對象的類型,不然就會使用Java默認的序列化。

參看官方文檔,有兩種方式註冊類型,一個是storm.yaml文件,一個是Config類的registerSerialization方法。如無特殊需求,直接註冊須要序列化的類就能夠了,不須要本身實現一個Serializer,Kryo默認的按fields序列化的Serializer已足夠。

Spout和Bolt的構造函數只會在submit Topology時調一次,而後序列化起來,直接發給工做節點,工做節點裏實例化時不會被調用裏,因此複雜的成員變量記得都定義成transient,在open(),prepare()裏初始化及鏈接數據庫等資源。

另外,須要實現close()函數清理資源,但該函數不承諾在worker進程被殺時保證被調用。

fields grouping的算法 按名稱提取fields的值,疊加其hash值,再按當前的可選Task數量取模。因此,動態擴展Task數量,或某Task失效被重建的話,均可能讓原來的分配徹底亂掉。

與其餘開源技術的集成 好比External目錄裏的一堆,storm-contrib 裏也有一堆,目前支持Jdbc,Redis,HBase,HDFS,Hive,甚至還有Esper,目標都是經過配置(好比SQL及Input/Output fields),而非代碼,或儘可能少的代碼,實現交互。有時也能夠不必定要直接用它們,當成Example Code來看就行了。

另外,與傳統的Java應用思路相比,Bolt/Spout與資源鏈接時,比較難實現共享鏈接池的概念,鏈接池通常都是每一個Bolt/Spout實例自用的,要正確處理其鏈接數量。

HA的實現 若是Worker進程失效,Supervisor進程會檢查 Worker的心跳信息,從新建立。 Supervisor進程,須要用Daemon程序如monit來啓動,失效時自動從新啓動。 若是整個機器節點失效,Nimbus會在其餘節點上從新建立。

Nimbus進程,須要用Daemon程序如monit來啓動,失效時自動從新啓動。 若是Nimbus進程所在的機器直接倒了,須要在其餘機器上從新啓動,Storm目前沒有內建支持,須要本身寫腳本實現。

由於Supervisor和Nimbus在進程內都不保存狀態,狀態都保存在本地文件和ZooKeeper,所以進程能夠隨便殺。 即便Nimbus進程不在了,也只是不能部署新任務,有節點失效時不能從新分配而已,不影響已有的線程。 一樣,若是Supervisor進程失效,不影響已存在的Worker進程。

Zookeeper自己已是按至少三臺部署的HA架構了。

運維管理 Storm UI也是用Clojure寫的,比較難改,好在它提供了Restful API,能夠與其餘系統集成,或基於API重寫一個UI。

Metrics的採樣率是1/20(topology.stats.sample.rate=0.05),即Storm隨機從20個事件裏取出一個事件來進行統計,命中的話,counter 直接+20。

在舊版本的Storm使用舊版的ZooKeeper要啓動數據清理的腳本,在新版上只要修改ZooKeeper的配置文件zoo.cfg, 默認是24小時清理一次 autopurge.purgeInterval=24。

日誌的配置在logback/cluster.xml文件裏,Storm的日誌,自然的須要Logstash + ElasticSearch的集中式日誌方案。

storm.local.dir 要本身建,並且不支持~/ 表明用戶根目錄。

storm.yaml的默認值在 https://github.com/apache/storm/blob/master/conf/defaults.yaml

Tunning

  1. 內部傳輸機制的各類配置,見文檔

  2. 屏蔽ack機制,當可靠傳輸並非最重要時。能夠把Acker數量設爲0,可讓Spout不要發出msgId,或者bolt發送消息時不傳以前的Tuple。

相關文章
相關標籤/搜索