最近在研究jstorm,看了不少資料,因此也想分享出來一些。html
簡單快速的傳輸層框架,安裝以下:java
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig git
應該是zmq的java包吧,安裝步驟以下:github
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install算法
針對大型分佈式系統提供配置維護、名字服務、分佈式同步、組服務等,能夠保證:spring
在zookeeper中有幾種角色:sql
在啓動以前須要在conf下編寫zoo.cfg配置文件,裏面的內容包括:apache
在單機的時候能夠直接將zoo_sample.cfg修改成zoo.cfg,而後使用啓動服務便可(若是報錯沒有目錄,手動建立便可):編程
sudo ./zkServer.sh start服務器
如今用netstat -na(或者是./zkCli.sh 127.0.0.1:2181)就能看到在監聽指定的端口,那麼zookeeper如今起來了。
參考:
該系統是阿里巴巴在對storm作了重寫和優化,在storm裏面能運行的在jstorm裏面也能運行,該系統擅長執行實時計算,並且基本上都在內存中搞定。進入正題,jstorm中有以下幾種角色:
jstorm是用zookeeper來管理的,下面來看conf/storm.yaml中的經常使用配置:
在把配置搞正確以後,就能夠用bin中的腳原本啓動節點服務了:
sudo ./jstorm nimbus
sudo ./jstorm supervisor
參考:
結構和hadoop的很像,總體看來以下(Nimbus負責控制、提交任務,Supervisor負責執行任務):
爲了作實時計算你須要創建topology,由計算節點組成的圖:
在JStorm上的topology的生命週期以下:
在集羣運行的時候要明白Worker、Executor、Task的概念,固然消息被傳遞的時候其實發起者、接收者都是Task,而真正執行的是Executor(能夠理解爲一個線程),由它來輪詢其中的Spout/Bolt:
在jstorm中經過ack機制來保證數據至少被處理一次,簡單來講下ack:
在消息發、收的過程當中會造成一棵樹狀的結構,在一個消息收的時候發一個驗證消息,發的時候也發一個驗證消息,那麼整體上每一個消息出現兩次。那麼ack機制就是將每一個消息的隨機生成的ID進行異或,若是在某一時刻結果爲0,那就說明處理成功。
以下圖所示:
須要補充一下:雖然ack算是隨機算法,可是出錯的機率極低,可是系統應該具有在出錯以後矯正的能力(甚至檢查是否出錯)。ack機制保證了消息會被處理,可是不能保證只處理一次&順序處理,在須要的情形就有了事務的概念:
所謂普通模式是指不去使用JStorm爲開發人員提供的高級抽象,用其提供的原生的接口進行開發,主要涉及到的接口有:
由於還存在多種其餘類型的拓撲結構,那麼在builder這個環節固然不能亂傳,在基本用法要去實現IRichSpout、IRichBolt接口,他們並無新增任何的方法,僅僅是用來區分類型。既然是拓撲結構那麼應該是一個比較複雜的網絡,其實這個是在builder中完成的,其中setSpout/setBolt返回的結果實際上是InputDeclarer對象,在其中定義了N個流分組的策略:
1
2
3
4
5
6
7
8
9
10
11
|
public
T fieldsGrouping(String componentId, String streamId, Fields fields);
// 按字段分組,具備一樣字段值的Tuple會被分到相同Bolt裏的Task,不一樣字段值則會被分配到不一樣Task
public
T globalGrouping(String componentId, String streamId);
// 全局分組,Tuple被分配到Bolt中ID值最低的的一個Task。
public
T shuffleGrouping(String componentId, String streamId);
// 隨機分組,隨機派發Stream裏面的Tuple,保證每一個Bolt接收到的Tuple數目大體相同,經過輪詢隨機的方式使得下游Bolt之間接收到的Tuple數目差值不超過1。
public
T localOrShuffleGrouping(String componentId, String streamId);
本worker優先,若是本worker內有目標component的task,則隨機從本worker內部的目標component的task中進行選擇,不然就和普通的shuffleGrouping同樣
public
T noneGrouping(String componentId, String streamId);
隨機發送tuple到目標component上,但沒法保證平均
public
T allGrouping(String componentId, String streamId);
// 廣播分組,每個Tuple,全部的Bolt都會收到。
public
T directGrouping(String componentId, String streamId);
// 直接分組,Tuple須要指定由Bolt的哪一個Task接收。 只有被聲明爲Direct Stream的消息流能夠聲明這種分組方法。
// 自定義分組
public
T customGrouping(String componentId, CustomStreamGrouping grouping);
public
T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping);
public
T grouping(GlobalStreamId id, Grouping grouping);
|
經過這些接口,咱們能夠一邊增長處理節點、一邊指定其消費哪些消息。
基本的用法是每次處理一個tuple,可是這種效率比較低,不少狀況下是能夠批量獲取消息而後一塊兒處理,批量用法對這種方式提供了支持。打開代碼能夠很明顯地發現jstorm和storm的有着不小的區別:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// storm 中的定義
public
interface
IBatchSpout
extends
Serializable {
void
open(Map conf, TopologyContext context);
void
emitBatch(
long
batchId, TridentCollector collector);
// 批次發射tuple
void
ack(
long
batchId);
// 成功處理批次
void
close();
Map getComponentConfiguration();
Fields getOutputFields();
}
// jstorm中的定義
public
interface
IBatchSpout
extends
IBasicBolt, ICommitter, Serializable {
}
|
另外若是用批次的話就須要改用BatchTopologyBuilder來構建拓撲結構,在IBatchSpout中主要實現的接口以下:
事務拓撲並非新的東西,只是在原始的ISpout、IBolt上作了一層封裝。在事務拓撲中以並行(processing)和順序(commiting)混合的方式來完成任務,使用Transactional Topology能夠保證每一個消息只會成功處理一次。不過須要注意的是,在Spout須要保證可以根據BatchId進行屢次重試,在這裏有一個基本的例子,這裏有一個不錯的講解。
此次一種更高級的抽象(甚至不須要知道底層是怎麼map-reduce的),所面向的再也不是spout和bolt,而是stream。主要涉及到下面幾種接口: