流式處理框架storm淺析(下篇)

本文來自網易雲社區html

做者:汪建偉java


  • 舉個栗子bash

1 實現的目標 app

設計一個系統,來實現對一個文本里面的單詞出現的頻率進行統計。負載均衡

2 設計Topology結構: 框架

這是一個簡單的例子,topology也很是簡單。整個topology以下:maven

整個topology分爲三個部分:ide

WordReader:數據源,負責發送sentenceui

WordNormalizer:負責將sentence切分this

Wordcounter:負責對單詞的頻率進行累加

3 代碼實現

1. 構建maven環境,添加storm依賴

<repositories>  
	      <!-- Repository where we can found the storm dependencies  -->  
	      <repository>  
	          <id>clojars.org</id>  
	          <url>http://clojars.org/repo</url>  
	      </repository>  
	</repositories>  
	<dependencies>  
	      <dependency>   
	        <groupId>storm</groupId>  
	        <artifactId>storm</artifactId>  
	        <version>0.7.1</version>  
	     </dependency>  
	</dependencies>複製代碼

2. 定義Topology

public class TopologyMain {  
	    public static void main(String[] args) throws InterruptedException {  
	           
	        //Topology definition  
	        TopologyBuilder builder = new TopologyBuilder();  
	        builder.setSpout("word-reader",new WordReader());  
	        builder.setBolt("word-normalizer", new WordNormalizer())  
	            .shuffleGrouping("word-reader");  
	        builder.setBolt("word-counter", new WordCounter(),1)  
	            .fieldsGrouping("word-normalizer", new Fields("word"));  
	          
	        //Configuration  
	        Config conf = new Config();  
	        conf.put("wordsFile", args[0]);  
	        conf.setDebug(false);  
	        //Topology run  
	        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);  
	        LocalCluster cluster = new LocalCluster();  
	        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());  
	        Thread.sleep(1000);  
	        cluster.shutdown();  
	    }  
	}複製代碼

3. 實現WordReader Spout

public class WordReader extends BaseRichSpout {  
	  
	    private SpoutOutputCollector collector;  
	    private FileReader fileReader;  
	    private boolean completed = false;  
	    public void ack(Object msgId) {  
	        System.out.println("OK:"+msgId);  
	    }  
	    public void close() {}  
	    public void fail(Object msgId) {  
	        System.out.println("FAIL:"+msgId);  
	    }  
	  
	    public void nextTuple() {  
	        if(completed){  
	            try {  
	                Thread.sleep(1000);  
	            } catch (InterruptedException e) {  
	            }  
	            return;  
	        }  
	        String str;  
	        BufferedReader reader = new BufferedReader(fileReader);  
	        try{  
	            while((str = reader.readLine()) != null){  
	                this.collector.emit(new Values(str),str);  
	            }  
	        }catch(Exception e){  
	            throw new RuntimeException("Error reading tuple",e);  
	        }finally{  
	            completed = true;  
	        }  
	    }  
	  
	    public void open(Map conf, TopologyContext context,  
	                     SpoutOutputCollector collector) {  
	        try {  
	            this.fileReader = new FileReader(conf.get("wordsFile").toString());  
	        } catch (FileNotFoundException e) {  
	            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");  
	        }  
	        this.collector = collector;  
	    }  
	  
	    public void declareOutputFields(OutputFieldsDeclarer declarer) {  
	        declarer.declare(new Fields("line"));  
	    }  
	}複製代碼

第一個被調用的spout方法都是public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收以下參數:配置對象,在定義topology對象是建立;TopologyContext對象,包含全部拓撲數據;還有SpoutOutputCollector對象,它能讓咱們發佈交給bolts處理的數據。

4. 實現WordNormalizer bolt


public class WordNormalizer extends BaseBasicBolt {  
	  
	    public void cleanup() {}  
	  
	    public void execute(Tuple input, BasicOutputCollector collector) {  
	        String sentence = input.getString(0);  
	        String[] words = sentence.split(" ");  
	        for(String word : words){  
	            word = word.trim();  
	            if(!word.isEmpty()){  
	                word = word.toLowerCase();  
	                collector.emit(new Values(word));  
	            }  
	        }  
	    }  
	      
	    public void declareOutputFields(OutputFieldsDeclarer declarer) {  
	        declarer.declare(new Fields("word"));  
	    }  
	}複製代碼

bolt最重要的方法是void execute(Tuple input),每次接收到元組時都會被調用一次,還會再發布若干個元組。

5. 實現WordCounter bolt

public class WordCounter extends BaseBasicBolt {  
	  
	    Integer id;  
	    String name;  
	    Map counters;  
	  
	    @Override  
	    public void cleanup() {  
	        System.out.println("-- Word Counter ["+name+"-"+id+"] --");  
	        for(Map.Entry entry : counters.entrySet()){  
	            System.out.println(entry.getKey()+": "+entry.getValue());  
	        }  
	    }  
	  
	    @Override  
	    public void prepare(Map stormConf, TopologyContext context) {  
	        this.counters = new HashMap();  
	        this.name = context.getThisComponentId();  
	        this.id = context.getThisTaskId();  
	    }  
	  
	    @Override  
	    public void declareOutputFields(OutputFieldsDeclarer declarer) {}  
	  
	    @Override  
	    public void execute(Tuple input, BasicOutputCollector collector) {  
	        String str = input.getString(0);  
	        if(!counters.containsKey(str)){  
	            counters.put(str, 1);  
	        }else{  
	            Integer c = counters.get(str) + 1;  
	            counters.put(str, c);  
	        }  
	    }  
	}複製代碼

6. 使用本地模式運行Topology

在這個目錄下面建立一個文件,/src/main/resources/words.txt,一個單詞一行,而後用下面的命令運行這個拓撲:mvn exec:java -Dexec.main -Dexec.args=」src/main/resources/words.txt」。

若是你的words.txt文件有以下內容: Storm test are great is an Storm simple application but very powerful really Storm is great 你應該會在日誌中看到相似下面的內容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在這個例子中,每類節點只有一個實例。

  • 附-Storm記錄級容錯的基本原理

首先來看一下什麼叫作記錄級容錯?storm容許用戶在spout中發射一個新的源tuple時爲其指定一個message id, 這個message id能夠是任意的object對象。多個源tuple能夠共用一個message id,表示這多個源 tuple對用戶來講是同一個消息單元。storm中記錄級容錯的意思是說,storm會告知用戶每個消息單元是否在指定時間內被徹底處理了。那什麼叫作徹底處理呢,就是該message id綁定的源tuple及由該源tuple後續生成的tuple通過了topology中每個應該到達的bolt的處理。舉個例子。在圖4-1中,在spout由message 1綁定的tuple1和tuple2通過了bolt1和bolt2的處理生成兩個新的tuple,並最終都流向了bolt3。當這個過程完成處理完時,稱message 1被徹底處理了。

在storm的topology中有一個系統級組件,叫作acker。這個acker的任務就是追蹤從spout中流出來的每個message id綁定的若干tuple的處理路徑,若是在用戶設置的最大超時時間內這些tuple沒有被徹底處理,那麼acker就會告知spout該消息處理失敗了,相反則會告知spout該消息處理成功了。在剛纔的描述中,咱們提到了」記錄tuple的處理路徑」,若是曾經嘗試過這麼作的同窗能夠仔細地思考一下這件事的複雜程度。可是storm中倒是使用了一種很是巧妙的方法作到了。在說明這個方法以前,咱們來複習一個數學定理。

A xor A = 0.

A xor B…xor B xor A = 0,其中每個操做數出現且僅出現兩次。

storm中使用的巧妙方法就是基於這個定理。具體過程是這樣的:在spout中系統會爲用戶指定的message id生成一個對應的64位整數,做爲一個root id。root id會傳遞給acker及後續的bolt做爲該消息單元的惟一標識。同時不管是spout仍是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple以後,會告知acker本身發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完以後,也會告知acker本身處理的輸入tuple的id及新生成的那些tuple的id。Acker只須要對這些id作一個簡單的異或運算,就能判斷出該root id對應的消息單元是否處理完成了。下面經過一個圖示來講明這個過程。


上圖 spout中綁定message 1生成了兩個源tuple,id分別是0010和1011.


上圖 bolt1處理tuple 0010時生成了一個新的tuple,id爲0110.


上圖 bolt2處理tuple 1011時生成了一個新的tuple,id爲0111.

上圖 bolt3中接收到tuple 0110和tuple 0111,沒有生成新的tuple.

容錯過程存在一個可能出錯的地方,那就是,若是生成的tuple id並非徹底各異的,acker可能會在消息單元徹底處理完成以前就錯誤的計算爲0。這個錯誤在理論上的確是存在的,可是在實際中其機率是極低極低的,徹底能夠忽略。



相關閱讀:流式處理框架storm淺析(上篇)

網易雲免費體驗館,0成本體驗20+款雲產品!

更多網易研發、產品、運營經驗分享請訪問網易雲社區


相關文章:
【推薦】 3分鐘帶你瞭解負載均衡服務
【推薦】 非對稱加密與證書(上篇)
【推薦】 手遊破解手段介紹及易盾保護方案

相關文章
相關標籤/搜索