Storm內部的消息傳遞機制

轉載自https://www.cnblogs.com/Jack47/p/understanding-storm-internal-message-passing.htmlhtml

一個Storm拓撲,就是一個複雜的多階段的流式計算。Storm中的組件(Component)就是對各個階段的一個抽象,其中的Spout是生產者的角色,它負責源源不斷地從Storm外部接收消息,扔給下游的組件處理,下游組件處理完成後,最終輸出到外部的存儲系統。git

本文主要講解消息在Storm內部的各個組件(Component)之間如何進行傳遞,本文適用於JStorm 2.1.0之後的版本。對於JStorm各個版本的改進,見這裏github

若是讀者對Storm的基本組成部分如Spout,Bolt,Worker進程和Task不瞭解,能夠先看下 Storm介紹(一)Storm介紹(二)「理解Storm拓撲併發」編程

下文中使用的"消息"(Messagesage)和「元組"(Tuple)兩個詞語實際上是同一個意思。緩存

Storm中的每一個組件,能夠用一個三元組來定義網絡

<inputStreamId, boltImpl, outputStreamInfo>

其中inputStreamId定義了這個組件所消費的流的ID; boltImpl是bolt的具體實現的類;outputStreamInfo表明這個組件的輸出流的信息,包含兩部分:輸出流的ID和路由方式。因此當一個組件發射一個消息後,經過這個流的分組策略(Grouping)就能夠立馬計算出消費它的taskId。併發

拿下面中的拓撲爲例,從圖一看到這個拓撲由三個組件構成:藍色的Spout,消費方是Green Bolt,Green Bolt的消費方是Yellow Bolt。這個拓撲在集羣上的運行時的狀態如圖二所示:
它由兩個Worker進程組成,每一個Work裏面運行4個Task。仔細的讀者已經發現了這裏沒有Executor,這是JStorm和Storm很大的不一樣。JStorm認爲Executor的存在收益比過低,雖然它支持不停機動態擴大Task的數量,但同時增長了理解成本,增長了應用開發人員編程的複雜度,因此JStorm中去掉了Executor。ide

->
Storm_component<-工具

圖一 一個實際拓撲組件的構成性能

->
Storm_assign_example<-

圖二 集羣上運行的一個實際拓撲

在Storm拓撲內部,同一個拓撲的多個Worker之間會發生消息傳遞,好比上圖二中的兩個Worker進程,他們之間的通訊就是進程間的通訊了,發送的消息須要通過序列化和反序列化。Storm中,Worker之間使用Netty進行網絡通訊。

在Storm拓撲的一個Worker進程內部,多個Task之間也會進行通訊。好比上圖二中的Task 6和Task 3。Storm中Worker進程內部的消息通訊依賴於LMAX Disruptor這個高性能線程間通訊的消息通訊庫。

Storm內部的消息傳遞

JStorm與Storm在內部消息傳遞機制上的主要差異:
JStorm中獨立出一個線程來專門負責消息的反序列化,這樣執行線程單獨執行,而不是Storm那樣,一個線程負責執行反序列化並執行用戶的邏輯。至關因而把流水線拆解的更小了。這樣對於反序列化時延跟執行時延在同一個數量級的應用性能提高比較明顯。

Storm_Intra_Communication

圖三:JStorm內部消息隊列的概要圖

從圖裏看到隊列都是綠色的,這些隊列都是某個Worker內部的隊列。爲了可讀性只保留了一個Worker進程(一個storm節點通常都運行多個Worker),並且在這個Worker進程裏只畫了一個Task,(再次的,在一個Worker進程裏一般有多個Task)

詳細解釋

每一個Worker進程有一個NettyServer,它監聽在Worker的TCP端口上(經過 supervisor.slots.ports來配置),其餘須要跟它通訊的Worker會做爲NettyClient分別創建鏈接。當NettyServer接收到消息,會根據taskId參數把消息放到對應的反序列化隊列(DeserializedQueue)裏面。 topology.executor.receive.buffer.size決定了反序列化隊列的大小。TaskReceiver中的反序列化線程專門負責消費反序列化隊列中的消息:先將消息反序列化,而後放到執行隊列(Execute Queue)中去。

執行隊列的消費者是BoltExecutor線程,它負責從隊列中取出消息,執行用戶的代碼邏輯。執行完用戶的代碼邏輯後,最終經過OutputCollect輸出消息,此時消息裏已經生成了目標task的taskId。topology.executor.receive.buffer.size決定了執行隊列的大小。能夠看到JStorm中執行隊列跟反序列化隊列的大小是同一個配置項,即它們是一致的。

仔細看了圖三的同窗會發現執行隊列的生產者除了TaskReceiver外,還有一個。這種消息的來源就是Worker內部的其餘Task的TaskTransfer。

輸出的消息經過TaskTransfer來發送,若是目標Task是Worker內部的Task,就直接扔到目標Task的執行隊列中去。若是目標Task是在其餘Worker上,那就先放到序列化隊列中,而後由單獨的一個線程專門負責序列化,而後經過NettyClient發送出去。topology.executor.send.buffer.size決定了序列化隊列的大小。

每一個Worker進程有多個NettyClient,他們負責與其餘的Worker進行網絡通訊。

延伸閱讀

如何配置Storm的內部消息緩存

上面提到的衆多配置項都在conf/defaults.yaml裏有定義。能夠經過在Storm集羣的conf/storm.yaml裏進行配置來全局的覆值。也能夠經過Storm的Java API backtype.storm.Config 來對單個的Storm拓撲進行配置。

如何配置Storm的併發

Storm消息緩存的正確配置不可是和你的拓撲的負載類型緊密關聯的,並且和拓撲的併發度有很大關係。後者的詳細信息見理解Storm併發一文。

瞭解Storm拓撲上發生了什麼?

Storm UI是一個用於觀察你正在運行的拓撲的關鍵指標的良好開端。例如,它能夠給你展現所謂的Spout/Bolt的「容量」。衆多的指標能夠幫助你決定對本文中提到的衆多緩存相關的配置參數的修改,對你的拓撲的運行效率的影響是正向的仍是負向的。更多信息見"運行一個多節點Storm集羣"一文。

除此以外還能夠註冊應用程序本身的指標並使用相似Graphite這類工具來跟蹤它們。詳細信息見「從Storm發送指標到Graphite"和「經過RPM和Supervisord來安裝和運行Graphite"。

性能調優方面的建議

能夠看Storm主要做者Nathan Marz的演講:調優和上線Storm拓撲

看看其餘沒有提到的參數:
topology.spout.max.batch.size
數據容量(data volume),數據速度(velocity),消息的大小,處理一個消息的計算複雜度。因此須要不斷調整才能找到最佳的參數,沒有銀彈。

參考資料:

Understanding storm internal message buffers

相關文章
相關標籤/搜索