使用Storm實現實時大數據分析

摘要:隨着數據體積的愈來愈大,實時處理成爲了許多機構須要面對的首要挑戰。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上結合了汽車超速監視,爲咱們演示了使用Storm進行實時大數據分析。CSDN在此編譯、整理。html

簡單和明瞭,Storm讓大數據分析變得輕鬆加愉快。java

當今世界,公司的平常運營常常會生成TB級別的數據。數據來源囊括了互聯網裝置能夠捕獲的任何類型數據,網站、社交媒體、交易型商業數據以及其它商業環境中建立的數據。考慮到數據的生成量,實時處理成爲了許多機構須要面對的首要挑戰。咱們常常用的一個很是有效的開源實時計算工具就是Storm —— Twitter開發,一般被比做「實時的Hadoop」。然而Storm遠比Hadoop來的簡單,由於用它處理大數據不會帶來新老技術的交替。node

Shruthi Kumar、Siddharth Patankar共同效力於Infosys,分別從事技術分析和研發工做。本文詳述了Storm的使用方法,例子中的項目名稱爲「超速報警系統(Speeding Alert System)」。咱們想實現的功能是:實時分析過往車輛的數據,一旦車輛數據超過預設的臨界值 —— 便觸發一個trigger並把相關的數據存入數據庫。mysql

 

1.  Storm是什麼git

 

     全量數據處理使用的大可能是鼎鼎大名的hadoop或者hive,做爲一個批處理系統,hadoop以其吞吐量大、自動容錯等優勢,在海量數據處理上獲得了普遍的使用。github

      Hadoop下的Map/Reduce框架對於數據的處理流程是:redis

 

      一、 將要處理的數據上傳到Hadoop的文件系統HDFS中。sql

      二、 Map階段數據庫

             a)   Master對Map的預處理:對於大量的數據進行切分,劃分爲M個16~64M的數據分片(可經過參數自定義分片大小)設計模式

             b)   調用Mapper函數:Master爲Worker分配Map任務,每一個分片都對應一個Worker進行處理。各個Worker讀取並調用用戶定義的Mapper函數    處理數據,並將結果存入HDFS,返回存儲位置給Master。

一個Worker在Map階段完成時,在HDFS中,生成一個排好序的Key-values組成的文件。並將位置信息彙報給Master。

      三、 Reduce階段

             a)   Master對Reduce的預處理:Master爲Worker分配Reduce任務,他會將全部Mapper產生的數據進行映射,將相同key的任務分配給某個Worker。

             b)   調用Reduce函數:各個Worker將分配到的數據集進行排序(使用工具類Merg),並調用用戶自定義的Reduce函數,並將結果寫入HDFS。

每一個Worker的Reduce任務完成後,都會在HDFS中生成一個輸出文件。Hadoop並不將這些文件合併,由於這些文件每每會做爲另外一個Map/reduce程序的輸入。

         以上的流程,粗略歸納,就是從HDFS中獲取數據,將其按照大小分片,進行分佈式處理,最終輸出結果。從流程來看,Hadoop框架進行數據處理有如下要求:

一、 數據已經存在在HDFS當中。

二、 數據間是少關聯的。各個任務執行器在執行負責的數據時,無需考慮對其餘數據的影響,數據之間應儘量是無聯繫、不會影響的。

使用Hadoop,適合大批量的數據處理,這是他所擅長的。因爲基於Map/Reduce這種單級的數據處理模型進行,所以,若是數據間的關聯繫較大,須要進行數據的多級交互處理(某個階段的處理數據依賴於上一個階段),須要進行屢次map/reduce。又因爲map/reduce每次執行都須要遍歷整個數據集,對於數據的實時計算並不合適,因而有了storm。

      對比Hadoop的批處理,Storm是個實時的、分佈式以及具有高容錯的計算系統。同Hadoop同樣Storm也能夠處理大批量的數據,然而Storm在保證高可靠性的前提下還可讓處理進行的更加實時;也就是說,全部的信息都會被處理。Storm一樣還具有容錯和分佈計算這些特性,這就讓Storm能夠擴展到不一樣的機器上進行大批量的數據處理。他一樣還有如下的這些特性:

  • 易於擴展:對於擴展,伴隨着業務的發展,咱們的數據量、計算量可能會愈來愈大,因此但願這個系統是可擴展的。你只須要添加機器和改變對應的topology(拓撲)設置。Storm使用Hadoop Zookeeper進行集羣協調,這樣能夠充分的保證大型集羣的良好運行。
  • 每條信息的處理均可以獲得保證。
  • Storm集羣管理簡易。
  • Storm的容錯機能:一旦topology遞交,Storm會一直運行它直到topology被廢除或者被關閉。而在執行中出現錯誤時,也會由Storm從新分配任務。這是分佈式系統中通用問題。一個節點掛了不能影響個人應用。
  • 低延遲。都說了是實時計算系統了,延遲是必定要低的。
  • 儘管一般使用Java,Storm中的topology能夠用任何語言設計。

       在線實時流處理模型

       對於處理大批量數據的Map/reduce程序,在任務完成以後就中止了,但Storm是用於實時計算的,因此,相應的處理程序會一直執行(等待任務,有任務則執行)直至手動中止。

       對於Storm,他是實時處理模型,與hadoop的不一樣是,他是針對在線業務而存在的計算平臺,如統計某用戶的交易量、生成爲某個用戶的推薦列表等實時性高的需求。他是一個「流處理」框架。何謂流處理?storm將數據以Stream的方式,並按照Topology的順序,依次處理並最終生成結果。

固然爲了更好的理解文章,你首先須要安裝和設置Storm。須要經過如下幾個簡單的步驟:

  • 從Storm官方下載Storm安裝文件
  • 將bin/directory解壓到你的PATH上,並保證bin/storm腳本是可執行的。
      儘管 Storm 是使用 Clojure 語言開發的,您仍然能夠在 Storm 中使用幾乎任何語言編寫應用程序。所需的只是一個鏈接到 Storm 的架構的適配器。已存在針對 Scala、JRuby、Perl 和 PHP 的適配器,可是還有支持流式傳輸到 Storm 拓撲結構中的結構化查詢語言適配器。

2.  Storm的組件

 

       Storm集羣和Hadoop集羣表面上看很相似。可是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這二者之間是很是不同的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。

       Storm集羣主要由一個主節點(Nimbus後臺程序)和一羣工做節點(worker node)Supervisor的節點組成,經過 Zookeeper進行協調。Nimbus相似Hadoop裏面的JobTracker。Nimbus負責在集羣裏面分發代碼,分配計算任務給機器, 而且監控狀態。

      每個工做節點上面運行一個叫作Supervisor的節點。Supervisor會監聽分配給它那臺機器的工做,根據須要啓動/關閉工做進程。每個工做進程執行一個topology的一個子集;一個運行的topology由運行在不少機器上的不少工做進程組成。


一、 Nimbus主節點:

     主節點一般運行一個後臺程序 —— Nimbus,用於響應分佈在集羣中的節點,分配任務和監測故障。這個很相似於Hadoop中的Job Tracker。

二、Supervisor工做節點:

      工做節點一樣會運行一個後臺程序 —— Supervisor,用於收聽工做指派並基於要求運行工做進程。每一個工做節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則經過Zookeeper系統或者集羣。

三、Zookeeper

     Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝進Storm中的「topology」。topology則是一組由Spouts(數據源)和Bolts(數據操做)經過Stream Groupings進行鏈接的圖。下面對出現的術語進行更深入的解析。

四、Worker:

       運行具體處理組件邏輯的進程。

五、Task:

       worker中每個spout/bolt的線程稱爲一個task. 在storm0.8以後,task再也不與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱爲executor。

六、Topology(拓撲):

       storm中運行的一個實時應用程序,由於各個組件間的消息流動造成邏輯上的一個拓撲結構。一個topology是spouts和bolts組成的圖, 經過stream groupings將圖中的spouts和bolts鏈接起來,以下圖:

      

 

     一個topology會一直運行直到你手動kill掉,Storm自動從新分配執行失敗的任務, 而且Storm能夠保證你不會有數據丟失(若是開啓了高可靠性的話)。若是一些機器意外停機它上面的全部任務會被轉移到其餘機器上。

運行一個topology很簡單。首先,把你全部的代碼以及所依賴的jar打進一個jar包。而後運行相似下面的這個命令:

      storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

這個命令會運行主類: backtype.strom.MyTopology, 參數是arg1, arg2。這個類的main函數定義這個topology而且把它提交給Nimbus。storm jar負責鏈接到Nimbus而且上傳jar包。

Topology的定義是一個Thrift結構,而且Nimbus就是一個Thrift服務, 你能夠提交由任何語言建立的topology。上面的方面是用JVM-based語言提交的最簡單的方法。


七、Spout:

       消息源spout是Storm裏面一個topology裏面的消息生產者。簡而言之,Spout歷來源處讀取數據並放入topology。Spout分紅可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。

       消息源能夠發射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,而後使用SpoutOutputCollector來發射指定的stream。

      而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,若是沒有新tuple發射則會簡單的返回。

       要注意的是nextTuple方法不能阻塞,由於storm在同一個線程上面調用全部消息源spout的方法。

 

另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,不然調用fail。storm只對可靠的spout調用ack和fail。

八、Bolt:

     Topology中全部的處理都由Bolt完成。即全部的消息處理邏輯被封裝在bolts裏面。Bolt能夠完成任何事,好比:鏈接的過濾、聚合、訪問文件/數據庫、等等。

        Bolt從Spout中接收數據並進行處理,若是遇到複雜流的處理也可能將tuple發送給另外一個Bolt進行處理。即須要通過不少blots。好比算出一堆圖片裏面被轉發最多的圖片就至少須要兩步:第一步算出每一個圖片的轉發數量。第二步找出轉發最多的前10個圖片。(若是要把這個過程作得更具備擴展性那麼可能須要更多的步驟)。

        Bolts能夠發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。

      而Bolt中最重要的方法是execute(),以新的tuple做爲參數接收。無論是Spout仍是Bolt,若是將tuple發射成多個流,這些流均可以經過declareStream()來聲明。

     bolts使用OutputCollector來發射tuple,bolts必需要爲它處理的每個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。 通常的流程是: bolts處理一個輸入tuple,  發射0個或者多個tuple, 而後調用ack通知storm本身已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。

九、Tuple:

       一次消息傳遞的基本單元。原本應該是一個key-value的map,可是因爲各個組件間傳遞的tuple的字段名稱已經事先定義好,因此tuple中只要按序填入各個value就好了,因此就是一個value list.

十、Stream:

        源源不斷傳遞的tuple就組成了stream。消息流stream是storm裏的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分佈式的方式並行地建立和處理。經過對stream中tuple序列中每一個字段命名來定義stream。在默認的狀況下,tuple的字段類型能夠是:integer,long,short, byte,string,double,float,boolean和byte array。你也能夠自定義類型(只要實現相應的序列化器)。

     每一個消息流在定義的時候會被分配給一個id,由於單向消息流使用的至關廣泛, OutputFieldsDeclarer定義了一些方法讓你能夠定義一個stream而不用指定這個id。在這種狀況下這個stream會分配個值爲‘default’默認的id 。

      Storm提供的最基本的處理stream的原語是spout和bolt。你能夠實現spout和bolt提供的接口來處理你的業務邏輯。

      

十一、Stream Groupings:

Stream Grouping定義了一個流在Bolt任務間該如何被切分。這裏有Storm提供的6個Stream Grouping類型:

1). 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每一個任務得到相等數量的tuple。

2). 字段分組(Fields grouping):根據指定字段分割數據流,並分組。例如,根據「user-id」字段,相同「user-id」的元組老是分發到同一個任務,不一樣「user-id」的元組可能分發到不一樣的任務。

3). 所有分組(All grouping):tuple被複制到bolt的全部任務。這種類型須要謹慎使用。

4). 全局分組(Global grouping):所有流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。

5). 無分組(None grouping):你不須要關心流是如何分組。目前,無分組等效於隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(若是可能)。

6). 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪一個元組處理者任務接收。

固然還能夠實現CustomStreamGroupimg接口來定製本身須要的分組。

 

storm 和hadoop的對比來了解storm中的基本概念。

  Hadoop Storm
系統角色 JobTracker Nimbus
TaskTracker Supervisor
Child Worker
應用名稱 Job Topology
組件接口 Mapper/Reducer Spout/Bolt


3.  Storm應用場景

       Storm 與其餘大數據解決方案的不一樣之處在於它的處理方式。Hadoop 在本質上是一個批處理系統。數據被引入 Hadoop 文件系統 (HDFS) 並分發到各個節點進行處理。當處理完成時,結果數據返回到 HDFS 供始發者使用。Storm 支持建立拓撲結構來轉換沒有終點的數據流。不一樣於 Hadoop 做業,這些轉換從不中止,它們會持續處理到達的數據。

Twitter列舉了Storm的三大類應用:

1. 信息流處理{Stream processing}
Storm可用來實時處理新數據和更新數據庫,兼具容錯性和可擴展性。即Storm能夠用來處理源源不斷流進來的消息,處理以後將結果寫入到某個存儲中去。

2. 連續計算{Continuous computation}
Storm可進行連續查詢並把結果即時反饋給客戶端。好比把Twitter上的熱門話題發送到瀏覽器中。

3. 分佈式遠程程序調用{Distributed RPC}
       Storm可用來並行處理密集查詢。Storm的拓撲結構是一個等待調用信息的分佈函數,當它收到一條調用信息後,會對查詢進行計算,並返回查詢結果。舉個例子Distributed RPC能夠作並行搜索或者處理大集合的數據。

        經過配置drpc服務器,將storm的topology發佈爲drpc服務。客戶端程序能夠調用drpc服務將數據發送到storm集羣中,並接收處理結果的反饋。這種方式須要drpc服務器進行轉發,其中drpc服務器底層經過thrift實現。適合的業務場景主要是實時計算。而且擴展性良好,能夠增長每一個節點的工做worker數量來動態擴展。

 

 

4.  項目實施,構建Topology

 

      當下狀況咱們須要給Spout和Bolt設計一種可以處理大量數據(日誌文件)的topology,當一個特定數據值超過預設的臨界值時促發警報。使用Storm的topology,逐行讀入日誌文件而且監視輸入數據。在Storm組件方面,Spout負責讀入輸入數據。它不只從現有的文件中讀入數據,同時還監視着新文件。文件一旦被修改Spout會讀入新的版本而且覆蓋以前的tuple(能夠被Bolt讀入的格式),將tuple發射給Bolt進行臨界分析,這樣就能夠發現全部可能超臨界的記錄。

下一節將對用例進行詳細介紹。

臨界分析

這一節,將主要聚焦於臨界值的兩種分析類型:瞬間臨界(instant thershold)和時間序列臨界(time series threshold)。

  • 瞬間臨界值監測:一個字段的值在那個瞬間超過了預設的臨界值,若是條件符合的話則觸發一個trigger。舉個例子當車輛超越80千米每小時,則觸發trigger。
  • 時間序列臨界監測:字段的值在一個給定的時間段內超過了預設的臨界值,若是條件符合則觸發一個觸發器。好比:在5分鐘類,時速超過80KM兩次及以上的車輛。

Listing One顯示了咱們將使用的一個類型日誌,其中包含的車輛數據信息有:車牌號、車輛行駛的速度以及數據獲取的位置。

AB 123 60 North city
BC 123 70 South city
CD 234 40 South city
DE 123 40 East  city
EF 123 90 South city
GH 123 50 West  city

這裏將建立一個對應的XML文件,這將包含引入數據的模式。這個XML將用於日誌文件的解析。XML的設計模式和對應的說明請見下表。

XML文件和日誌文件都存放在Spout能夠隨時監測的目錄下,用以關注文件的實時更新。而這個用例中的topology請見下圖。

Figure 1:Storm中創建的topology,用以實現數據實時處理

如圖所示:FilelistenerSpout接收輸入日誌並進行逐行的讀入,接着將數據發射給ThresoldCalculatorBolt進行更深一步的臨界值處理。一旦處理完成,被計算行的數據將發送給DBWriterBolt,而後由DBWriterBolt存入給數據庫。下面將對這個過程的實現進行詳細的解析。

Spout的實現

Spout以日誌文件和XML描述文件做爲接收對象。XML文件包含了與日誌一致的設計模式。不妨設想一下一個示例日誌文件,包含了車輛的車牌號、行駛速度、以及數據的捕獲位置。(看下圖)

Figure2:數據從日誌文件到Spout的流程圖

Listing Two顯示了tuple對應的XML,其中指定了字段、將日誌文件切割成字段的定界符以及字段的類型。XML文件以及數據都被保存到Spout指定的路徑。

Listing Two:用以描述日誌文件的XML文件。

 

  1. <TUPLEINFO>   
  2. <FIELDLIST>   
  3. <FIELD>   
  4. <COLUMNNAME>vehicle_number</COLUMNNAME>   
  5. <COLUMNTYPE>string</COLUMNTYPE>   
  6. </FIELD>   
  7.    
  8. <FIELD>  
  9. <COLUMNNAME>speed</COLUMNNAME>   
  10. <COLUMNTYPE>int</COLUMNTYPE>   
  11. </FIELD>   
  12.    
  13. <FIELD>   
  14. <COLUMNNAME>location</COLUMNNAME>   
  15. <COLUMNTYPE>string</COLUMNTYPE>   
  16. </FIELD>   
  17. </FIELDLIST>   
  18. <DELIMITER>,</DELIMITER>   
  19. </TUPLEINFO>     

 

經過構造函數及它的參數Directory、PathSpout和TupleInfo對象建立Spout對象。TupleInfo儲存了日誌文件的字段、定界符、字段的類型這些很必要的信息。這個對象經過XSTream序列化XML時創建。

Spout的實現步驟:

  • 對文件的改變進行分開的監聽,並監視目錄下有無新日誌文件添加。
  • 在數據獲得了字段的說明後,將其轉換成tuple。
  • 聲明Spout和Bolt之間的分組,並決定tuple發送給Bolt的途徑。

Spout的具體編碼在Listing Three中顯示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。

 

  1. public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )     
  2. {     
  3.            _collector = collector;     
  4.          try     
  5.          {     
  6.          fileReader  =  new BufferedReader(new FileReader(new File(file)));    
  7.          }    
  8.          catch (FileNotFoundException e)    
  9.          {    
  10.          System.exit(1);     
  11.          }    
  12. }                                                            
  13.    
  14. public void nextTuple()    
  15. {    
  16.          protected void ListenFile(File file)    
  17.          {    
  18.          Utils.sleep(2000);    
  19.          RandomAccessFile access = null;    
  20.          String line = null;     
  21.             try     
  22.             {    
  23.                 while ((line = access.readLine()) != null)    
  24.                 {    
  25.                     if (line !=null)    
  26.                     {     
  27.                          String[] fields=null;    
  28.                           if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());     
  29.                           else     
  30.                           fields = line.split  (tupleInfo.getDelimiter());     
  31.                           if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));    
  32.                     }    
  33.                }    
  34.             }    
  35.             catch (IOException ex){ }    
  36.             }    
  37. }    
  38.    
  39. public void declareOutputFields(OutputFieldsDeclarer declarer)    
  40. {    
  41.       String[] fieldsArr = new String [tupleInfo.getFieldList().size()];    
  42.       for(int i=0; i<tupleInfo.getFieldList().size(); i++)    
  43.       {    
  44.               fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();    
  45.       }    
  46. declarer.declare(new Fields(fieldsArr));    
  47. }        

 

declareOutputFileds()決定了tuple發射的格式,這樣的話Bolt就能夠用相似的方法將tuple譯碼。Spout持續對日誌文件的數據的變動進行監聽,一旦有添加Spout就會進行讀入而且發送給Bolt進行處理。

Bolt的實現

Spout的輸出結果將給予Bolt進行更深一步的處理。通過對用例的思考,咱們的topology中須要如Figure 3中的兩個Bolt。

Figure 3:Spout到Bolt的數據流程。

ThresholdCalculatorBolt

Spout將tuple發出,由ThresholdCalculatorBolt接收並進行臨界值處理。在這裏,它將接收好幾項輸入進行檢查;分別是:

臨界值檢查

  • 臨界值欄數檢查(拆分紅字段的數目)
  • 臨界值數據類型(拆分後字段的類型)
  • 臨界值出現的頻數
  • 臨界值時間段檢查

Listing Four中的類,定義用來保存這些值。

Listing Four:ThresholdInfo類

 

  1. public class ThresholdInfo implementsSerializable    
  2.    
  3. {      
  4.         private String action;     
  5.         private String rule;     
  6.         private Object thresholdValue;    
  7.         private int thresholdColNumber;     
  8.         private Integer timeWindow;     
  9.         private int frequencyOfOccurence;     
  10. }     
基於字段中提供的值,臨界值檢查將被Listing Five中的execute()方法執行。代碼大部分的功能是解析和接收值的檢測。

 

Listing Five:臨界值檢測代碼段

 

  1. public void execute(Tuple tuple, BasicOutputCollector collector)     
  2. {    
  3.     if(tuple!=null)     
  4.     {    
  5.         List<Object> inputTupleList = (List<Object>) tuple.getValues();    
  6.         int thresholdColNum = thresholdInfo.getThresholdColNumber();     
  7.         Object thresholdValue = thresholdInfo.getThresholdValue();     
  8.         String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();     
  9.         Integer timeWindow = thresholdInfo.getTimeWindow();    
  10.          int frequency = thresholdInfo.getFrequencyOfOccurence();    
  11.          if(thresholdDataType.equalsIgnoreCase("string"))    
  12.          {    
  13.              String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();    
  14.              String frequencyChkOp = thresholdInfo.getAction();    
  15.              if(timeWindow!=null)    
  16.              {    
  17.                  long curTime = System.currentTimeMillis();    
  18.                  long diffInMinutes = (curTime-startTime)/(1000);    
  19.                  if(diffInMinutes>=timeWindow)    
  20.                  {    
  21.                      if(frequencyChkOp.equals("=="))    
  22.                      {    
  23.                           if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  24.                           {    
  25.                               count.incrementAndGet();    
  26.                               if(count.get() > frequency)    
  27.                                   splitAndEmit(inputTupleList,collector);    
  28.                           }    
  29.                      }    
  30.                      else if(frequencyChkOp.equals("!="))    
  31.                      {    
  32.                          if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  33.                          {    
  34.                               count.incrementAndGet();    
  35.                               if(count.get() > frequency)    
  36.                                   splitAndEmit(inputTupleList,collector);    
  37.                           }    
  38.                       }    
  39.                       else                         System.out.println("Operator not supported");     
  40.                   }    
  41.               }    
  42.               else   
  43.               {    
  44.                   if(frequencyChkOp.equals("=="))    
  45.                   {    
  46.                       if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  47.                       {    
  48.                           count.incrementAndGet();    
  49.                           if(count.get() > frequency)    
  50.                               splitAndEmit(inputTupleList,collector);    
  51.                           }    
  52.                   }    
  53.                   else if(frequencyChkOp.equals("!="))    
  54.                   {    
  55.                        if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  56.                        {    
  57.                            count.incrementAndGet();    
  58.                            if(count.get() > frequency)    
  59.                                splitAndEmit(inputTupleList,collector);    
  60.                           }    
  61.                    }    
  62.                }    
  63.             }    
  64.             else if(thresholdDataType.equalsIgnoreCase("int") ||                     thresholdDataType.equalsIgnoreCase("double") ||                     thresholdDataType.equalsIgnoreCase("float") ||                     thresholdDataType.equalsIgnoreCase("long") ||                     thresholdDataType.equalsIgnoreCase("short"))    
  65.             {    
  66.                 String frequencyChkOp = thresholdInfo.getAction();    
  67.                 if(timeWindow!=null)    
  68.                 {    
  69.                      long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());    
  70.                      long curTime = System.currentTimeMillis();    
  71.                      long diffInMinutes = (curTime-startTime)/(1000);    
  72.                      System.out.println("Difference in minutes="+diffInMinutes);    
  73.                      if(diffInMinutes>=timeWindow)    
  74.                      {    
  75.                           if(frequencyChkOp.equals("<"))    
  76.                           {    
  77.                               if(valueToCheck < Double.parseDouble(thresholdValue.toString()))    
  78.                               {    
  79.                                    count.incrementAndGet();    
  80.                                    if(count.get() > frequency)    
  81.                                        splitAndEmit(inputTupleList,collector);    
  82.                               }    
  83.                           }    
  84.                           else if(frequencyChkOp.equals(">"))    
  85.                           {    
  86.                                if(valueToCheck > Double.parseDouble(thresholdValue.toString()))    
  87.                                 {    
  88.                                    count.incrementAndGet();    
  89.                                    if(count.get() > frequency)    
  90.                                        splitAndEmit(inputTupleList,collector);    
  91.                                }    
  92.                            }    
  93.                            else if(frequencyChkOp.equals("=="))    
  94.                            {    
  95.                               if(valueToCheck == Double.parseDouble(thresholdValue.toString()))    
  96.                               {    
  97.                                   count.incrementAndGet();    
  98.                                   if(count.get() > frequency)    
  99.                                       splitAndEmit(inputTupleList,collector);    
  100.                                }    
  101.                            }    
  102.                            else if(frequencyChkOp.equals("!="))    
  103.                            {    
  104.     . . .    
  105.                             }    
  106.                        }    
  107.              }    
  108.       else   
  109.           splitAndEmit(null,collector);    
  110.       }    
  111.       else   
  112.      {    
  113.            System.err.println("Emitting null in bolt");    
  114.            splitAndEmit(null,collector);    
  115.     }    
  116. }   


 

經由Bolt發送的的tuple將會傳遞到下一個對應的Bolt,在咱們的用例中是DBWriterBolt。

DBWriterBolt

通過處理的tuple必須被持久化以便於觸發tigger或者更深層次的使用。DBWiterBolt作了這個持久化的工做並把tuple存入了數據庫。表的創建由prepare()函數完成,這也將是topology調用的第一個方法。方法的編碼如Listing Six所示。

Listing Six:建表編碼。

 

  1. public void prepare( Map StormConf, TopologyContext context )     
  2. {           
  3.     try     
  4.     {    
  5.         Class.forName(dbClass);    
  6.     }     
  7.     catch (ClassNotFoundException e)     
  8.     {    
  9.         System.out.println("Driver not found");    
  10.         e.printStackTrace();    
  11.     }    
  12.      
  13.     try     
  14.     {    
  15.        connection driverManager.getConnection(     
  16.            "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);    
  17.        connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();    
  18.      
  19.        StringBuilder createQuery = new StringBuilder(    
  20.            "CREATE TABLE IF NOT EXISTS "+tableName+"(");    
  21.        for(Field fields : tupleInfo.getFieldList())    
  22.        {    
  23.            if(fields.getColumnType().equalsIgnoreCase("String"))    
  24.                createQuery.append(fields.getColumnName()+" VARCHAR(500),");    
  25.            else   
  26.                createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");    
  27.        }    
  28.        createQuery.append("thresholdTimeStamp timestamp)");    
  29.        connection.prepareStatement(createQuery.toString()).execute();    
  30.      
  31.        // Insert Query    
  32.        StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");    
  33.        String tempCreateQuery = new String();    
  34.        for(Field fields : tupleInfo.getFieldList())    
  35.        {    
  36.             insertQuery.append(fields.getColumnName()+",");    
  37.        }    
  38.        insertQuery.append("thresholdTimeStamp").append(") values (");    
  39.        for(Field fields : tupleInfo.getFieldList())    
  40.        {    
  41.            insertQuery.append("?,");    
  42.        }    
  43.      
  44.        insertQuery.append("?)");    
  45.        prepStatement = connection.prepareStatement(insertQuery.toString());    
  46.     }    
  47.     catch (SQLException e)     
  48.     {           
  49.         e.printStackTrace();    
  50.     }           
  51. }    


 

數據分批次的插入數據庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實現可能存在不一樣類型輸入的解析。

Listing Seven:數據插入的代碼部分。

 

  1. public void execute(Tuple tuple, BasicOutputCollector collector)     
  2. {    
  3.     batchExecuted=false;    
  4.     if(tuple!=null)    
  5.     {    
  6.        List<Object> inputTupleList = (List<Object>) tuple.getValues();    
  7.        int dbIndex=0;    
  8.        for(int i=0;i<tupleInfo.getFieldList().size();i++)    
  9.        {    
  10.            Field field = tupleInfo.getFieldList().get(i);    
  11.            try {    
  12.                dbIndex = i+1;    
  13.                if(field.getColumnType().equalsIgnoreCase("String"))                 
  14.                    prepStatement.setString(dbIndex, inputTupleList.get(i).toString());    
  15.                else if(field.getColumnType().equalsIgnoreCase("int"))    
  16.                    prepStatement.setInt(dbIndex,    
  17.                        Integer.parseInt(inputTupleList.get(i).toString()));    
  18.                else if(field.getColumnType().equalsIgnoreCase("long"))    
  19.                    prepStatement.setLong(dbIndex,     
  20.                        Long.parseLong(inputTupleList.get(i).toString()));    
  21.                else if(field.getColumnType().equalsIgnoreCase("float"))    
  22.                    prepStatement.setFloat(dbIndex,     
  23.                        Float.parseFloat(inputTupleList.get(i).toString()));    
  24.                else if(field.getColumnType().equalsIgnoreCase("double"))    
  25.                    prepStatement.setDouble(dbIndex,     
  26.                        Double.parseDouble(inputTupleList.get(i).toString()));    
  27.                else if(field.getColumnType().equalsIgnoreCase("short"))    
  28.                    prepStatement.setShort(dbIndex,     
  29.                        Short.parseShort(inputTupleList.get(i).toString()));    
  30.                else if(field.getColumnType().equalsIgnoreCase("boolean"))    
  31.                    prepStatement.setBoolean(dbIndex,     
  32.                        Boolean.parseBoolean(inputTupleList.get(i).toString()));    
  33.                else if(field.getColumnType().equalsIgnoreCase("byte"))    
  34.                    prepStatement.setByte(dbIndex,     
  35.                        Byte.parseByte(inputTupleList.get(i).toString()));    
  36.                else if(field.getColumnType().equalsIgnoreCase("Date"))    
  37.                {    
  38.                   Date dateToAdd=null;    
  39.                   if (!(inputTupleList.get(i) instanceof Date))      
  40.                   {      
  41.                        DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");    
  42.                        try     
  43.                        {    
  44.                            dateToAdd = df.parse(inputTupleList.get(i).toString());    
  45.                        }    
  46.                        catch (ParseException e)     
  47.                        {    
  48.                            System.err.println("Data type not valid");    
  49.                        }    
  50.                    }      
  51.                    else   
  52.                    {    
  53.             dateToAdd = (Date)inputTupleList.get(i);    
  54.             java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());    
  55.             prepStatement.setDate(dbIndex, sqlDate);    
  56.             }       
  57.             }     
  58.         catch (SQLException e)     
  59.         {    
  60.              e.printStackTrace();    
  61.         }    
  62.     }    
  63.     Date now = new Date();              
  64.     try   
  65.     {    
  66.         prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));    
  67.         prepStatement.addBatch();    
  68.         counter.incrementAndGet();    
  69.         if (counter.get()== batchSize)     
  70.         executeBatch();    
  71.     }     
  72.     catch (SQLException e1)     
  73.     {    
  74.         e1.printStackTrace();    
  75.     }               
  76.    }    
  77.    else   
  78.    {    
  79.         long curTime = System.currentTimeMillis();    
  80.        long diffInSeconds = (curTime-startTime)/(60*1000);    
  81.        if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)    
  82.        {    
  83.             try {    
  84.                 executeBatch();    
  85.                 startTime = System.currentTimeMillis();    
  86.             }    
  87.             catch (SQLException e) {    
  88.                  e.printStackTrace();    
  89.             }    
  90.        }    
  91.    }    
  92. }    
  93.      
  94. public void executeBatch() throws SQLException    
  95. {    
  96.     batchExecuted=true;    
  97.     prepStatement.executeBatch();    
  98.     counter = new AtomicInteger(0);    
  99. }   


 

一旦Spout和Bolt準備就緒(等待被執行),topology生成器將會創建topology並準備執行。下面就來看一下執行步驟。

在本地集羣上運行和測試topology

  • 經過TopologyBuilder創建topology。
  • 使用Storm Submitter,將topology遞交給集羣。以topology的名字、配置和topology的對象做爲參數。
  • 提交topology。

Listing Eight:創建和執行topology。

 

  1. public class StormMain    
  2. {    
  3.      public static void main(String[] args) throws AlreadyAliveException,     
  4.                                                    InvalidTopologyException,     
  5.                                                    InterruptedException     
  6.      {    
  7.           ParallelFileSpout parallelFileSpout = new ParallelFileSpout();    
  8.           ThresholdBolt thresholdBolt = new ThresholdBolt();    
  9.           DBWriterBolt dbWriterBolt = new DBWriterBolt();    
  10.           TopologyBuilder builder = new TopologyBuilder();    
  11.           builder.setSpout("spout", parallelFileSpout, 1);    
  12.           builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");    
  13.           builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");    
  14.           if(this.argsMain!=null && this.argsMain.length > 0)     
  15.           {    
  16.               conf.setNumWorkers(1);    
  17.               StormSubmitter.submitTopology(     
  18.                    this.argsMain[0], conf, builder.createTopology());    
  19.           }    
  20.           else   
  21.           {        
  22.               Config conf = new Config();    
  23.               conf.setDebug(true);    
  24.               conf.setMaxTaskParallelism(3);    
  25.               LocalCluster cluster = new LocalCluster();    
  26.               cluster.submitTopology(    
  27.               "Threshold_Test", conf, builder.createTopology());    
  28.           }    
  29.      }    
  30. }   

 

topology被創建後將被提交到本地集羣。一旦topology被提交,除非被取締或者集羣關閉,它將一直保持運行不須要作任何的修改。這也是Storm的另外一大特點之一。

這個簡單的例子體現了當你掌握了topology、spout和bolt的概念,將能夠輕鬆的使用Storm進行實時處理。若是你既想處理大數據又不想遍歷Hadoop的話,不難發現使用Storm將是個很好的選擇。

 

 

5.  storm常見問題解答

 

1、我有一個數據文件,或者我有一個系統裏面有數據,怎麼導入storm作計算?

你須要實現一個Spout,Spout負責將數據emit到storm系統裏,交給bolts計算。怎麼實現spout能夠參考官方的kestrel spout實現:
https://github.com/nathanmarz/storm-kestrel

若是你的數據源不支持事務性消費,那麼就沒法獲得storm提供的可靠處理的保證,也不必實現ISpout接口中的ack和fail方法。

2、Storm爲了保證tuple的可靠處理,須要保存tuple信息,這會不會致使內存OOM?

Storm爲了保證tuple的可靠處理,acker會保存該節點建立的tuple id的xor值,這稱爲ack value,那麼每ack一次,就將tuple id和ack value作異或(xor)。當全部產生的tuple都被ack的時候, ack value必定爲0。這是個很簡單的策略,對於每個tuple也只要佔用約20個字節的內存。對於100萬tuple,也才20M左右。關於可靠處理看這個:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

3、Storm計算後的結果保存在哪裏?能夠保存在外部存儲嗎?

Storm不處理計算結果的保存,這是應用代碼須要負責的事情,若是數據不大,你能夠簡單地保存在內存裏,也能夠每次都更新數據庫,也能夠採用NoSQL存儲。storm並無像s4那樣提供一個Persist API,根據時間或者容量來作存儲輸出。這部分事情徹底交給用戶。

數據存儲以後的展示,也是你須要本身處理的,storm UI只提供對topology的監控和統計。

4、Storm怎麼處理重複的tuple?

由於Storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail並從新發送該tuple,那麼就會有tuple重複計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:
(1)不處理,這也算是種策略。由於實時計算一般並不要求很高的精確度,後續的批處理計算會更正實時計算的偏差。
(2)使用第三方集中存儲來過濾,好比利用mysql,memcached或者redis根據邏輯主鍵來去重。
(3)使用bloom filter作過濾,簡單高效。

5、Storm的動態增刪節點

我在storm和s4裏比較裏談到的動態增刪節點,是指storm能夠動態地添加和減小supervisor節點。對於減小節點來講,被移除的supervisor上的worker會被nimbus從新負載均衡到其餘supervisor節點上。在storm 0.6.1之前的版本,增長supervisor節點不會影響現有的topology,也就是現有的topology不會從新負載均衡到新的節點上,在擴展集羣的時候很不方便,須要從新提交topology。所以我在storm的郵件列表裏提了這個問題,storm的開發者nathanmarz建立了一個issue 54並在0.6.1提供了rebalance命令來讓正在運行的topology從新負載均衡,具體見:
https://github.com/nathanmarz/storm/issues/54
和0.6.1的變動:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246

storm並不提供機制來動態調整worker和task數目。

6、Storm UI裏spout統計的complete latency的具體含義是什麼?爲何emit的數目會是acked的兩倍?
這個事實上是storm郵件列表裏的一個問題。Storm做者marz的解答:

The complete latency is the time  from the spout emitting a tuple to that
tuple being acked on the spout
. So it tracks the time for the whole tuple
tree to be processed.

If you dive into the spout component in the UI, you'll see that a lot of
the emitted/transferred is on the __ack* stream.  This is the spoutcommunicating with the ackers which take care of tracking the tuple trees. 
簡單地說,complete latency表示了tuple從emit到被acked通過的時間,能夠認爲是tuple以及該tuple的後續子孫(造成一棵樹)整個處理時間。其次spout的emit和transfered還統計了spout和acker之間內部的通訊信息,好比對於可靠處理的spout來講,會在emit的時候同時發送一個_ack_init給acker,記錄tuple id到task id的映射,以便ack的時候能找到正確的acker task。
相關文章
相關標籤/搜索