Storm入門,包學會

前言

只有光頭才能變強。html

文本已收錄至個人GitHub精選文章,歡迎Stargithub.com/ZhongFuChen…git

據說過大數據的同窗應該都據說過Storm吧?其實我如今負責的系統用的就是Storm,在最開始接手系統的時候,我是徹底不瞭解Storm的(如今其實也是隻知其一;不知其二而已)github

因爲最近在整理系統,因此順便花了點時間入門了一下Storm(前幾天花了點時間改了一下,上線之後一堆Bug,因而就果斷回滾了。)web

這篇文章來說講簡單Storm的簡單使用,沒有複雜的東西。看完這篇文章,等到接手Storm的代碼的時候大家**『大概』『應該』**能看懂Storm的代碼。算法

什麼是Storm

咱們首先進官方看一下Storm的介紹:apache

Apache Storm is a free and open source distributed realtime computation system後端

Storm是一個分佈式的實時計算系統安全

分佈式:我在以前已經寫過挺多的分佈式的系統了,好比Kafka/HDFS/Elasticsearch等等。如今看到分佈式這個詞,三歪第一反應就是「它的存儲或者計算交由多臺服務器上完成,最後彙總起來達到最終的效果」。服務器

實時:處理速度是毫秒級或者秒級的微信

計算:能夠簡單理解爲對數據進行處理,好比清洗數據(對數據進行規整,取出有用的數據)。

咱們使用Storm作了什麼?

我如今作的消息管理平臺是能夠推送各種的消息的(IM/PUSH/短信/微信消息等等),消息下發後,咱們是確定要知道這條消息的下發狀況的(是否發送成功,若是用戶沒收到是因爲什麼緣由致使用戶沒收到,消息是否被點擊了等等)。

消息是否成功下發到用戶上,這是運營和客服常常關心的問題。

消息下發的效果,這是運營很是關心的問題

基於上面問題,咱們用了Storm作了一套本身的埋點方案,幫助咱們快速確認消息是否成功下發到用戶上以及統計消息下發的效果

聽起來好像很牛逼,下面我來說講背景,看完你就會發現一點兒都不難。

需求背景

消息管理平臺雖然看起來只是發消息的,可是系統設計仍是有點東西的。咱們以「微服務」的思想去看這個系統,會將不一樣的功能模塊抽取到不一樣的系統的。

其中PUSH(推送)的鏈路是最長的,一條消息下發通過的後端系統就有7個,如圖下:

這7個系統都有可能「幹掉」了這條消息,致使用戶沒收到。若是咱們每去查一個問題,都要逐一排查每一個系統,那實在是太慢了。

不少時候客服反饋過來的問題都是當天的,甚至是前幾分鐘的,咱們須要有一個及時的反饋給客服來幫助用戶找到爲何收不到消息的緣由。

因而咱們要作兩個功能:

  1. 可以查詢用戶 當天全部的消息下發狀況。(可以快速定位是哪一個系統什麼緣由致使用戶收不到消息)
  2. 查詢某條消息的 實時總體下發狀況。(可以快速查看該消息的總體下發狀況,包括下發量,中途過濾的量以及點擊量)

若是是單純查問題,咱們將各個系統的日誌收集到Kafka,而後寫到Elasticsearch這個是徹底沒問題的(如今咱們也是這麼幹的)

涉及到統計相關的,咱們就有本身的一套埋點方案,這個是便於對數據的統計,也能完成部分排查的功能

需求實現

前面提到了「埋點」,實際上就是打日誌。其實就是在關鍵的地方上打上日誌作記錄,方便排查問題。

好比,如今咱們有7個系統,每一個系統在執行消息的時候都會可能致使這條消息發不出去(多是消息去重了,多是用戶的手機號不正確,多是用戶過久沒有登陸了等等都有可能)。咱們在這些『關鍵位置』都打上日誌,方便咱們去排查。

這些「關鍵位置」咱們都給它用簡單的數字來命個名。好比說:咱們用「11」來表明這個用戶沒有綁定手機號,用「12」來表明這個用戶10分鐘前收到了一條如出一轍的消息,用「13」來表明這個用戶屏蔽了消息.....

「11」「12」「13」「14」「15」「16」這些就叫作「點位」,把這些點位在關鍵的位置中打上日誌,這個就叫作「埋點

有了埋點,咱們要作的就是將這些點位收集起來,而後統一處理成咱們的格式,輸出到數據源中。

OK,就是分三步:

  1. 收集日誌
  2. 清洗日誌
  3. 輸出到數據源

收集日誌咱們有logAgent幫咱們收集到Kafka,實時清洗日誌咱們用的就是Storm,清洗完咱們輸出到Redis(實時)/Hive(離線)。

Storm通常是在處理(清洗)那層,Storm的上下游也很明確了(上游是消息隊列,下游寫到各類數據源,這種是最多見的):

Storm統一清洗出來放到Redis,咱們就能夠經過接口來很方便去查一條消息的總體下發狀況,好比:

到這裏,主要想說明咱們經過Storm來實時清洗數據,下來來說講Storm的基本使用~

Storm入門

咱們從一段最簡單的Storm代碼入門,先看看下面的代碼:

若是徹底沒看過Storm代碼的同窗,看到上面的代碼會怎麼分析?我是這樣的:

  • 首先有一個TopologyBuilder的東西,這個東西多是Storm的構造器之類的
  • 而後設置了Spout和Bolt(可是我不知道這兩個東西是用來幹嗎的,可是我能夠點進去對象裏邊看看作了什麼)
  • 而後設置了一下Config配置(應該是設置Storm分配多少內存,多少線程之類的,反正跟配置相關)
  • 最後用StormSubmitter提交任務,把配置和TopologyBuilder的內容給提交上去。

咱們簡單搜一下,就能夠發現它的流程大體是這樣的:

Spout是數據的源頭,通常咱們用它去接收數據,Spout接收到數據後往Bolt上發送,Bolt處理數據(清洗)。Bolt清洗完數據能夠寫到一個數據源或者傳遞給下一個Bolt繼續清洗。

Topology關聯了咱們在程序中定義好的Spout和Bolt。各類 Spout 和 Bolt 鏈接在一塊兒以後,就成了一個 Topology,一個 Topology 就是一個 Storm 應用。

Spout往Bolt傳遞數據,Bolt往Bolt傳遞數據,這個傳遞的過程叫作Stream,Stream傳遞的是一個一個Tuple

如今問題來了,咱們的Spout和Bolt之間是怎麼關聯起來的呢?Bolt和Bolt之間是怎麼關聯起來的呢?

在上面的圖咱們知道一個Topology會有多個Spout和多個Bolt,那我怎麼知道這個Spout傳遞的數據是給這個Bolt,這個Bolt傳遞的數據是給另一個Bolt?(說白了,就是上面圖上的箭頭是怎麼關聯的呢?)

在Storm中,有Grouping的機制,就是決定Spout的數據流向哪一個Bolt,Bolt的數據流向下一個Bolt。

爲了提升併發度,咱們在setBolt的時候,能夠指定Bolt的線程數,也就是所謂的Executor(Spout也一樣能夠指定線程數的,只是此次我拿Bolt來舉例)。咱們的結構可能會是這樣的:

分組的策略有如下:

  • 1) shuffleGrouping(隨機分組)
  • 2)fieldsGrouping(按照字段分組,在這裏便是同一個單詞只能發送給一個Bolt)
  • 3)allGrouping(廣播發送,即每個Tuple,每個Bolt都會收到)
  • 4)globalGrouping(全局分組,將Tuple分配到task id值最低的task裏面)
  • 5)noneGrouping(隨機分派)
  • 6)directGrouping(直接分組,指定Tuple與Bolt的對應發送關係)
  • 7)Local or shuffle Grouping
  • 8)partialKeyGrouping(關鍵字分組,與按字段分組很類似,但他分配更加均衡)
  • 9)customGrouping (自定義的Grouping)

shuffleGrouping策略咱們是用得最多的,好比上面的圖上有兩個Spout,咱們會將這兩個Spout的Tuple均勻分發到各個Bolt中執行。

說到這裏,咱們再回頭看看最開始的代碼,我給補充一下注釋,大家應該就能看得懂了:

我仍是再畫一個圖吧:

入門的過程複雜嗎?不復雜。說白了就是Spout接收到數據,經過grouping機制將Spout的數據傳到給Bolt處理,Bolt處理完看還需不須要繼續往下處理,若是須要就傳遞給下一個Bolt,不須要就寫到數據源、調接口等等。

Storm架構

當咱們提交任務以後,會發生什麼呢?咱們來看看。

  1. 任務提交後,會被上傳到 Nimbus節點上,它是主控節點,負責分配代碼、佈置任務及檢測故障
  2. Nimbus會去 Zookeeper上讀取整個集羣的信息,將任務交給 Supervisor,它是工做節點,負責建立、執行任務
  3. Supervisor建立Worker進程,每一個Worker對應一個Topology的子集。Worker是Task的容器,Task是真正的任務執行者。

流程大體以下:

Nimbus和Supervisor都是節點(服務器),Storm用Zookeeper去管理Supervisor節點的信息。

Supervisor節點下會建立Worker進程,建立多少個Worker進程由Conf配置文件決定。線程Executor,由進程產生,用於執行任務,Executor線程數有多少個是在setBolt、setSpout的時候決定。Task是真正的任務執行者,Task其實就是包裝了Bolt/Spout實例。

關於Worker、Executor、Task之間的關係,在官網有一個例子專門說明了,咱們能夠看看。先放出代碼:

內部的圖:

解釋一下:

  • 默認狀況下: 若是不指定Tasks數,那麼一個線程會有一個Task
  • conf.setNumWorkers(2)表明會建立兩個Worker進程
  • setSpout("blue-spout", new BlueSpout(), 2)藍色Spout會有兩個線程處理,由於有兩個進程,因此一個進程會有一個藍色Spout線程
  • topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4) 綠色Bolt會有兩個線程處理,由於有兩個進程Worker因此一個進程會有一個綠色Bolt線程。又由於設置了4個Task數,因此一個線程會分配兩個綠色的Task
  • topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6).shuffleGrouping("green-bolt")。黃色Bolt會有6個線程處理,由於建立了兩個進程,因此一個進程會有3個黃色Bolt線程。沒有單獨設置Task書,因此一個線程默認有一個Task

從上面咱們能夠知道threads ≤ tasks線程數是確定小於等於Task數的。有沒有好奇寶寶會問:「Storm用了線程,那麼會有線程不安全的狀況嗎?」(其實這是三歪剛學的疑問)

通常來講不會,由於不少狀況下,一個線程是對應一個Task的(Task你能夠理解爲Bolt/Spout的實例),既然每一個線程是處理本身的實例了,那固然不會有線程安全的問題啦。(固然了,你若是在Bolt/Spout中設置了靜態成員變量,那仍是會有線程安全問題)

最後

這篇文章簡單地介紹了一下Storm,Storm的東西其實還有不少,包括ack機制什麼的。如今進官方找文檔,都在主推Trident了,有興趣的同窗能夠繼續往下看。

話又說回來,我司也在主推Flink了,這塊後續若是有遷移計劃,我也準備學學搞搞,到時候再來分享分享入門文章。

參考資料:

各種知識點總結

下面的文章都有對應的原創精美PDF,在持續更新中,能夠來找我催更~

涵蓋Java後端全部知識點的開源項目(已有7 K star):github.com/ZhongFuChen…

若是你們想要實時關注我更新的文章以及分享的乾貨的話,微信搜索Java3y

PDF文檔的內容均爲手打,有任何的不懂均可以直接來問我(公衆號有個人聯繫方式)。

本文使用

相關文章
相關標籤/搜索