上一篇【 storm開發環境搭建 】 博文鏈接:https://my.oschina.net/u/2342969/blog/878765java
本篇會深刻理解Streams,歡迎同志(此同志非彼同志)們經過私信/評論等方式共同窗習瞭解.git
Streams是storm中一個核心的概念,它是在分佈式並行處理和建立的無限序列元組,Streams經過給流元組中字段命名來定義,默認狀況下,元組能夠包含整型,長整型,短整型,字節,字符串,布爾型,雙精度浮點型,單精度浮點型,字節數組,也能夠自定義序列化類型。github
下面共同窗習一下 Tuple(元組)、OutputFieldsDeclarer、 元組中動態類型以及序列器apache
Tuple是storm中主要的數據結構,是storm中使用的基本單元、元組。元組是一個值列表,其中,每一個值能夠是任意類型。動態類型的元組不須要被定義,元組有相似 getInteger 和getString的幫助方法,無須手動轉換結果類型。數組
storm須要知道如何序列化全部的值,默認狀況下,storm知道如何序列化簡單類型,好比字符串、字節數組,若是想使用複雜的對象,則須要註冊實現一個該類型的序列器。安全
在storm中tuple接口集成了Iuple接口 ,實現類爲TupleImpl。數據結構
tuple的數據結構相似於map的鍵值對,其中鍵定義在OutputFieldsDeclarer方法的Fields對象中。分佈式
經過如下例子,能夠幫助你們更好的理解:ide
//例2-2 src/main/java/bolts/WordNormalizer.java package bolts; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.ArrayList; import java.util.List; import java.util.Map; public class WordNormalizer implements IRichBolt { private OutputCollector collector; public void cleanup(){} /** * *bolt*從單詞文件接收到文本行,並標準化它。 * 文本行會所有轉化成小寫,並切分它,從中獲得全部單詞。 */ public void execute(Tuple input){ String sentence = input.getString(0); String[] words = sentence.split(" "); for(String word : words){ word = word.trim(); if(!word.isEmpty()){ word=word.toLowerCase(); //發佈這個單詞 List a = new ArrayList(); a.add(input); collector.emit(a,new Values(word)); } } //對元組作出應答 collector.ack(input); } public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } /** * 這個*bolt*只會發佈「word」域 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
建立了發送一個字段(「word」)的Bolt,此時tuple的鍵爲「word」,值爲execute方法中發送的Values對象。oop
本次介紹的storm0.6.0(含)及後續版本中如何使用序列器,storm在0.6.0以前使用不一樣的序列器,這裏就不介紹老版本的。
tuple能夠由任意類型組合而成,由於storm是分佈式的,因此它須要知道在task間如何序列化和反序列化數據的。
storm使用Kryo進行序列化,Kryo是java開發中一個快速靈活序列器。默認狀況下,storm能夠序列化基礎類型,好比字符串,字節,數組,ArrayList, HashMap, HashSet和 Clojure 集合類型,若是須要使用其餘類型,須要自定義序列器。
在元組中沒有對應類型的字段。在字段中放入對象和storm動態序列化數據,獲得序列化接口前,咱們瞭解一下爲何storm元組是動態類型。
增長靜態類型會大大增長storm API的複雜性, Hadoop 中,使用靜態類型的鍵和值,在使用是須要大量的註釋,對於hadoop API使用以及類型安全是一個不值得的負擔,動態類型使用起來會很簡單。
此外,storm 元組沒有一個合理的方式使用靜態類型,假如一個bolt訂閱了多個流,那些流中元組會有不一樣類型傳輸在字段中。當一個bolt在execute方法接收元組,能夠接收任何流的元組,就會有不少類型的元組。這樣在一個bolt中,就須要爲每一個類型的tuple生命不一樣的方法訂閱,顯然,storm選擇了簡單方式,使用動態類型。
最後,另外使用動態類型的理由是storm能夠直接被 Clojure 和 JRuby 這類動態類型的語言使用。
綜上所述,storm 使用Kryo 做爲序列器。爲了實現自定義序列器,就須要用Kryo註冊一個新的序列器,
在Kryo的Github主頁: https://github.com/EsotericSoftware/kryo,有更加詳細的介紹,這裏僅作一下簡單介紹。
增長自定義序列器,須要在拓撲配置中添加「 topology.kryo.register 」屬性,它能夠配置一組序列器列表,每一個序列器能夠選擇一下兩種方式之一:
例子以下:
topology.kryo.register: - com.mycompany.CustomType1 - com.mycompany.CustomType2: com.mycompany.serializer.CustomType2Serializer - com.mycompany.CustomType3
「com.mycompany.CustomType1「 和「com.mycompany.CustomType3「 使用「FieldsSerializer」序列化,反之,「com.mycompany.CustomType2「會使用」com.mycompany.serializer.CustomType2Serializer「 序列化。
storm提供了在拓撲配置中註冊序列器的助手,Config類調用registerSerialization方法能夠將一個序列器放入配置中。其中有一個高級設置「Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS」,若是把它設置爲true,storm將會忽略在類路徑無有效代碼的序列器,不然,storm找不到序列器,將會排除異常。當在集羣中運行了不少使用了不一樣序列器的拓撲,想經過「storm.yaml」文件爲全部拓撲配置好序列器,它就很是有用的。
當storm遇到一個沒有序列器註冊的類型,它可能會使用java序列器,若是此類型也沒法被java序列器序列化,storm就會報出一個錯誤。
須要注意的是,不管在CPU消耗方面仍是序列化對象的大小, java序列器都是很是耗費資源的。故在生產上使用拓撲的話,強烈建議使用自定義序列器。java序列器在那裏,是爲了容易設計新的原型。
經過設置「Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION」項爲false,能夠關閉java序列器。
Storm 0.7.0 能夠設置特殊組件配置,固然,若是一個組件定義一個序列化,這個序列器須要對其餘bolt可用,不然其餘bolt將沒法接收那個組件的消息。
一個拓撲被提交,一組序列器會被拓撲選擇爲全部組件發送消息使用,這是經過特殊組件序列化註冊信息與普通組件序列化註冊信息合併實現。當爲同一個類註冊了多個序列化時,序列器會任意選擇一個。
若是兩個特定組件序列器有衝突,則會強制一個特定的類作序列器,只需在拓撲特定配置定義想要的序列器,拓撲特定配置優先於序列器註冊的特定組件。