1、原理介紹html
本文內容參考:https://github.com/apache/storm/tree/master/external/storm-kafka#brokerhostsjava
(一)使用storm-kafka的關鍵步驟git
一、建立ZkHostsgithub
當storm從kafka中讀取某個topic的消息時,須要知道這個topic有多少個分區,以及這些分區放在哪一個kafka節點(broker)上,docker
ZkHosts就是用於這個功能。apache
關於kafka信息在zk中的內容請參考:http://blog.csdn.net/jinhong_lu/article/details/46653087api
建立zkHosts有2種形式:app
public ZkHosts(String brokerZkStr, String brokerZkPath) 框架
public ZkHosts(String brokerZkStr)less
默認狀況下,zk信息被放到/brokers中,此時可使用第2種方式:
new ZkHosts("123.58.172.117:2181,123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181」)
若zk信息被放置在/kafka/brokers中,則可使用:
publicZkHosts("123.58.172.117:2181,123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181",「/kafka")
或者直接:
new ZkHosts("123.58.172.117:2181,123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181/kafka」)
默認狀況下,每60秒去讀取一次kafka的分區信息,能夠經過修改host.refreshFreqSecs來設置。
除了使用ZkHosts來讀取分析信息外,storm-kafka還提供了一種靜態指定的方法,如:
由此能夠看出,ZkHosts完成的功能就是指定了從哪一個kafka節點讀取某個topic的哪一個分區。
二、建立KafkaConfig
(1)有2種方式建立KafkaConfig
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
BrokerHosts就是上面建立的實例,topic就是要訂閱的topic名稱,clientId用於指定存放當前topic consumer的offset的位置,這個id 應該是惟一的,不然多個拓撲會引發衝突。
事實上,trident的offset並不保存在這個位置,見下面介紹。
真正使用時,有2種擴展,分別用於通常的storm以及trident。
(2)core storm
Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely identify your spout.
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
public SpoutConfig(BrokerHosts hosts, String topic, String id);
In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
KafkaSpout 只接受 SpoutConfig做爲參數
(3)TridentKafkaConfig,TridentKafkaEmitter只接受TridentKafkaConfig使用參數
trident消費kafka的offset位置是在創建拓撲中指定,如:
topology.newStream(test, kafkaSpout).
則offset的位置爲:
/transactional/test/coordinator/currtx
(4)KafkaConfig的一些默認參數
能夠經過如下方式修改:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
三、設置MultiScheme
MultiScheme用於指定若是處理從kafka中讀取到的字節,同時它用於控制輸出字段名稱。
public Iterable<List<Object>> deserialize(byte[] ser);
public Fields getOutputFields();
默認狀況下,RawMultiScheme讀取一個字段並返回一個字節,而發射的字段名稱爲bytes。
能夠經過SchemeAsMultiScheme和 KeyValueSchemeAsMultiScheme改變這種默認行爲:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
上面的語句指定了將字節轉化爲字符。
同時創建拓撲時:
topology.newStream(「test", kafkaSpout).
each(new Fields("str"), new FilterFunction(),new Fields("word」))….
會指定發射的字段名稱爲str。
四、建立Spout
(1)core storm
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
(2)trident
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
五、創建拓撲:
(1)core-storm
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 12);
kafka-reader指定了spout的名稱,12指定了並行度。
(2)trident
topology.newStream(「test", kafkaSpout).
each(new Fields("str"), new FilterFunction(),new Fields("word」))….
test指定了放置offset的位置,也就是txid的位置。
str指定了spout發射字段的名稱。
完整示例:
Core Spout
Trident Spout
(二)當拓撲出錯時,如何從上一次的kafka位置繼續處理消息
一、KafkaConfig有一個配置項爲KafkaConfig.startOffsetTime,它用於指定拓撲從哪一個位置上開始處理消息,可取的值有3個:
(1)kafka.api.OffsetRequest.EarliestTime(): 從最先的消息開始
(2)kafka.api.OffsetRequest.LatestTime(): 從最新的消息開始,即從隊列隊伍最末端開始。
(3)根據時間點: 能夠參閱 How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? 的實現原理。
How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?
Kafka allows querying offsets of messages by time and it does so at segment granularity. The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.
For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.
二、因爲運行拓撲時,指定了offset在zk中保存的位置,當出現錯誤時,能夠找出offset
當從新部署拓撲時,必須保證offset的保存位置不變,它才能正確的讀取到offset。
(1)對於core storm,就是
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
後2個參數不能變化
(2)對於trident而言,就是
topology.newStream(「test", kafkaSpout).
第1個參數不能變化。
三、也就是說只要拓撲運行過一次KafkaConfig.startOffsetTime,以後從新部署時都可從offset中開始。
再看看這2個參數
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
若是將forceFromStart(舊版本是ignoreZkOffsets)設置爲true,則每次拓撲從新啓動時,都會從開頭讀取消息。
若是爲false,則:
第一次啓動,從開頭讀取,以後的重啓均是從offset中讀取。
通常使用時,將數值設置爲以上2個便可。
(三)結果寫回kafka
若是想把結果寫回kafka,並保證事務性,可使用 storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.
Writing to Kafka as part of your topology
You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.
You need to provide implementation of following 2 interfaces
TupleToKafkaMapper and TridentTupleToKafkaMapper
These interfaces have 2 methods defined:
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
as the name suggests these methods are called to map a tuple to kafka key and kafka message. If you just want one field as key and one field as value then you can use the provided FieldNameBasedTupleToKafkaMapper.java implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility reasons. Alternatively you could also specify a different key and message field by using the non default constructor. In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper.
KafkaTopicSelector and trident KafkaTopicSelector
This interface has only one method
public interface KafkaTopicSelector {
String getTopics(Tuple/TridentTuple tuple);
}
The implementation of this interface should return topic to which the tuple's key/message mapping needs to be published You can return a null and the message will be ignored. If you have one static topic name then you can use DefaultTopicSelector.java and set the name of the topic in the constructor.
Specifying kafka producer properties
You can provide all the produce properties , see http://kafka.apache.org/documentation.html#producerconfigs section "Important configuration properties for the producer", in your storm topology config by setting the properties map with key kafka.broker.properties.
附帶2個官方的示例
For the bolt :
For Trident:
2、示例介紹
(一)簡介
一、本項目完整代碼請見https://github.com/jinhong-lu/stormkafkademo/tree/master/src/main/java/org/jinhong/demo/storm_kafka/trident。
二、本項目主要完成如下功能:
(1)從kafka中讀取一個topic的消息,而後根據空格拆分單詞,最後統計數據後寫入一個HazelCastState(一個分佈式的內存存儲框架)。
(2)經過DRPC從上述的HazelCastState中讀取結果,並將結果輸出。
三、代碼可分爲3部分:
(1)單詞拆分
(2)定義拓撲行爲
(3)state定義
如下分爲三部分分別介紹。
(二)單詞拆分
原理很簡單,就是經過空格將單詞進行拆分。
這裏的wordsplit是一個function,它繼承自BaseFunction,最後,它將拆分出來的單詞逐個emit出去。
(三)定義拓撲行爲
一、定義kafka的相關配置
(1)建立一個topo。
(2)首先定義一個輸入流,其中第一個參數定義了zk中放置這個topo元信息的信息,通常是/transactional/kafka
(3)對每一個輸入的消息進行拆分:首先它的輸入是字段名稱爲str的消息,而後通過WordSplit這個Function處理,最後,以字段名稱word發送出去
(4)將結果根據word字段的值進行分組,就是說word值相同的放在一塊兒。
(5)將分組的結果分別count一下,而後以字段名稱aggregates_words寫入HazelCastStateFactory定義的state中,關於state請見下一部分的介紹。
三、從分佈式內存中讀取結果並進行輸出
(1)第三行定義了使用drpc須要處理的內容
(2)查詢分佈式內存中的內容,查詢字段爲word,而後以字段名count發送出去。
(3)將不須要統計的過濾掉。
(4)將結果進行聚合。
四、主函數
三個參數的含義爲:
/* args[0]:kafkazk,如:192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181,192.168.172.117:2181/kafka
* args[1]:topo名稱
* args[2]:niubus節點,如,192.168.172.98
*/
當參數數據大於1時,將拓撲提交到集羣中,不然提交到本地。提交拓撲到集羣的比較直觀,下面鄭重介紹一下drpc的查詢。
(1)首先定義一個本地的drpc對象,以及一個本地storm集羣。
(2)而後將拓撲羣提交到本地集羣。
(3)最後,使用drpuc不停的循環查詢統計結果並輸出。
注意上面的拓撲定義了2個流,第一個流用於接收kafka消息,而後拆分統計後寫入內存,第二個流則接受drpc的輸入,將drpc的輸入拆分後,再統計須要查詢的每一個單詞的統計結果。如在本例中,須要顯示單詞the的數量。
在本例中,drpc和kafka沒有本質的區別,它們都是一個用於向storm發送消息的集羣,只是輸入數據的方式有些不一樣,kafka經過spout輸入,drpc則直接經過execute()進行輸入。
運行方式:
方式一:直接在eclipse右鍵運行,參數只填一個,如123.58.172.98:2181,123.58.172.111:2181,123.58.172.114:2181,123.58.172.116:2181,123.58.172.117:2181/kafka。只要保證kafka集羣中有對應的topic,則會獲得如下輸出:
Word count: [[2]]
Word count: [[5]]
Word count: [[10]]
Word count: [[17]]
Word count: [[28]]
固然,統計結果根據輸入kafka的內容而不一樣。
(四)state定義
在定義拓撲的時候,最終的wordcount結果寫在了HazelCastState中:
persistentAggregate(new HazelCastStateFactory(),new Count(), newFields("aggregates_words"))
下面咱們分析一下如何使用state來保存topo的處理結果,或者是中間處理結果。
注意,使用state除了能夠保存最終的結果輸出,以保證事務型、透明事務型之外,還常常用於保存中間結果。好比blueprint第3章的一個例子中,用於統計疾病的發生數量,若是超過預警值,則向外發信息。若是統計結果成功,但向外發送信息失敗,則spout會重發數據,致使統計結果有誤,所以,此時能夠經過state將結果保存下來。
一、Factory類
內容很簡單,就是返回一個state,它也是三個state相關的類中惟一對外的接口。
二、Handler類
使用單例模式返回一個map。
三、State類
真正處理業務邏輯的類。主要的方法有mutiPut和mutiGet,用於將結果放入state與取出state。