簡單快速的傳輸層框架,安裝以下:java
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig git
應該是zmq的java包吧,安裝步驟以下:github
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install算法
針對大型分佈式系統提供配置維護、名字服務、分佈式同步、組服務等,能夠保證:spring
在zookeeper中有幾種角色:sql
在啓動以前須要在conf下編寫zoo.cfg配置文件,裏面的內容包括:數據庫
在單機的時候能夠直接將zoo_sample.cfg修改成zoo.cfg,而後使用啓動服務便可(若是報錯沒有目錄,手動建立便可):apache
sudo ./zkServer.sh start編程
如今用netstat -na(或者是./zkCli.sh 127.0.0.1:2181)就能看到在監聽指定的端口,那麼zookeeper如今起來了。 服務器
參考:
該系統是阿里巴巴在對storm作了重寫和優化,在storm裏面能運行的在jstorm裏面也能運行,該系統擅長執行實時計算,並且基本上都在內存中搞定。進入正題,jstorm中有以下幾種角色:
jstorm是用zookeeper來管理的,下面來看conf/storm.yaml中的經常使用配置:
在把配置搞正確以後,就能夠用bin中的腳原本啓動節點服務了:
sudo ./jstorm nimbus
sudo ./jstorm supervisor
參考:
結構和hadoop的很像,總體看來以下(Nimbus負責控制、提交任務,Supervisor負責執行任務):
爲了作實時計算你須要創建topology,由計算節點組成的圖:
在JStorm上的topology的生命週期以下:
在集羣運行的時候要明白Worker、Executor、Task的概念,固然消息被傳遞的時候其實發起者、接收者都是Task,而真正執行的是Executor(能夠理解爲一個線程),由它來輪詢其中的Spout/Bolt:
在jstorm中經過ack機制來保證數據至少被處理一次,簡單來講下ack:
在消息發、收的過程當中會造成一棵樹狀的結構,在一個消息收的時候發一個驗證消息,發的時候也發一個驗證消息,那麼整體上每一個消息出現兩次。那麼ack機制就是將每一個消息的隨機生成的ID進行異或,若是在某一時刻結果爲0,那就說明處理成功。
以下圖所示:
須要補充一下:雖然ack算是隨機算法,可是出錯的機率極低,可是系統應該具有在出錯以後矯正的能力(甚至檢查是否出錯)。ack機制保證了消息會被處理,可是不能保證只處理一次&順序處理,在須要的情形就有了事務的概念:
所謂普通模式是指不去使用JStorm爲開發人員提供的高級抽象,用其提供的原生的接口進行開發,主要涉及到的接口有:
由於還存在多種其餘類型的拓撲結構,那麼在builder這個環節固然不能亂傳,在基本用法要去實現IRichSpout、IRichBolt接口,他們並無新增任何的方法,僅僅是用來區分類型。既然是拓撲結構那麼應該是一個比較複雜的網絡,其實這個是在builder中完成的,其中setSpout/setBolt返回的結果實際上是InputDeclarer對象,在其中定義了N個流分組的策略:
public T fieldsGrouping(String componentId, String streamId, Fields fields); // 字段分組 public T globalGrouping(String componentId, String streamId); // 全局分組 public T shuffleGrouping(String componentId, String streamId); // 隨機分組 public T localOrShuffleGrouping(String componentId, String streamId); // 本地或隨機分組 public T noneGrouping(String componentId, String streamId); // 無分組 public T allGrouping(String componentId, String streamId); // 廣播分組 public T directGrouping(String componentId, String streamId); // 直接分組 // 自定義分組 public T customGrouping(String componentId, CustomStreamGrouping grouping); public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping); public T grouping(GlobalStreamId id, Grouping grouping);
經過這些接口,咱們能夠一邊增長處理節點、一邊指定其消費哪些消息。
基本的用法是每次處理一個tuple,可是這種效率比較低,不少狀況下是能夠批量獲取消息而後一塊兒處理,批量用法對這種方式提供了支持。打開代碼能夠很明顯地發現jstorm和storm的有着不小的區別:
// storm 中的定義 public interface IBatchSpout extends Serializable { void open(Map conf, TopologyContext context); void emitBatch(long batchId, TridentCollector collector);// 批次發射tuple void ack(long batchId); // 成功處理批次 void close(); Map getComponentConfiguration(); Fields getOutputFields(); } // jstorm中的定義 public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable { }
另外若是用批次的話就須要改用BatchTopologyBuilder來構建拓撲結構,在IBatchSpout中主要實現的接口以下:
事務拓撲並非新的東西,只是在原始的ISpout、IBolt上作了一層封裝。在事務拓撲中以並行(processing)和順序(commiting)混合的方式來完成任務,使用Transactional Topology能夠保證每一個消息只會成功處理一次。不過須要注意的是,在Spout須要保證可以根據BatchId進行屢次重試,在這裏有一個基本的例子,這裏有一個不錯的講解。
此次一種更高級的抽象(甚至不須要知道底層是怎麼map-reduce的),所面向的再也不是spout和bolt,而是stream。主要涉及到下面幾種接口:
在這裏有一個jstorm中使用Trident的簡單例子。
在不少的實際問題中,咱們面對的模型都是大同小異,下面先來看問題是什麼:
一、在流式計算中常常須要對一批的數據進行彙總計算,若是用SQL來描述就是:
SELECT MIN(status) FROM my_table GROUP BY order_id
在用JStorm來實現這一條簡單的SQL時,面對的是一條一條的數據庫變化的消息(這裏須要保證有序消費),其實至關於在一堆的消息上面作了一個嵌套的SQL查詢,用一張圖表示以下:
二、
----- updating -----