使用Storm實現實時大數據分析(轉)

原文連接:http://blog.csdn.net/hguisu/article/details/8454368html

 

簡單和明瞭,Storm讓大數據分析變得輕鬆加愉快。java

當今世界,公司的平常運營常常會生成TB級別的數據。數據來源囊括了互聯網裝置能夠捕獲的任何類型數據,網站、社交媒體、交易型商業數據以及其它商業環境中建立的數據。考慮到數據的生成量,實時處理成爲了許多機構須要面對的首要挑戰。咱們常常用的一個很是有效的開源實時計算工具就是Storm —— Twitter開發,一般被比做「實時的Hadoop」。然而Storm遠比Hadoop來的簡單,由於用它處理大數據不會帶來新老技術的交替。node

Shruthi Kumar、Siddharth Patankar共同效力於Infosys,分別從事技術分析和研發工做。本文詳述了Storm的使用方法,例子中的項目名稱爲「超速報警系統(Speeding Alert System)」。咱們想實現的功能是:實時分析過往車輛的數據,一旦車輛數據超過預設的臨界值 —— 便觸發一個trigger並把相關的數據存入數據庫。mysql

 

1.  Storm是什麼git

 

     全量數據處理使用的大可能是鼎鼎大名的hadoop或者hive,做爲一個批處理系統,hadoop以其吞吐量大、自動容錯等優勢,在海量數據處理上獲得了普遍的使用。github

      Hadoop下的Map/Reduce框架對於數據的處理流程是:web

 

      一、 將要處理的數據上傳到Hadoop的文件系統HDFS中。redis

      二、 Map階段sql

             a)   Master對Map的預處理:對於大量的數據進行切分,劃分爲M個16~64M的數據分片(可經過參數自定義分片大小)數據庫

             b)   調用Mapper函數:Master爲Worker分配Map任務,每一個分片都對應一個Worker進行處理。各個Worker讀取並調用用戶定義的Mapper函數    處理數據,並將結果存入HDFS,返回存儲位置給Master。

一個Worker在Map階段完成時,在HDFS中,生成一個排好序的Key-values組成的文件。並將位置信息彙報給Master。

      三、 Reduce階段

             a)   Master對Reduce的預處理:Master爲Worker分配Reduce任務,他會將全部Mapper產生的數據進行映射,將相同key的任務分配給某個Worker。

             b)   調用Reduce函數:各個Worker將分配到的數據集進行排序(使用工具類Merg),並調用用戶自定義的Reduce函數,並將結果寫入HDFS。

每一個Worker的Reduce任務完成後,都會在HDFS中生成一個輸出文件。Hadoop並不將這些文件合併,由於這些文件每每會做爲另外一個Map/reduce程序的輸入。

         以上的流程,粗略歸納,就是從HDFS中獲取數據,將其按照大小分片,進行分佈式處理,最終輸出結果。從流程來看,Hadoop框架進行數據處理有如下要求:

一、 數據已經存在在HDFS當中。

二、 數據間是少關聯的。各個任務執行器在執行負責的數據時,無需考慮對其餘數據的影響,數據之間應儘量是無聯繫、不會影響的。

使用Hadoop,適合大批量的數據處理,這是他所擅長的。因爲基於Map/Reduce這種單級的數據處理模型進行,所以,若是數據間的關聯繫較大,須要進行數據的多級交互處理(某個階段的處理數據依賴於上一個階段),須要進行屢次map/reduce。又因爲map/reduce每次執行都須要遍歷整個數據集,對於數據的實時計算並不合適,因而有了storm。

      對比Hadoop的批處理,Storm是個實時的、分佈式以及具有高容錯的計算系統。同Hadoop同樣Storm也能夠處理大批量的數據,然而Storm在保證高可靠性的前提下還可讓處理進行的更加實時;也就是說,全部的信息都會被處理。Storm一樣還具有容錯和分佈計算這些特性,這就讓Storm能夠擴展到不一樣的機器上進行大批量的數據處理。他一樣還有如下的這些特性:

  • 易於擴展:對於擴展,伴隨着業務的發展,咱們的數據量、計算量可能會愈來愈大,因此但願這個系統是可擴展的。你只須要添加機器和改變對應的topology(拓撲)設置。Storm使用Hadoop Zookeeper進行集羣協調,這樣能夠充分的保證大型集羣的良好運行。
  • 每條信息的處理均可以獲得保證。
  • Storm集羣管理簡易。
  • Storm的容錯機能:一旦topology遞交,Storm會一直運行它直到topology被廢除或者被關閉。而在執行中出現錯誤時,也會由Storm從新分配任務。這是分佈式系統中通用問題。一個節點掛了不能影響個人應用。
  • 低延遲。都說了是實時計算系統了,延遲是必定要低的。
  • 儘管一般使用Java,Storm中的topology能夠用任何語言設計。

       在線實時流處理模型

       對於處理大批量數據的Map/reduce程序,在任務完成以後就中止了,但Storm是用於實時計算的,因此,相應的處理程序會一直執行(等待任務,有任務則執行)直至手動中止。

       對於Storm,他是實時處理模型,與hadoop的不一樣是,他是針對在線業務而存在的計算平臺,如統計某用戶的交易量、生成爲某個用戶的推薦列表等實時性高的需求。他是一個「流處理」框架。何謂流處理?storm將數據以Stream的方式,並按照Topology的順序,依次處理並最終生成結果。

固然爲了更好的理解文章,你首先須要安裝和設置Storm。須要經過如下幾個簡單的步驟:

  • 從Storm官方下載Storm安裝文件
  • 將bin/directory解壓到你的PATH上,並保證bin/storm腳本是可執行的。
      儘管 Storm 是使用 Clojure 語言開發的,您仍然能夠在 Storm 中使用幾乎任何語言編寫應用程序。所需的只是一個鏈接到 Storm 的架構的適配器。已存在針對 Scala、JRuby、Perl 和 PHP 的適配器,可是還有支持流式傳輸到 Storm 拓撲結構中的結構化查詢語言適配器。

2.  Storm的組件

 

       Storm集羣和Hadoop集羣表面上看很相似。可是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這二者之間是很是不同的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。

       Storm集羣主要由一個主節點(Nimbus後臺程序)和一羣工做節點(worker node)Supervisor的節點組成,經過 Zookeeper進行協調。Nimbus相似Hadoop裏面的JobTracker。Nimbus負責在集羣裏面分發代碼,分配計算任務給機器, 而且監控狀態。

      每個工做節點上面運行一個叫作Supervisor的節點。Supervisor會監聽分配給它那臺機器的工做,根據須要啓動/關閉工做進程。每個工做進程執行一個topology的一個子集;一個運行的topology由運行在不少機器上的不少工做進程組成。

 

一、 Nimbus主節點:

     主節點一般運行一個後臺程序 —— Nimbus,用於響應分佈在集羣中的節點,分配任務和監測故障。這個很相似於Hadoop中的Job Tracker。

二、Supervisor工做節點:

      工做節點一樣會運行一個後臺程序 —— Supervisor,用於收聽工做指派並基於要求運行工做進程。每一個工做節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則經過Zookeeper系統或者集羣。

三、Zookeeper

     Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝進Storm中的「topology」。topology則是一組由Spouts(數據源)和Bolts(數據操做)經過Stream Groupings進行鏈接的圖。下面對出現的術語進行更深入的解析。

四、Worker:

       運行具體處理組件邏輯的進程。

五、Task:

       worker中每個spout/bolt的線程稱爲一個task. 在storm0.8以後,task再也不與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱爲executor。

六、Topology(拓撲):

       storm中運行的一個實時應用程序,由於各個組件間的消息流動造成邏輯上的一個拓撲結構。一個topology是spouts和bolts組成的圖, 經過stream groupings將圖中的spouts和bolts鏈接起來,以下圖:

      

 

     一個topology會一直運行直到你手動kill掉,Storm自動從新分配執行失敗的任務, 而且Storm能夠保證你不會有數據丟失(若是開啓了高可靠性的話)。若是一些機器意外停機它上面的全部任務會被轉移到其餘機器上。

運行一個topology很簡單。首先,把你全部的代碼以及所依賴的jar打進一個jar包。而後運行相似下面的這個命令:

      storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

這個命令會運行主類: backtype.strom.MyTopology, 參數是arg1, arg2。這個類的main函數定義這個topology而且把它提交給Nimbus。storm jar負責鏈接到Nimbus而且上傳jar包。

Topology的定義是一個Thrift結構,而且Nimbus就是一個Thrift服務, 你能夠提交由任何語言建立的topology。上面的方面是用JVM-based語言提交的最簡單的方法。

 

七、Spout:

       消息源spout是Storm裏面一個topology裏面的消息生產者。簡而言之,Spout歷來源處讀取數據並放入topology。Spout分紅可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。

       消息源能夠發射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,而後使用SpoutOutputCollector來發射指定的stream。

      而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,若是沒有新tuple發射則會簡單的返回。

       要注意的是nextTuple方法不能阻塞,由於storm在同一個線程上面調用全部消息源spout的方法。

 

另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,不然調用fail。storm只對可靠的spout調用ack和fail。

八、Bolt:

     Topology中全部的處理都由Bolt完成。即全部的消息處理邏輯被封裝在bolts裏面。Bolt能夠完成任何事,好比:鏈接的過濾、聚合、訪問文件/數據庫、等等。

        Bolt從Spout中接收數據並進行處理,若是遇到複雜流的處理也可能將tuple發送給另外一個Bolt進行處理。即須要通過不少blots。好比算出一堆圖片裏面被轉發最多的圖片就至少須要兩步:第一步算出每一個圖片的轉發數量。第二步找出轉發最多的前10個圖片。(若是要把這個過程作得更具備擴展性那麼可能須要更多的步驟)。

        Bolts能夠發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。

      而Bolt中最重要的方法是execute(),以新的tuple做爲參數接收。無論是Spout仍是Bolt,若是將tuple發射成多個流,這些流均可以經過declareStream()來聲明。

     bolts使用OutputCollector來發射tuple,bolts必需要爲它處理的每個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。 通常的流程是: bolts處理一個輸入tuple,  發射0個或者多個tuple, 而後調用ack通知storm本身已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。

九、Tuple:

       一次消息傳遞的基本單元。原本應該是一個key-value的map,可是因爲各個組件間傳遞的tuple的字段名稱已經事先定義好,因此tuple中只要按序填入各個value就好了,因此就是一個value list.

十、Stream:

        源源不斷傳遞的tuple就組成了stream。消息流stream是storm裏的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分佈式的方式並行地建立和處理。經過對stream中tuple序列中每一個字段命名來定義stream。在默認的狀況下,tuple的字段類型能夠是:integer,long,short, byte,string,double,float,boolean和byte array。你也能夠自定義類型(只要實現相應的序列化器)。

     每一個消息流在定義的時候會被分配給一個id,由於單向消息流使用的至關廣泛, OutputFieldsDeclarer定義了一些方法讓你能夠定義一個stream而不用指定這個id。在這種狀況下這個stream會分配個值爲‘default’默認的id 。

      Storm提供的最基本的處理stream的原語是spout和bolt。你能夠實現spout和bolt提供的接口來處理你的業務邏輯。

      

十一、Stream Groupings:

Stream Grouping定義了一個流在Bolt任務間該如何被切分。這裏有Storm提供的6個Stream Grouping類型:

1). 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每一個任務得到相等數量的tuple。

2). 字段分組(Fields grouping):根據指定字段分割數據流,並分組。例如,根據「user-id」字段,相同「user-id」的元組老是分發到同一個任務,不一樣「user-id」的元組可能分發到不一樣的任務。

3). 所有分組(All grouping):tuple被複制到bolt的全部任務。這種類型須要謹慎使用。

4). 全局分組(Global grouping):所有流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。

5). 無分組(None grouping):你不須要關心流是如何分組。目前,無分組等效於隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(若是可能)。

6). 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪一個元組處理者任務接收。

固然還能夠實現CustomStreamGroupimg接口來定製本身須要的分組。

 

storm 和hadoop的對比來了解storm中的基本概念。

  Hadoop Storm
系統角色 JobTracker Nimbus
TaskTracker Supervisor
Child Worker
應用名稱 Job Topology
組件接口 Mapper/Reducer Spout/Bolt

 

3.  Storm應用場景

       Storm 與其餘大數據解決方案的不一樣之處在於它的處理方式。Hadoop 在本質上是一個批處理系統。數據被引入 Hadoop 文件系統 (HDFS) 並分發到各個節點進行處理。當處理完成時,結果數據返回到 HDFS 供始發者使用。Storm 支持建立拓撲結構來轉換沒有終點的數據流。不一樣於 Hadoop 做業,這些轉換從不中止,它們會持續處理到達的數據。

Twitter列舉了Storm的三大類應用:

1. 信息流處理{Stream processing} Storm可用來實時處理新數據和更新數據庫,兼具容錯性和可擴展性。即Storm能夠用來處理源源不斷流進來的消息,處理以後將結果寫入到某個存儲中去。

2. 連續計算{Continuous computation} Storm可進行連續查詢並把結果即時反饋給客戶端。好比把Twitter上的熱門話題發送到瀏覽器中。

3. 分佈式遠程程序調用{Distributed RPC}        Storm可用來並行處理密集查詢。Storm的拓撲結構是一個等待調用信息的分佈函數,當它收到一條調用信息後,會對查詢進行計算,並返回查詢結果。舉個例子Distributed RPC能夠作並行搜索或者處理大集合的數據。

        經過配置drpc服務器,將storm的topology發佈爲drpc服務。客戶端程序能夠調用drpc服務將數據發送到storm集羣中,並接收處理結果的反饋。這種方式須要drpc服務器進行轉發,其中drpc服務器底層經過thrift實現。適合的業務場景主要是實時計算。而且擴展性良好,能夠增長每一個節點的工做worker數量來動態擴展。

 

 

4.  項目實施,構建Topology

 

      當下狀況咱們須要給Spout和Bolt設計一種可以處理大量數據(日誌文件)的topology,當一個特定數據值超過預設的臨界值時促發警報。使用Storm的topology,逐行讀入日誌文件而且監視輸入數據。在Storm組件方面,Spout負責讀入輸入數據。它不只從現有的文件中讀入數據,同時還監視着新文件。文件一旦被修改Spout會讀入新的版本而且覆蓋以前的tuple(能夠被Bolt讀入的格式),將tuple發射給Bolt進行臨界分析,這樣就能夠發現全部可能超臨界的記錄。

下一節將對用例進行詳細介紹。

臨界分析

這一節,將主要聚焦於臨界值的兩種分析類型:瞬間臨界(instant thershold)和時間序列臨界(time series threshold)。

  • 瞬間臨界值監測:一個字段的值在那個瞬間超過了預設的臨界值,若是條件符合的話則觸發一個trigger。舉個例子當車輛超越80千米每小時,則觸發trigger。
  • 時間序列臨界監測:字段的值在一個給定的時間段內超過了預設的臨界值,若是條件符合則觸發一個觸發器。好比:在5分鐘類,時速超過80KM兩次及以上的車輛。

Listing One顯示了咱們將使用的一個類型日誌,其中包含的車輛數據信息有:車牌號、車輛行駛的速度以及數據獲取的位置。

AB 123 60 North city
BC 123 70 South city
CD 234 40 South city
DE 123 40 East  city
EF 123 90 South city
GH 123 50 West  city

這裏將建立一個對應的XML文件,這將包含引入數據的模式。這個XML將用於日誌文件的解析。XML的設計模式和對應的說明請見下表。

XML文件和日誌文件都存放在Spout能夠隨時監測的目錄下,用以關注文件的實時更新。而這個用例中的topology請見下圖。

Figure 1:Storm中創建的topology,用以實現數據實時處理

如圖所示:FilelistenerSpout接收輸入日誌並進行逐行的讀入,接着將數據發射給ThresoldCalculatorBolt進行更深一步的臨界值處理。一旦處理完成,被計算行的數據將發送給DBWriterBolt,而後由DBWriterBolt存入給數據庫。下面將對這個過程的實現進行詳細的解析。

Spout的實現

Spout以日誌文件和XML描述文件做爲接收對象。XML文件包含了與日誌一致的設計模式。不妨設想一下一個示例日誌文件,包含了車輛的車牌號、行駛速度、以及數據的捕獲位置。(看下圖)

Figure2:數據從日誌文件到Spout的流程圖

Listing Two顯示了tuple對應的XML,其中指定了字段、將日誌文件切割成字段的定界符以及字段的類型。XML文件以及數據都被保存到Spout指定的路徑。

Listing Two:用以描述日誌文件的XML文件。

 

  1. <TUPLEINFO>   
  2. <FIELDLIST>   
  3. <FIELD>   
  4. <COLUMNNAME>vehicle_number</COLUMNNAME>   
  5. <COLUMNTYPE>string</COLUMNTYPE>   
  6. </FIELD>   
  7.    
  8. <FIELD>  
  9. <COLUMNNAME>speed</COLUMNNAME>   
  10. <COLUMNTYPE>int</COLUMNTYPE>   
  11. </FIELD>   
  12.    
  13. <FIELD>   
  14. <COLUMNNAME>location</COLUMNNAME>   
  15. <COLUMNTYPE>string</COLUMNTYPE>   
  16. </FIELD>   
  17. </FIELDLIST>   
  18. <DELIMITER>,</DELIMITER>   
  19. </TUPLEINFO>     

 

經過構造函數及它的參數Directory、PathSpout和TupleInfo對象建立Spout對象。TupleInfo儲存了日誌文件的字段、定界符、字段的類型這些很必要的信息。這個對象經過XSTream序列化XML時創建。

Spout的實現步驟:

  • 對文件的改變進行分開的監聽,並監視目錄下有無新日誌文件添加。
  • 在數據獲得了字段的說明後,將其轉換成tuple。
  • 聲明Spout和Bolt之間的分組,並決定tuple發送給Bolt的途徑。

Spout的具體編碼在Listing Three中顯示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。

 

  1. public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )     
  2. {     
  3.            _collector = collector;     
  4.          try     
  5.          {     
  6.          fileReader  =  new BufferedReader(new FileReader(new File(file)));    
  7.          }    
  8.          catch (FileNotFoundException e)    
  9.          {    
  10.          System.exit(1);     
  11.          }    
  12. }                                                            
  13.    
  14. public void nextTuple()    
  15. {    
  16.          protected void ListenFile(File file)    
  17.          {    
  18.          Utils.sleep(2000);    
  19.          RandomAccessFile access = null;    
  20.          String line = null;     
  21.             try     
  22.             {    
  23.                 while ((line = access.readLine()) != null)    
  24.                 {    
  25.                     if (line !=null)    
  26.                     {     
  27.                          String[] fields=null;    
  28.                           if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());     
  29.                           else     
  30.                           fields = line.split  (tupleInfo.getDelimiter());     
  31.                           if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));    
  32.                     }    
  33.                }    
  34.             }    
  35.             catch (IOException ex){ }    
  36.             }    
  37. }    
  38.    
  39. public void declareOutputFields(OutputFieldsDeclarer declarer)    
  40. {    
  41.       String[] fieldsArr = new String [tupleInfo.getFieldList().size()];    
  42.       for(int i=0; i<tupleInfo.getFieldList().size(); i++)    
  43.       {    
  44.               fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();    
  45.       }    
  46. declarer.declare(new Fields(fieldsArr));    
  47. }        

 

declareOutputFileds()決定了tuple發射的格式,這樣的話Bolt就能夠用相似的方法將tuple譯碼。Spout持續對日誌文件的數據的變動進行監聽,一旦有添加Spout就會進行讀入而且發送給Bolt進行處理。

Bolt的實現

Spout的輸出結果將給予Bolt進行更深一步的處理。通過對用例的思考,咱們的topology中須要如Figure 3中的兩個Bolt。

Figure 3:Spout到Bolt的數據流程。

ThresholdCalculatorBolt

Spout將tuple發出,由ThresholdCalculatorBolt接收並進行臨界值處理。在這裏,它將接收好幾項輸入進行檢查;分別是:

臨界值檢查

  • 臨界值欄數檢查(拆分紅字段的數目)
  • 臨界值數據類型(拆分後字段的類型)
  • 臨界值出現的頻數
  • 臨界值時間段檢查

Listing Four中的類,定義用來保存這些值。

Listing Four:ThresholdInfo類

 

  1. public class ThresholdInfo implementsSerializable    
  2.    
  3. {      
  4.         private String action;     
  5.         private String rule;     
  6.         private Object thresholdValue;    
  7.         private int thresholdColNumber;     
  8.         private Integer timeWindow;     
  9.         private int frequencyOfOccurence;     
  10. }     

基於字段中提供的值,臨界值檢查將被Listing Five中的execute()方法執行。代碼大部分的功能是解析和接收值的檢測。

 

Listing Five:臨界值檢測代碼段

 

  1. public void execute(Tuple tuple, BasicOutputCollector collector)     
  2. {    
  3.     if(tuple!=null)     
  4.     {    
  5.         List<Object> inputTupleList = (List<Object>) tuple.getValues();    
  6.         int thresholdColNum = thresholdInfo.getThresholdColNumber();     
  7.         Object thresholdValue = thresholdInfo.getThresholdValue();     
  8.         String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();     
  9.         Integer timeWindow = thresholdInfo.getTimeWindow();    
  10.          int frequency = thresholdInfo.getFrequencyOfOccurence();    
  11.          if(thresholdDataType.equalsIgnoreCase("string"))    
  12.          {    
  13.              String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();    
  14.              String frequencyChkOp = thresholdInfo.getAction();    
  15.              if(timeWindow!=null)    
  16.              {    
  17.                  long curTime = System.currentTimeMillis();    
  18.                  long diffInMinutes = (curTime-startTime)/(1000);    
  19.                  if(diffInMinutes>=timeWindow)    
  20.                  {    
  21.                      if(frequencyChkOp.equals("=="))    
  22.                      {    
  23.                           if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  24.                           {    
  25.                               count.incrementAndGet();    
  26.                               if(count.get() > frequency)    
  27.                                   splitAndEmit(inputTupleList,collector);    
  28.                           }    
  29.                      }    
  30.                      else if(frequencyChkOp.equals("!="))    
  31.                      {    
  32.                          if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  33.                          {    
  34.                               count.incrementAndGet();    
  35.                               if(count.get() > frequency)    
  36.                                   splitAndEmit(inputTupleList,collector);    
  37.                           }    
  38.                       }    
  39.                       else                         System.out.println("Operator not supported");     
  40.                   }    
  41.               }    
  42.               else   
  43.               {    
  44.                   if(frequencyChkOp.equals("=="))    
  45.                   {    
  46.                       if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  47.                       {    
  48.                           count.incrementAndGet();    
  49.                           if(count.get() > frequency)    
  50.                               splitAndEmit(inputTupleList,collector);    
  51.                           }    
  52.                   }    
  53.                   else if(frequencyChkOp.equals("!="))    
  54.                   {    
  55.                        if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  56.                        {    
  57.                            count.incrementAndGet();    
  58.                            if(count.get() > frequency)    
  59.                                splitAndEmit(inputTupleList,collector);    
  60.                           }    
  61.                    }    
  62.                }    
  63.             }    
  64.             else if(thresholdDataType.equalsIgnoreCase("int") ||                     thresholdDataType.equalsIgnoreCase("double") ||                     thresholdDataType.equalsIgnoreCase("float") ||                     thresholdDataType.equalsIgnoreCase("long") ||                     thresholdDataType.equalsIgnoreCase("short"))    
  65.             {    
  66.                 String frequencyChkOp = thresholdInfo.getAction();    
  67.                 if(timeWindow!=null)    
  68.                 {    
  69.                      long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());    
  70.                      long curTime = System.currentTimeMillis();    
  71.                      long diffInMinutes = (curTime-startTime)/(1000);    
  72.                      System.out.println("Difference in minutes="+diffInMinutes);    
  73.                      if(diffInMinutes>=timeWindow)    
  74.                      {    
  75.                           if(frequencyChkOp.equals("<"))    
  76.                           {    
  77.                               if(valueToCheck < Double.parseDouble(thresholdValue.toString()))    
  78.                               {    
  79.                                    count.incrementAndGet();    
  80.                                    if(count.get() > frequency)    
  81.                                        splitAndEmit(inputTupleList,collector);    
  82.                               }    
  83.                           }    
  84.                           else if(frequencyChkOp.equals(">"))    
  85.                           {    
  86.                                if(valueToCheck > Double.parseDouble(thresholdValue.toString()))    
  87.                                 {    
  88.                                    count.incrementAndGet();    
  89.                                    if(count.get() > frequency)    
  90.                                        splitAndEmit(inputTupleList,collector);    
  91.                                }    
  92.                            }    
  93.                            else if(frequencyChkOp.equals("=="))    
  94.                            {    
  95.                               if(valueToCheck == Double.parseDouble(thresholdValue.toString()))    
  96.                               {    
  97.                                   count.incrementAndGet();    
  98.                                   if(count.get() > frequency)    
  99.                                       splitAndEmit(inputTupleList,collector);    
  100.                                }    
  101.                            }    
  102.                            else if(frequencyChkOp.equals("!="))    
  103.                            {    
  104.     . . .    
  105.                             }    
  106.                        }    
  107.              }    
  108.       else   
  109.           splitAndEmit(null,collector);    
  110.       }    
  111.       else   
  112.      {    
  113.            System.err.println("Emitting null in bolt");    
  114.            splitAndEmit(null,collector);    
  115.     }    
  116. }   



 

經由Bolt發送的的tuple將會傳遞到下一個對應的Bolt,在咱們的用例中是DBWriterBolt。

DBWriterBolt

通過處理的tuple必須被持久化以便於觸發tigger或者更深層次的使用。DBWiterBolt作了這個持久化的工做並把tuple存入了數據庫。表的創建由prepare()函數完成,這也將是topology調用的第一個方法。方法的編碼如Listing Six所示。

Listing Six:建表編碼。

 

  1. public void prepare( Map StormConf, TopologyContext context )     
  2. {           
  3.     try     
  4.     {    
  5.         Class.forName(dbClass);    
  6.     }     
  7.     catch (ClassNotFoundException e)     
  8.     {    
  9.         System.out.println("Driver not found");    
  10.         e.printStackTrace();    
  11.     }    
  12.      
  13.     try     
  14.     {    
  15.        connection driverManager.getConnection(     
  16.            "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);    
  17.        connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();    
  18.      
  19.        StringBuilder createQuery = new StringBuilder(    
  20.            "CREATE TABLE IF NOT EXISTS "+tableName+"(");    
  21.        for(Field fields : tupleInfo.getFieldList())    
  22.        {    
  23.            if(fields.getColumnType().equalsIgnoreCase("String"))    
  24.                createQuery.append(fields.getColumnName()+" VARCHAR(500),");    
  25.            else   
  26.                createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");    
  27.        }    
  28.        createQuery.append("thresholdTimeStamp timestamp)");    
  29.        connection.prepareStatement(createQuery.toString()).execute();    
  30.      
  31.        // Insert Query    
  32.        StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");    
  33.        String tempCreateQuery = new String();    
  34.        for(Field fields : tupleInfo.getFieldList())    
  35.        {    
  36.             insertQuery.append(fields.getColumnName()+",");    
  37.        }    
  38.        insertQuery.append("thresholdTimeStamp").append(") values (");    
  39.        for(Field fields : tupleInfo.getFieldList())    
  40.        {    
  41.            insertQuery.append("?,");    
  42.        }    
  43.      
  44.        insertQuery.append("?)");    
  45.        prepStatement = connection.prepareStatement(insertQuery.toString());    
  46.     }    
  47.     catch (SQLException e)     
  48.     {           
  49.         e.printStackTrace();    
  50.     }           
  51. }    



 

數據分批次的插入數據庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實現可能存在不一樣類型輸入的解析。

Listing Seven:數據插入的代碼部分。

 

  1. public void execute(Tuple tuple, BasicOutputCollector collector)     
  2. {    
  3.     batchExecuted=false;    
  4.     if(tuple!=null)    
  5.     {    
  6.        List<Object> inputTupleList = (List<Object>) tuple.getValues();    
  7.        int dbIndex=0;    
  8.        for(int i=0;i<tupleInfo.getFieldList().size();i++)    
  9.        {    
  10.            Field field = tupleInfo.getFieldList().get(i);    
  11.            try {    
  12.                dbIndex = i+1;    
  13.                if(field.getColumnType().equalsIgnoreCase("String"))                 
  14.                    prepStatement.setString(dbIndex, inputTupleList.get(i).toString());    
  15.                else if(field.getColumnType().equalsIgnoreCase("int"))    
  16.                    prepStatement.setInt(dbIndex,    
  17.                        Integer.parseInt(inputTupleList.get(i).toString()));    
  18.                else if(field.getColumnType().equalsIgnoreCase("long"))    
  19.                    prepStatement.setLong(dbIndex,     
  20.                        Long.parseLong(inputTupleList.get(i).toString()));    
  21.                else if(field.getColumnType().equalsIgnoreCase("float"))    
  22.                    prepStatement.setFloat(dbIndex,     
  23.                        Float.parseFloat(inputTupleList.get(i).toString()));    
  24.                else if(field.getColumnType().equalsIgnoreCase("double"))    
  25.                    prepStatement.setDouble(dbIndex,     
  26.                        Double.parseDouble(inputTupleList.get(i).toString()));    
  27.                else if(field.getColumnType().equalsIgnoreCase("short"))    
  28.                    prepStatement.setShort(dbIndex,     
  29.                        Short.parseShort(inputTupleList.get(i).toString()));    
  30.                else if(field.getColumnType().equalsIgnoreCase("boolean"))    
  31.                    prepStatement.setBoolean(dbIndex,     
  32.                        Boolean.parseBoolean(inputTupleList.get(i).toString()));    
  33.                else if(field.getColumnType().equalsIgnoreCase("byte"))    
  34.                    prepStatement.setByte(dbIndex,     
  35.                        Byte.parseByte(inputTupleList.get(i).toString()));    
  36.                else if(field.getColumnType().equalsIgnoreCase("Date"))    
  37.                {    
  38.                   Date dateToAdd=null;    
  39.                   if (!(inputTupleList.get(i) instanceof Date))      
  40.                   {      
  41.                        DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");    
  42.                        try     
  43.                        {    
  44.                            dateToAdd = df.parse(inputTupleList.get(i).toString());    
  45.                        }    
  46.                        catch (ParseException e)     
  47.                        {    
  48.                            System.err.println("Data type not valid");    
  49.                        }    
  50.                    }      
  51.                    else   
  52.                    {    
  53.             dateToAdd = (Date)inputTupleList.get(i);    
  54.             java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());    
  55.             prepStatement.setDate(dbIndex, sqlDate);    
  56.             }       
  57.             }     
  58.         catch (SQLException e)     
  59.         {    
  60.              e.printStackTrace();    
  61.         }    
  62.     }    
  63.     Date now = new Date();              
  64.     try   
  65.     {    
  66.         prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));    
  67.         prepStatement.addBatch();    
  68.         counter.incrementAndGet();    
  69.         if (counter.get()== batchSize)     
  70.         executeBatch();    
  71.     }     
  72.     catch (SQLException e1)     
  73.     {    
  74.         e1.printStackTrace();    
  75.     }               
  76.    }    
  77.    else   
  78.    {    
  79.         long curTime = System.currentTimeMillis();    
  80.        long diffInSeconds = (curTime-startTime)/(60*1000);    
  81.        if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)    
  82.        {    
  83.             try {    
  84.                 executeBatch();    
  85.                 startTime = System.currentTimeMillis();    
  86.             }    
  87.             catch (SQLException e) {    
  88.                  e.printStackTrace();    
  89.             }    
  90.        }    
  91.    }    
  92. }    
  93.      
  94. public void executeBatch() throws SQLException    
  95. {    
  96.     batchExecuted=true;    
  97.     prepStatement.executeBatch();    
  98.     counter = new AtomicInteger(0);    
  99. }   



 

一旦Spout和Bolt準備就緒(等待被執行),topology生成器將會創建topology並準備執行。下面就來看一下執行步驟。

在本地集羣上運行和測試topology

  • 經過TopologyBuilder創建topology。
  • 使用Storm Submitter,將topology遞交給集羣。以topology的名字、配置和topology的對象做爲參數。
  • 提交topology。

Listing Eight:創建和執行topology。

 

  1. public class StormMain    
  2. {    
  3.      public static void main(String[] args) throws AlreadyAliveException,     
  4.                                                    InvalidTopologyException,     
  5.                                                    InterruptedException     
  6.      {    
  7.           ParallelFileSpout parallelFileSpout = new ParallelFileSpout();    
  8.           ThresholdBolt thresholdBolt = new ThresholdBolt();    
  9.           DBWriterBolt dbWriterBolt = new DBWriterBolt();    
  10.           TopologyBuilder builder = new TopologyBuilder();    
  11.           builder.setSpout("spout", parallelFileSpout, 1);    
  12.           builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");    
  13.           builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");    
  14.           if(this.argsMain!=null && this.argsMain.length > 0)     
  15.           {    
  16.               conf.setNumWorkers(1);    
  17.               StormSubmitter.submitTopology(     
  18.                    this.argsMain[0], conf, builder.createTopology());    
  19.           }    
  20.           else   
  21.           {        
  22.               Config conf = new Config();    
  23.               conf.setDebug(true);    
  24.               conf.setMaxTaskParallelism(3);    
  25.               LocalCluster cluster = new LocalCluster();    
  26.               cluster.submitTopology(    
  27.               "Threshold_Test", conf, builder.createTopology());    
  28.           }    
  29.      }    
  30. }   

 

topology被創建後將被提交到本地集羣。一旦topology被提交,除非被取締或者集羣關閉,它將一直保持運行不須要作任何的修改。這也是Storm的另外一大特點之一。

這個簡單的例子體現了當你掌握了topology、spout和bolt的概念,將能夠輕鬆的使用Storm進行實時處理。若是你既想處理大數據又不想遍歷Hadoop的話,不難發現使用Storm將是個很好的選擇。

 

 

5.  storm常見問題解答

 

1、我有一個數據文件,或者我有一個系統裏面有數據,怎麼導入storm作計算?

你須要實現一個Spout,Spout負責將數據emit到storm系統裏,交給bolts計算。怎麼實現spout能夠參考官方的kestrel spout實現:
https://github.com/nathanmarz/storm-kestrel

若是你的數據源不支持事務性消費,那麼就沒法獲得storm提供的可靠處理的保證,也不必實現ISpout接口中的ack和fail方法。

2、Storm爲了保證tuple的可靠處理,須要保存tuple信息,這會不會致使內存OOM?

Storm爲了保證tuple的可靠處理,acker會保存該節點建立的tuple id的xor值,這稱爲ack value,那麼每ack一次,就將tuple id和ack value作異或(xor)。當全部產生的tuple都被ack的時候, ack value必定爲0。這是個很簡單的策略,對於每個tuple也只要佔用約20個字節的內存。對於100萬tuple,也才20M左右。關於可靠處理看這個:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

3、Storm計算後的結果保存在哪裏?能夠保存在外部存儲嗎?

Storm不處理計算結果的保存,這是應用代碼須要負責的事情,若是數據不大,你能夠簡單地保存在內存裏,也能夠每次都更新數據庫,也能夠採用NoSQL存儲。storm並無像s4那樣提供一個Persist API,根據時間或者容量來作存儲輸出。這部分事情徹底交給用戶。

數據存儲以後的展示,也是你須要本身處理的,storm UI只提供對topology的監控和統計。

4、Storm怎麼處理重複的tuple?

由於Storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail並從新發送該tuple,那麼就會有tuple重複計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:
(1)不處理,這也算是種策略。由於實時計算一般並不要求很高的精確度,後續的批處理計算會更正實時計算的偏差。
(2)使用第三方集中存儲來過濾,好比利用mysql,memcached或者redis根據邏輯主鍵來去重。
(3)使用bloom filter作過濾,簡單高效。

5、Storm的動態增刪節點

我在storm和s4裏比較裏談到的動態增刪節點,是指storm能夠動態地添加和減小supervisor節點。對於減小節點來講,被移除的supervisor上的worker會被nimbus從新負載均衡到其餘supervisor節點上。在storm 0.6.1之前的版本,增長supervisor節點不會影響現有的topology,也就是現有的topology不會從新負載均衡到新的節點上,在擴展集羣的時候很不方便,須要從新提交topology。所以我在storm的郵件列表裏提了這個問題,storm的開發者nathanmarz建立了一個issue 54並在0.6.1提供了rebalance命令來讓正在運行的topology從新負載均衡,具體見:
https://github.com/nathanmarz/storm/issues/54
和0.6.1的變動:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246

storm並不提供機制來動態調整worker和task數目。

6、Storm UI裏spout統計的complete latency的具體含義是什麼?爲何emit的數目會是acked的兩倍?
這個事實上是storm郵件列表裏的一個問題。Storm做者marz的解答:

The complete latency is the time  from the spout emitting a tuple to that tuple being acked on the spout. So it tracks the time for the whole tuple tree to be processed.
If you dive into the spout component in the UI, you'll see that a lot of the emitted/transferred is on the __ack* stream.  This is the spout communicating with the ackers which take care of tracking the tuple trees. 


簡單地說,complete latency表示了tuple從emit到被acked通過的時間,能夠認爲是tuple以及該tuple的後續子孫(造成一棵樹)整個處理時間。其次spout的emit和transfered還統計了spout和acker之間內部的通訊信息,好比對於可靠處理的spout來講,會在emit的時候同時發送一個_ack_init給acker,記錄tuple id到task id的映射,以便ack的時候能找到正確的acker task。

 

 

 

 

6.  其餘開源的大數據解決方案

 

自 Google 在 2004 年推出 MapReduce 範式以來,已誕生了多個使用原始 MapReduce 範式(或擁有該範式的質量)的解決方案。Google 對 MapReduce 的最初應用是創建萬維網的索引。儘管此應用程序仍然很流行,但這個簡單模型解決的問題也正在增多。

表 1 提供了一個可用開源大數據解決方案的列表,包括傳統的批處理和流式處理應用程序。在將 Storm 引入開源以前將近一年的時間裏,Yahoo! 的 S4 分佈式流計算平臺已向 Apache 開源。S4 於 2010 年 10 月發佈,它提供了一個高性能計算 (HPC) 平臺,嚮應用程序開發人員隱藏了並行處理的複雜性。S4 實現了一個可擴展的、分散化的集羣架構,並歸入了部分容錯功能。


表 1. 開源大數據解決方案

解決方案 開發商 類型 描述
Storm Twitter 流式處理 Twitter 的新流式大數據分析解決方案
S4 Yahoo! 流式處理 來自 Yahoo! 的分佈式流計算平臺
Hadoop Apache 批處理 MapReduce 範式的第一個開源實現
Spark UC Berkeley AMPLab 批處理 支持內存中數據集和恢復能力的最新分析平臺
Disco Nokia 批處理 Nokia 的分佈式 MapReduce 框架
HPCC LexisNexis 批處理 HPC 大數據集羣
相關文章
相關標籤/搜索