用Storm輕鬆實時大數據分析【翻譯】

原文地址html

簡單易用,Storm讓大數據分析變得垂手可得。java

現在,公司在平常運做中常常會產生TB(terabytes)級的數據。數據來源包括從網絡傳感器捕獲的,到Web,社交媒體,交易型業務數據,以及其餘業務環境中建立的數據。考慮到數據的生成量,實時計算(real-time computation )已成爲不少組織面臨的一個巨大挑戰。咱們已經有效地使用了一個可擴展的實時計算系統——開源的 Storm 工具,它是有 Twitter 開發,一般被稱爲「實時 Hadoop(real-time Hadoop)」。然而,Storm 遠遠比 Hadoop 簡單,由於它並不須要掌握新技術處理大數據。node

本文介紹瞭如何使用 Storm。示例項目稱之爲「超速警報系統(Speeding Alert System),」分析實時數據,當車速超過一個預約義的閾值(threshold)時,觸發一個 trigger,相關數據就會保存到數據庫中。mysql

什麼是Storm


Hadoop 依靠批量處理(batch processing),而 Storm 是一個實時的(real-time),分佈式的(distributed),容錯的(fault-tolerant),計算系統。像 Hadoop,它能夠保證可靠性處理大量的數據,但不能實時;也就是說,每一個消息都將被處理。Storm 也提供這些特性,如容錯,分佈式計算,這些使它適合在不一樣機器上處理大規模數據。它還具備以下特性: git

  • 簡擴展。若想擴展,你只需添加設備和改變 topology 的並行性設置。用於集羣協調的 Hadoop Zookeeper 用在 Storm 使得它很是容易擴展。
  • 保證每一個消息都被處理。
  • Storm 集羣(cluster)很容易管理。
  • 容錯性:一旦 topology 被提交,Storm 運行 topology,直到它被殺掉或集羣被關閉。此外,若是執行期間發生錯誤,那麼從新分配的任務由 Storm 處理。
  • Storm 的 topology 能夠用任何語言定義,但一般仍是用 Java。

文章接下來的部分,你首先須要安裝和創建 Storm。步驟以下: github

  • Storm 官方站點下載 Storm.
  • 解壓,將 bin/ 添加到你的環境變量 PATH,保證 bin/storm 腳本可執行。

Hadoop Map/Reduce 的數據處理過程是,從HDFS中獲取數據,分片後,進行分佈式處理,最終輸出結果。redis

Hadoop 與 Storm 的概念對比,以下表所示:sql

Hadoop數據庫

Storm 瀏覽器

JobTracker

Nimbus

TaskTracker

Supervisor

Child

Worker

Job

Topology

Mapper/Reducer

Spout/Bolt

Twitter 列舉了Storm的三大類應用:

1. 信息流處理(Stream processing)。Storm可用來實時處理新數據和更新數據庫,兼具容錯性和可擴展性。即Storm能夠用來處理源源不斷流進來的消息,處理以後將結果寫入到某個存儲中去。

2. 連續計算(Continuous computation)。Storm可進行連續查詢並把結果即時反饋給客戶端。好比把Twitter上的熱門話題發送到瀏覽器中。

3. 分佈式遠程程序調用(Distributed RPC)。Storm可用來並行處理密集查詢。Storm的拓撲結構是一個等待調用信息的分佈函數,當它收到一條調用信息後,會對查詢進行計算,並返回查詢結果。舉個例子Distributed RPC能夠作並行搜索或者處理大集合的數據。

經過配置drpc服務器,將storm的topology發佈爲drpc服務。客戶端程序能夠調用drpc服務將數據發送到storm集羣中,並接收處理結果的反饋。這種方式須要drpc服務器進行轉發,其中drpc服務器底層經過thrift實現。適合的業務場景主要是實時計算。而且擴展性良好,能夠增長每一個節點的工做worker數量來動態擴展。

4. 項目實施,構建Topology。

Storm組件


Storm 集羣主要由主節點(master)和工做節點(worker node)組成,它們經過 Zookeeper 進行協調。Nimbus相似Hadoop裏面的 JobTracker。Nimbus 負責在集羣裏面分發代碼,分配計算任務給機器, 而且監控狀態。

主節點(master)——Nimbus

主節點運行一個守護進程(daemon),Nimbus,它負責在集羣中分佈代碼,分配任務(Task)並監測故障。它相似於 Hadoop 的 Job Tracker。

工做者節點(worker node)——Supervisor

工做節點一樣會運行一個守護進程,Supervisor,它監聽已分配的工做,並按要求運行工做進程。每一個工做節點都執行一個 topology 的子集。Nimbus 和 Supervisor 之間的協調是由 Zookeeper 或其集羣來管理。

Zookeeper

Zookeeper 負責 Supervisor 和 Nimbus 之間的協調。一個實時應用程序的邏輯被封裝到一個 Storm 的「topology」中。一個 topology 是由一組 spouts(數據源)和 bolts(數據操做)組成,經過 Stream Groupings 鏈接(協調)。下面更進一步說明這些術語。

Spout

簡單來講,一個 spout 在 topology 中從一個源中讀取數據。spout 能夠是可靠的,也能夠是不可靠的。若是 Storm 處理失敗,那麼一個可靠的 spout 能夠確保從新發送元組(它是一個數據項的有序列表)。一個不可靠的 spout,元組一旦發送,它不會跟蹤。spout 中的主要方法是 nextTuple()。該種方法或者向 topology 發出一個新元組,或是直接返回,若是沒有什麼可發。

Bolt

bolt 負責全部處理處理 topology 發生的一切。 bolt 可作從過濾到鏈接,聚合,寫文件/數據庫等等任何事。bolt 從一個 spout 接收數據來處理,在複雜流轉換中,它能夠進一步發出元組到另外一個 bolt。bolt 中主要方法是 execute(),它接受一個元組做爲輸入。在 spout 和 bolt,發動元組到更多的流,能夠在 declareStream() 中聲明和指定流。

Stream Grouping

stream grouping 定義流在 bolt 任務之間如何被劃分。Storm 提供了內置的流分組:隨機分組(shuffle grouping),域組域(fields grouping),全部分組(all grouping),單一分組(one grouping),直接分組(direct grouping)和本地/隨機分組(local/shuffle grouping)。自定義分組實現可使用 CustomStreamGrouping 接口。

  • 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每一個任務得到相等數量的tuple。
  • 字段分組(Fields grouping):根據指定字段分割數據流,並分組。例如,根據「user-id」字段,相同「user-id」的元組老是分發到同一個任務,不一樣「user-id」的元組可能分發到不一樣的任務。
  • 所有分組(All grouping):tuple被複制到bolt的全部任務。這種類型須要謹慎使用。
  • 全局分組(Global grouping):所有流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。
  • 無分組(None grouping):你不須要關心流是如何分組。目前,無分組等效於隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(若是可能)。
  • 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪一個元組處理者任務接收。

另外,還涉及其餘概念。

Task

worker 中每個 spout/bolt 的線程稱爲一個 task。在 storm0.8 以後,task 再也不與物理線程對應,同一個 spout/bolt 的 task 可能會共享一個物理線程,該線程稱爲 executor。

Tuple

一次消息傳遞的基本單元。原本應該是一個 key-value 的 map,可是因爲各個組件間傳遞的tuple的字段名稱已經事先定義好,因此,tuple 中只要按序填入各個value 就好了,因此就是一個 value list。

Topology

storm中運行的一個實時應用程序,由於各個組件間的消息流動造成邏輯上的一個拓撲結構。一個topology是spouts和bolts組成的圖, 經過stream groupings將圖中的spouts和bolts鏈接起來,以下圖:

center[1]

一個 topology 會一直運行,直到你 kill 掉它,Storm自動地從新分配執行失敗的任務, 而且Storm能夠保證你不會有數據丟失(若是開啓了高可靠性的話)。若是一些機器意外停機它上面的全部任務會被轉移到其餘機器上。

運行一個topology很簡單。首先,把你全部的代碼以及所依賴的jar打進一個jar包。而後運行相似下面的這個命令:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2 

這個命令會運行主類: backtype.strom.MyTopology, 參數是arg1, arg2。這個類的main函數定義這個topology而且把它提交給Nimbus。storm jar負責鏈接到Nimbus而且上傳jar包。

Topology的定義是一個Thrift結構,而且Nimbus就是一個Thrift服務, 你能夠提交由任何語言建立的topology。上面的方面是用JVM-based語言提交的最簡單的方法。

Stream

源源不斷傳遞的tuple就組成了stream。消息流stream是storm裏的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分佈式的方式並行地建立和處理。經過對stream中tuple序列中每一個字段命名來定義stream。在默認的狀況下,tuple的字段類型能夠是:integer,long,short, byte,string,double,float,boolean和byte array。你也能夠自定義類型(只要實現相應的序列化器)。

每一個消息流在定義的時候會被分配給一個id,由於單向消息流使用的至關廣泛, OutputFieldsDeclarer 定義了一些方法讓你能夠定義一個stream而不用指定這個id。在這種狀況下這個stream會分配個值爲‘default’默認的id 。

Storm提供的最基本的處理stream的原語是spout和bolt。你能夠實現spout和bolt提供的接口來處理你的業務邏輯。

center 

實現


對於咱們的示例中,咱們設計了一個 spout 和 bolt 的 topology,能夠處理大量規模數據(日誌文件),設計觸發一個報警,當一個特定值超過預設閾值時。使用 Storm 的 topology,日誌文件按行讀取,topology 監控到來的數據。在 Storm 組件,spout 讀取到來的數據。它不只從現存的文件中讀取數據,也監控新文件。一旦文件被修改,spout 讀取新條目,轉換爲元組(一個能夠被 bolt 讀取的格式)後,把元組發出到 bolt 執行閾值分析,查找任何超過閾值的記錄。

閾值分析(Threshold Analysis)


本節主要集中兩種類型的閾值(threshold)分析:瞬時閾值(instant thershold)和時間序列閾值(time series threshold)。

  • 瞬時閾值監測:一個字段的值在那個瞬間超過了預設的臨界值,若是條件符合的話則觸發一個trigger。舉個例子當車輛超越80千米每小時,則觸發trigger。
  • 時間序列閾值監測:字段的值在一個給定的時間段內超過了預設的臨界值,若是條件符合則觸發一個觸發器。好比:在5分鐘內,時速超過80千米每小時兩次及以上的車輛。

清單 1 顯示一個咱們使用的日誌文件,它包含車輛數據信息,例如車輛號碼,速度,位置。

清單 1:日誌文件,經過檢查點的車輛信息

AB 123, 60, North city
BC 123, 70, South city
CD 234, 40, South city
DE 123, 40, East city
EF 123, 90, South city
GH 123, 50, West city

建立相應的XML文件,它由到來的數據格式組成。用於解析日誌文件。架構 XML 及其相應的描述以下表所示。

infotable3

XML文件和日誌文件都被 spout 隨時監測,本例使用的 topology 以下圖所示。

info1

圖 1:Storm中建立的 topology,以處理實時數據

如圖1所示,FilelistenerSpout 接收輸入日誌,並逐行讀取,把數據發送給 ThresoldCalculatorBolt 進一步的閾值處理。一旦處理完成,根據閾值計算的行被髮動到 DBWriterBolt,持久化到數據庫(或發出報警)。這個過程的具體實現將在下面介紹。

Spout 實現


spout 把日誌文件和XML描述符文件做爲輸入。該XML文件指定了日誌文件的格式。如今考慮一個例子的日誌文件,它包含車輛信息,如車輛號碼,速度,位置等三個信息。如圖 2 所示。

info2

圖 2:數據從日誌文件到 spout 的流程圖

列表 2 顯示了tuple對應的XML,其中指定了字段、將日誌文件切割成字段的定界符以及字段的類型。XML文件以及數據都被保存到Spout指定的路徑。

列表 2:用以描述日誌文件的XML文件

<TUPLEINFO>
              <FIELDLIST>
                  <FIELD>
                            <COLUMNNAME>vehicle_number</COLUMNNAME> 
                            <COLUMNTYPE>string</COLUMNTYPE> 
                  </FIELD>
                             
                  <FIELD>
                            <COLUMNNAME>speed</COLUMNNAME> 
                            <COLUMNTYPE>int</COLUMNTYPE> 
                  </FIELD>
                                                          
                  <FIELD>
                             <COLUMNNAME>location</COLUMNNAME> 
                             <COLUMNTYPE>string</COLUMNTYPE> 
                  </FIELD>
              </FIELDLIST>  
           <DELIMITER>,</DELIMITER> 
</TUPLEINFO>

構造函數用參數 Directory、PathSpout 和 TupleInfo 對象建立 Spout 對象。TupleInfo 儲存與日誌文件相關的必要信息,如字段、分隔符、字段類型等。該對象經過XSTream序列化XML來建建。

Spout實現步驟:

  • 監聽一個單獨日誌文件的變化。監控目錄是否添加新的日誌文件。
  • 聲明字段後,把 spout 讀取行轉換成 tuple。
  • 聲明Spout和Bolt之間的分組,並決定tuple發送給Bolt的方式。

Spout 代碼以下列表 3 所示。

列表 3:Spout中 open、nextTuple 和 delcareOutputFields 方法

public void open( Map conf, TopologyContext context,SpoutOutputCollector collector ) 
{
    _collector = collector;
    try
    {
    fileReader  =  new BufferedReader(new FileReader(new File(file)));
    } 
    catch (FileNotFoundException e) 
    {
    System.exit(1);
    }
}
                                                                                                  
public void nextTuple() 
{
    protected void ListenFile(File file) 
    {
    Utils.sleep(2000);
    RandomAccessFile access = null; 
    String line = null;                  
       try
       { 
           while ((line = access.readLine()) != null)
           { 
               if (line !=null)
               {
                    String[] fields=null;
                    if (tupleInfo.getDelimiter().equals("|"))
                       fields = line.split("\\"+tupleInfo.getDelimiter());
                    else                                                                                                             fields = line.split(tupleInfo.getDelimiter());                                                
                    if (tupleInfo.getFieldList().size() == fields.length)
                       _collector.emit(new Values(fields)); 
               }          
           } 
      } 
      catch (IOException ex) { }              
      }
}
 
public void declareOutputFields(OutputFieldsDeclarer declarer) 
{
     String[] fieldsArr = new String [tupleInfo.getFieldList().size()];
     for(int i=0; i<tupleInfo.getFieldList().size(); i++)
     {
         fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();
     }           
     declarer.declare(new Fields(fieldsArr));
}   

declareOutputFileds() 決定tuple發送的格式,這樣,Bolt就能用相似的方式編碼 tuple。Spout持續監聽添加到日誌文件的數據,一旦有數據添加,它就讀取並把數據發送給 bolt 處理。

Bolt 實現


Spout 輸出結果將給予Bolt進行更深一步的處理。通過對用例的思考,咱們的topology中須要如圖 3中的兩個Bolt。

info3

圖 3:Spout到Bolt的數據流程

ThresholdCalculatorBolt

Spout將tuple發出,由ThresholdCalculatorBolt接收並進行臨界值處理。在這裏,它將接收好幾項輸入進行檢查;分別是:

臨界值檢查

  • 閾值欄數檢查(拆分紅字段的數目)
  • 閾值數據類型(拆分後字段的類型)
  • 閾值出現的頻數
  • 閾值時間段檢查

列表 4中的類,定義用來保存這些值。

public class ThresholdInfo implements Serializable
{
    private String action;
    private String rule;
    private Object thresholdValue;
    private int thresholdColNumber;
    private Integer timeWindow;
    private int frequencyOfOccurence;
}

基於字段中提供的值,閾值檢查將被在 execute() 方法執行,如列表 5 所示。代碼大部分的功能是解析和檢測到來的值。

列表 5:閾值檢測代碼段

public void execute(Tuple tuple, BasicOutputCollector collector) 
{
    if(tuple!=null)
    {
        List<Object> inputTupleList = (List<Object>) tuple.getValues();
        int thresholdColNum = thresholdInfo.getThresholdColNumber();
        Object thresholdValue = thresholdInfo.getThresholdValue();
        String thresholdDataType = 
            tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();
        Integer timeWindow = thresholdInfo.getTimeWindow();
        int frequency = thresholdInfo.getFrequencyOfOccurence();
 
        if(thresholdDataType.equalsIgnoreCase("string"))
        {
            String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();
            String frequencyChkOp = thresholdInfo.getAction();
            if(timeWindow!=null)
            {
                long curTime = System.currentTimeMillis();
                long diffInMinutes = (curTime-startTime)/(1000);
                if(diffInMinutes>=timeWindow)
                {
                    if(frequencyChkOp.equals("=="))
                    {
                         if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
                         {
                             count.incrementAndGet();
                             if(count.get() > frequency)
                                 splitAndEmit(inputTupleList,collector);
                         }
                    }
                    else if(frequencyChkOp.equals("!="))
                    {
                        if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
                        {
                             count.incrementAndGet();
                             if(count.get() > frequency)
                                 splitAndEmit(inputTupleList,collector);
                         }
                     }
                     else
                         System.out.println("Operator not supported");
                 }
             }
             else
             {
                 if(frequencyChkOp.equals("=="))
                 {
                     if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
                     {
                         count.incrementAndGet();
                         if(count.get() > frequency)
                             splitAndEmit(inputTupleList,collector);    
                     }
                 }
                 else if(frequencyChkOp.equals("!="))
                 {
                      if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
                      {
                          count.incrementAndGet();
                          if(count.get() > frequency)
                              splitAndEmit(inputTupleList,collector);   
                      }
                  }
              }
           }
           else if(thresholdDataType.equalsIgnoreCase("int") || 
                   thresholdDataType.equalsIgnoreCase("double") || 
                   thresholdDataType.equalsIgnoreCase("float") || 
                   thresholdDataType.equalsIgnoreCase("long") || 
                   thresholdDataType.equalsIgnoreCase("short"))
           {
               String frequencyChkOp = thresholdInfo.getAction();
               if(timeWindow!=null)
               {
                    long valueToCheck = 
                        Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());
                    long curTime = System.currentTimeMillis();
                    long diffInMinutes = (curTime-startTime)/(1000);
                    System.out.println("Difference in minutes="+diffInMinutes);
                    if(diffInMinutes>=timeWindow)
                    {
                         if(frequencyChkOp.equals("<"))
                         {
                             if(valueToCheck < Double.parseDouble(thresholdValue.toString()))
                             {
                                  count.incrementAndGet();
                                  if(count.get() > frequency)
                                      splitAndEmit(inputTupleList,collector);
                             }
                         }
                         else if(frequencyChkOp.equals(">"))
                         {
                              if(valueToCheck > Double.parseDouble(thresholdValue.toString())) 
                              {
                                  count.incrementAndGet();
                                  if(count.get() > frequency)
                                      splitAndEmit(inputTupleList,collector);
                              }
                          }
                          else if(frequencyChkOp.equals("=="))
                          {
                             if(valueToCheck == Double.parseDouble(thresholdValue.toString()))
                             {
                                 count.incrementAndGet();
                                 if(count.get() > frequency)
                                     splitAndEmit(inputTupleList,collector);
                              }
                          }
                          else if(frequencyChkOp.equals("!="))
                          {
   . . . 
                          }
                      }
                   
             }
     else
          splitAndEmit(null,collector);
     }
     else
     {
          System.err.println("Emitting null in bolt");
          splitAndEmit(null,collector);
     }
}

根據閾值 bolt 發送的 tuple 被髮送到下一個相應的Bolt,在咱們的用例中是 DBWriterBolt

DBWriterBolt

已經處理的tuple必須被持久化,以便於觸發tigger或者未來使用。DBWiterBolt 完成的工做是將 tuple 持久化到數據庫。表的創建是由 prepare() 完成,這也是topology調用的第一個方法。該方法的代碼如列表 6 所示。

列表 6:建立表的代碼

public void prepare( Map StormConf, TopologyContext context ) 
{       
    try
    {
        Class.forName(dbClass);
    } 
    catch (ClassNotFoundException e) 
    {
        System.out.println("Driver not found");
        e.printStackTrace();
    }
 
    try
    {
       connection driverManager.getConnection( 
           "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);
       connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();
 
       StringBuilder createQuery = new StringBuilder(
           "CREATE TABLE IF NOT EXISTS "+tableName+"(");
       for(Field fields : tupleInfo.getFieldList())
       {
           if(fields.getColumnType().equalsIgnoreCase("String"))
               createQuery.append(fields.getColumnName()+" VARCHAR(500),");
           else
               createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");
       }
       createQuery.append("thresholdTimeStamp timestamp)");
       connection.prepareStatement(createQuery.toString()).execute();
 
       // Insert Query
       StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");
       String tempCreateQuery = new String();
       for(Field fields : tupleInfo.getFieldList())
       {
            insertQuery.append(fields.getColumnName()+",");
       }
       insertQuery.append("thresholdTimeStamp").append(") values (");
       for(Field fields : tupleInfo.getFieldList())
       {
           insertQuery.append("?,");
       }
 
       insertQuery.append("?)");
       prepStatement = connection.prepareStatement(insertQuery.toString());
    }
    catch (SQLException e) 
    {       
        e.printStackTrace();
    }       
}

數據的插入是分批次完成的。插入的邏輯由 execute() 方法提供,如列表 7 所示。大部分代碼是解析各類不一樣輸入類型。

列表 7:數據插入的代碼部分

public void execute(Tuple tuple, BasicOutputCollector collector) 
{
    batchExecuted=false;
    if(tuple!=null)
    {
       List<Object> inputTupleList = (List<Object>) tuple.getValues();
       int dbIndex=0;
       for(int i=0;i<tupleInfo.getFieldList().size();i++)
       {
           Field field = tupleInfo.getFieldList().get(i);
           try {
               dbIndex = i+1;
               if(field.getColumnType().equalsIgnoreCase("String"))             
                   prepStatement.setString(dbIndex, inputTupleList.get(i).toString());
               else if(field.getColumnType().equalsIgnoreCase("int"))
                   prepStatement.setInt(dbIndex,
                       Integer.parseInt(inputTupleList.get(i).toString()));
               else if(field.getColumnType().equalsIgnoreCase("long"))
                   prepStatement.setLong(dbIndex, 
                       Long.parseLong(inputTupleList.get(i).toString()));
               else if(field.getColumnType().equalsIgnoreCase("float"))
                   prepStatement.setFloat(dbIndex, 
                       Float.parseFloat(inputTupleList.get(i).toString()));
               else if(field.getColumnType().equalsIgnoreCase("double"))
                   prepStatement.setDouble(dbIndex, 
                       Double.parseDouble(inputTupleList.get(i).toString()));
               else if(field.getColumnType().equalsIgnoreCase("short"))
                   prepStatement.setShort(dbIndex, 
                       Short.parseShort(inputTupleList.get(i).toString()));
               else if(field.getColumnType().equalsIgnoreCase("boolean"))
                   prepStatement.setBoolean(dbIndex, 
                       Boolean.parseBoolean(inputTupleList.get(i).toString()));
               else if(field.getColumnType().equalsIgnoreCase("byte"))
                   prepStatement.setByte(dbIndex, 
                       Byte.parseByte(inputTupleList.get(i).toString()));
               else if(field.getColumnType().equalsIgnoreCase("Date"))
               {
                  Date dateToAdd=null;
                  if (!(inputTupleList.get(i) instanceof Date))  
                  {  
                       DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
                       try
                       {
                           dateToAdd = df.parse(inputTupleList.get(i).toString());
                       }
                       catch (ParseException e) 
                       {
                           System.err.println("Data type not valid");
                       }
                   }  
                   else
                   {
            dateToAdd = (Date)inputTupleList.get(i);
            java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());
            prepStatement.setDate(dbIndex, sqlDate);
            }   
            } 
        catch (SQLException e) 
        {
             e.printStackTrace();
        }
    }
    Date now = new Date();          
    try
    {
        prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));
        prepStatement.addBatch();
        counter.incrementAndGet();
        if (counter.get()== batchSize) 
        executeBatch();
    } 
    catch (SQLException e1) 
    {
        e1.printStackTrace();
    }           
   }
   else
   {
        long curTime = System.currentTimeMillis();
       long diffInSeconds = (curTime-startTime)/(60*1000);
       if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)
       {
            try {
                executeBatch();
                startTime = System.currentTimeMillis();
            }
            catch (SQLException e) {
                 e.printStackTrace();
            }
       }
   }
}
 
public void executeBatch() throws SQLException
{
    batchExecuted=true;
    prepStatement.executeBatch();
    counter = new AtomicInteger(0);
}

一旦Spout和Bolt準備就緒(等待被執行),topology生成器將會創建topology並執行。下面就來看一下執行步驟。

在本地集羣上運行和測試topology

  • 經過TopologyBuilder創建topology。
  • 使用Storm Submitter,將topology遞交給集羣。以topology的名字、配置和topology的對象做爲參數。
  • 提交topology。

列表 8:創建和執行topology

public class StormMain
{
     public static void main(String[] args) throws AlreadyAliveException, 
                                                   InvalidTopologyException, 
                                                   InterruptedException 
     {
          ParallelFileSpout parallelFileSpout = new ParallelFileSpout();
          ThresholdBolt thresholdBolt = new ThresholdBolt();
          DBWriterBolt dbWriterBolt = new DBWriterBolt();
          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("spout", parallelFileSpout, 1);
          builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");
          builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");
          if(this.argsMain!=null && this.argsMain.length > 0) 
          {
              conf.setNumWorkers(1);
              StormSubmitter.submitTopology( 
                   this.argsMain[0], conf, builder.createTopology());
          }
          else
          {    
              Config conf = new Config();
              conf.setDebug(true);
              conf.setMaxTaskParallelism(3);
              LocalCluster cluster = new LocalCluster();
              cluster.submitTopology(
              "Threshold_Test", conf, builder.createTopology());
          }
     }
}

建立 topology 後,提交到本地集羣。一旦topology被提交,除非被 kill 或者由於修改而關閉集羣,不然它將一直運行。這也是Storm一大優點之一。

本例展現創建和使用Storm,一旦你理解 topology、spout和bolt這些基本概念,將會很容易。若是你處理大數據,又不想用 Hadoop,那麼使用 Storm 是一個很好的選擇。

Storm常見問題解答


  • 1、我有一個數據文件,或者我有一個系統裏面有數據,怎麼導入storm作計算?

你須要實現一個Spout,Spout負責將數據emit到storm系統裏,交給bolts計算。怎麼實現spout能夠參考官方的kestrel spout實現:

https://github.com/nathanmarz/storm-kestrel

若是你的數據源不支持事務性消費,那麼就沒法獲得storm提供的可靠處理的保證,也不必實現ISpout接口中的ack和fail方法。

  • 2、Storm爲了保證tuple的可靠處理,須要保存tuple信息,這會不會致使內存OOM?

Storm爲了保證tuple的可靠處理,acker會保存該節點建立的tuple id的xor值,這稱爲ack value,那麼每ack一次,就將tuple id和ack value作異或(xor)。當全部產生的tuple都被ack的時候, ack value必定爲0。這是個很簡單的策略,對於每個tuple也只要佔用約20個字節的內存。對於100萬tuple,也才20M左右。關於可靠處理看這個:

https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

  • 3、Storm計算後的結果保存在哪裏?能夠保存在外部存儲嗎?

Storm不處理計算結果的保存,這是應用代碼須要負責的事情,若是數據不大,你能夠簡單地保存在內存裏,也能夠每次都更新數據庫,也能夠採用NoSQL存儲。storm並無像s4那樣提供一個Persist API,根據時間或者容量來作存儲輸出。這部分事情徹底交給用戶。

數據存儲以後的展示,也是你須要本身處理的,storm UI只提供對topology的監控和統計。

  • 4、Storm怎麼處理重複的tuple?

由於Storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail並從新發送該tuple,那麼就會有tuple重複計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:

(1)不處理,這也算是種策略。由於實時計算一般並不要求很高的精確度,後續的批處理計算會更正實時計算的偏差。

(2)使用第三方集中存儲來過濾,好比利用mysql,memcached或者redis根據邏輯主鍵來去重。

(3)使用bloom filter作過濾,簡單高效。

  • 5、Storm的動態增刪節點

我在storm和s4裏比較裏談到的動態增刪節點,是指storm能夠動態地添加和減小supervisor節點。對於減小節點來講,被移除的supervisor上的worker會被nimbus從新負載均衡到其餘supervisor節點上。在storm 0.6.1之前的版本,增長supervisor節點不會影響現有的topology,也就是現有的topology不會從新負載均衡到新的節點上,在擴展集羣的時候很不方便,須要從新提交topology。所以我在storm的郵件列表裏提了這個問題,storm的開發者nathanmarz建立了一個issue 54並在0.6.1提供了rebalance命令來讓正在運行的topology從新負載均衡,具體見:

https://github.com/nathanmarz/storm/issues/54

和0.6.1的變動:

http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246

storm並不提供機制來動態調整worker和task數目。

  • 6、Storm UI裏spout統計的complete latency的具體含義是什麼?爲何emit的數目會是acked的兩倍?

這個事實上是storm郵件列表裏的一個問題。Storm做者marz的解答:

The complete latency is the time  from the spout emitting a tuple to that tuple being acked on the spout. So it tracks the time  for the whole tuple tree to be processed.

If you dive into the spout component in the UI, you'll see that a lot of the emitted/transferred is on the __ack* stream.  This is the spout communicating with the ackers which take care of tracking the tuple trees.

簡單地說,complete latency表示了tuple從emit到被acked通過的時間,能夠認爲是tuple以及該tuple的後續子孫(造成一棵樹)整個處理時間。其次spout的emit和transfered還統計了spout和acker之間內部的通訊信息,好比對於可靠處理的spout來講,會在emit的時候同時發送一個_ack_init給acker,記錄tuple id到task id的映射,以便ack的時候能找到正確的acker task。

其餘開源的大數據解決方案


自 Google 在 2004 年推出 MapReduce 範式以來,已誕生了多個使用原始 MapReduce 範式(或擁有該範式的質量)的解決方案。Google 對 MapReduce 的最初應用是創建萬維網的索引。儘管此應用程序仍然很流行,但這個簡單模型解決的問題也正在增多。

表 1 提供了一個可用開源大數據解決方案的列表,包括傳統的批處理和流式處理應用程序。在將 Storm 引入開源以前將近一年的時間裏,Yahoo! 的 S4 分佈式流計算平臺已向 Apache 開源。S4 於 2010 年 10 月發佈,它提供了一個高性能計算 (HPC) 平臺,嚮應用程序開發人員隱藏了並行處理的複雜性。S4 實現了一個可擴展的、分散化的集羣架構,並歸入了部分容錯功能。

表 1. 開源大數據解決方案

解決方案

開發商

類型

描述

Storm

Twitter

流式處理

Twitter 的新流式大數據分析解決方案

S4

Yahoo!

流式處理

來自 Yahoo! 的分佈式流計算平臺

Hadoop

Apache

批處理

MapReduce 範式的第一個開源實現

Spark

UC Berkeley AMPLab

批處理

支持內存中數據集和恢復能力的最新分析平臺

Disco

Nokia

批處理

Nokia 的分佈式 MapReduce 框架

HPCC

LexisNexis

批處理

HPC 大數據集羣

參考資料


相關文章
相關標籤/搜索