JSTORM使用筆記

安裝部署

zeromq

簡單快速的傳輸層框架,安裝以下:java

wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig git

jzmq

應該是zmq的java包吧,安裝步驟以下:github

git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install算法

zookeeper

針對大型分佈式系統提供配置維護、名字服務、分佈式同步、組服務等,能夠保證:spring

  1. 順序性:客戶端的更新請求都會被順序處理
  2. 原子性:更新操做要不成功,要不失敗
  3. 一致性:客戶端不論鏈接到那個服務端,展示給它的都是同一個視圖
  4. 可靠性:更新會被持久化
  5. 實時性:對於每一個客戶端他的系統視圖都是最新的

在zookeeper中有幾種角色:sql

  1. Leader:發起投票和決議,更新系統狀態
  2. Follower:響應客戶端請求,參與投票
  3. Observer:不參與投票,只同步Leader狀態
  4. Client:發起請求

在啓動以前須要在conf下編寫zoo.cfg配置文件,裏面的內容包括:數據庫

  1. tickTime:心跳間隔
  2. initLimit:Follower和Leader之間創建鏈接的最大心跳數
  3. syncLimit:Follower和Leader之間通訊時限
  4. dataDir:數據目錄
  5. dataLogDir:日誌目錄
  6. minSessionTimeout:最小會話時間(默認tickTime * 2)
  7. maxSessionTimeout:最大會話時間(默認tickTime * 20)
  8. maxClientCnxns:客戶端數量
  9. clientPort:監聽客戶端鏈接的端口
  10. server.N=YYYY:A:B:其中N爲服務器編號,YYYY是服務器的IP地址,A是Leader和Follower通訊端口,B爲選舉端口

在單機的時候能夠直接將zoo_sample.cfg修改成zoo.cfg,而後使用啓動服務便可(若是報錯沒有目錄,手動建立便可):apache

sudo ./zkServer.sh start編程

如今用netstat -na(或者是./zkCli.sh 127.0.0.1:2181)就能看到在監聽指定的端口,那麼zookeeper如今起來了。 服務器

參考:

  1. http://blog.csdn.net/shenlan211314/article/details/6170717
  2. http://blog.csdn.net/hi_kevin/article/details/7089358
  3. 下載地址:http://apache.dataguru.cn/zookeeper/zookeeper-3.4.6

jstorm

該系統是阿里巴巴在對storm作了重寫和優化,在storm裏面能運行的在jstorm裏面也能運行,該系統擅長執行實時計算,並且基本上都在內存中搞定。進入正題,jstorm中有以下幾種角色:

  1. spout:源頭。
  2. bolt:處理器。
  3. topology:由處理器、源頭組成的拓撲網絡(每條邊就是一個訂閱關係)。
  4. tuple:數據。
  5. worker:執行進程。
  6. task:執行線程。
  7. nimbus:分發代碼、任務,監控集羣運行狀態
  8. supervisor:監聽nimbus的指令,接收分發代碼和任務並執行

jstorm是用zookeeper來管理的,下面來看conf/storm.yaml中的經常使用配置:

  1. storm.zookeeper.servers:zookeeper集羣地址。
  2. storm.zookeeper.root:zookeeper中storm的根目錄位置。
  3. storm.local.dir:用來存放配置文件、JAR等。
  4. storm.messaging.netty.transfer.async.batch:在使用Netty的時候,設置是否一個batch中會有多個消息。
  5. java.library.path:本地庫的加載地址,好比zeromq、jzmq等。
  6. supervisor.slots.ports:supervisor節點上的worker使用的端口號列表。
  7. supervisor.enable.cgroup:是否使用cgroups來作資源隔離。
  8. topology.buffer.size.limited:是否限制內存,若是不限制將使用LinkedBlockingDeque。
  9. topology.performance.metrics:是否開啓監控。
  10. topology.alimonitor.metrics.post:是否將監控數據發送給AliMonitor。
  11. topology.enable.classloader:默認禁用了用戶自定義的類加載器。
  12. worker.memory.size:worker的內存大小。

在把配置搞正確以後,就能夠用bin中的腳原本啓動節點服務了:

sudo ./jstorm nimbus
sudo ./jstorm supervisor

參考:

  1. https://github.com/alibaba/jstorm/wiki/%E5%A6%82%E4%BD%95%E5%AE%89%E8%A3%85
  2. storm編程入門:http://ifeve.com/getting-started-with-storm-5/

jstorm的架構

結構和hadoop的很像,總體看來以下(Nimbus負責控制、提交任務,Supervisor負責執行任務):

爲了作實時計算你須要創建topology,由計算節點組成的圖:

在JStorm上的topology的生命週期以下:

  1. 上傳代碼並作校驗(/nimbus/inbox);
  2. 創建本地目錄(/stormdist/topology-id/);
  3. 創建zookeeper上的心跳目錄;
  4. 計算topology的工做量(parallelism hint),分配task-id並寫入zookeeper;
  5. 把task分配給supervisor執行;
  6. 在supervisor中定時檢查是否有新的task,下載新代碼、刪除老代碼,剩下的工做交個小弟worker;
  7. 在worker中把task拿到,看裏面有哪些spout/Bolt,而後計算須要給哪些task發消息並創建鏈接;
  8. 在nimbus將topology終止的時候會將zookeeper上的相關信息刪除;

在集羣運行的時候要明白WorkerExecutorTask的概念,固然消息被傳遞的時候其實發起者、接收者都是Task,而真正執行的是Executor(能夠理解爲一個線程),由它來輪詢其中的Spout/Bolt:

在jstorm中經過ack機制來保證數據至少被處理一次,簡單來講下ack:

在消息發、收的過程當中會造成一棵樹狀的結構,在一個消息收的時候發一個驗證消息,發的時候也發一個驗證消息,那麼整體上每一個消息出現兩次。那麼ack機制就是將每一個消息的隨機生成的ID進行異或,若是在某一時刻結果爲0,那就說明處理成功。

以下圖所示: 

須要補充一下:雖然ack算是隨機算法,可是出錯的機率極低,可是系統應該具有在出錯以後矯正的能力(甚至檢查是否出錯)。ack機制保證了消息會被處理,可是不能保證只處理一次&順序處理,在須要的情形就有了事務的概念:

碼代碼

基本用法

所謂普通模式是指不去使用JStorm爲開發人員提供的高級抽象,用其提供的原生的接口進行開發,主要涉及到的接口有:

  1. ISpout:數據源頭接口,jstorm會不斷調用nextTuple方法來獲取數據併發射出去。
    1. open:在worker中初始化該ISpout時調用,通常用來設置一些屬性:好比從spring容器中獲取對應的Bean。
    2. close:和open相對應(在要關閉的時候調用)。
    3. activate:從非活動狀態變爲活動狀態時調用。
    4. deactivate:和activate相對應(從活動狀態變爲非活動狀態時調用)。
    5. nextTuple:JStorm但願在每次調用該方法的時候,它會經過collector.emit發射一個tuple。
    6. ack:jstorm發現msgId對應的tuple被成功地完整消費會調用該方法。
    7. fail:和ack相對應(jstorm發現某個tuple在某個環節失敗了)。
  2. IBolt:數據處理接口,jstorm將消息發給他並讓其處理,完成以後可能整個處理流程就結束了,也可能傳遞給下一個節點繼續執行。
    1. prepare:對應ISpout的open方法。
    2. cleanup:對應ISpout的close方法(吐槽一下,搞成同樣的名字會死啊...)。
    3. execute:處理jstorm發送過來的tuple。
  3. TopologyBuilder:每一個jstorm運行的任務都是一個拓撲接口,而builder的做用就是根據配置文件構建這個拓撲結構,更直白就是構建一個網。
    1. setSpout:添加源頭節點並設置並行度。
    2. setBolt:添加處理節點並設置並行度。

由於還存在多種其餘類型的拓撲結構,那麼在builder這個環節固然不能亂傳,在基本用法要去實現IRichSpoutIRichBolt接口,他們並無新增任何的方法,僅僅是用來區分類型。既然是拓撲結構那麼應該是一個比較複雜的網絡,其實這個是在builder中完成的,其中setSpout/setBolt返回的結果實際上是InputDeclarer對象,在其中定義了N個流分組的策略:

public T fieldsGrouping(String componentId, String streamId, Fields fields); // 字段分組
public T globalGrouping(String componentId, String streamId); // 全局分組
public T shuffleGrouping(String componentId, String streamId); // 隨機分組
public T localOrShuffleGrouping(String componentId, String streamId); // 本地或隨機分組
public T noneGrouping(String componentId, String streamId); // 無分組
public T allGrouping(String componentId, String streamId); // 廣播分組
public T directGrouping(String componentId, String streamId); // 直接分組
// 自定義分組  
public T customGrouping(String componentId, CustomStreamGrouping grouping);  
public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping);  
public T grouping(GlobalStreamId id, Grouping grouping);  

經過這些接口,咱們能夠一邊增長處理節點、一邊指定其消費哪些消息。

批量用法

基本的用法是每次處理一個tuple,可是這種效率比較低,不少狀況下是能夠批量獲取消息而後一塊兒處理,批量用法對這種方式提供了支持。打開代碼能夠很明顯地發現jstorm和storm的有着不小的區別:

// storm 中的定義
public interface IBatchSpout extends Serializable {
	void open(Map conf, TopologyContext context);
	void emitBatch(long batchId, TridentCollector collector);// 批次發射tuple
	void ack(long batchId); // 成功處理批次
	void close();
	Map getComponentConfiguration();
	Fields getOutputFields();
}

// jstorm中的定義
public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable {
}

另外若是用批次的話就須要改用BatchTopologyBuilder來構建拓撲結構,在IBatchSpout中主要實現的接口以下:

  1. execute:雖然和IBolt中名字、參數一致,可是增長了一些默認邏輯
    1. 入參的input.getValue(0)表示批次(BatchId)。
    2. 發送消息時collector.emit(new Values(batchId, value)),發送的列表第一個字段表示批次(BatchId)。
  2. commit:批次成功時調用,常見的是修改offset。
  3. revert:批次失敗時調用,能夠在這裏根據offset取出批次數據進行重試。

Transactional Topology

事務拓撲並非新的東西,只是在原始的ISpout、IBolt上作了一層封裝。在事務拓撲中以並行(processing)和順序(commiting)混合的方式來完成任務,使用Transactional Topology能夠保證每一個消息只會成功處理一次。不過須要注意的是,在Spout須要保證可以根據BatchId進行屢次重試,在這裏有一個基本的例子,這裏有一個不錯的講解。

Trident

此次一種更高級的抽象(甚至不須要知道底層是怎麼map-reduce的),所面向的再也不是spout和bolt,而是stream。主要涉及到下面幾種接口:

  1. 在本地完成的操做
    1. Function:自定義操做。
    2. Filters:自定義過濾。
    3. partitionAggregate:對同批次的數據進行local combiner操做。
    4. project:只保留stream中指定的field。
    5. stateQuery、partitionPersist:查詢和持久化。
  2. 決定Tuple如何分發到下一個處理環節
    1. shuffle:隨機。
    2. broadcast:廣播。
    3. partitionBy:以某一個特定的field進行hash,分到某一個分區,這樣該field位置相同的都會放到同一個分區。
    4. global:全部tuple發到指定的分區。
    5. batchGlobal:同一批的tuple被放到相同的分區(不一樣批次不一樣分區)。
    6. partition:用戶自定義的分區策略。
  3. 不一樣partition處理結果的匯聚操做
    1. aggregate:只針對同一批次的數據。
    2. persistentAggregate:針對全部批次進行匯聚,並將中間狀態持久化。
  4. 對stream中的tuple進行從新分組,後續的操做將會對每個分組獨立進行(相似sql中的group by)
    1. groupBy
  5. 將多個Stream融合成一個
    1. merge:多個流進行簡單的合併。
    2. join:多個流按照某個KEY進行UNION操做(只能針對同一個批次的數據)。

這裏有一個jstorm中使用Trident的簡單例子。

DRPC

 

 

 

 

 

 

問題排查

 

 

 

模型設計

在不少的實際問題中,咱們面對的模型都是大同小異,下面先來看問題是什麼:

問題描述

一、在流式計算中常常須要對一批的數據進行彙總計算,若是用SQL來描述就是:

SELECT MIN(status) FROM my_table GROUP BY order_id

在用JStorm來實現這一條簡單的SQL時,面對的是一條一條的數據庫變化的消息(這裏須要保證有序消費),其實至關於在一堆的消息上面作了一個嵌套的SQL查詢,用一張圖表示以下:

二、

 

  

 

 

 

 

----- updating -----

相關文章
相關標籤/搜索