Storm和JStorm(阿里的流處理框架)

本文導讀:html

一、What——JStorm是什麼? 1.1 概述
    1.2優勢
    1.3應用場景
    1.4JStorm架構

二、Why——爲何啓動JStorm項目?(與storm的區別) 2.1storm的現狀、缺陷
    2.2JStorm比Storm更穩定,功能更強大,更快!—— 表現
        2.2.1穩定性好的表現
        2.2.2調度強大的表現
        2.2.3性能更好的表現
    2.3性能提高的緣由所在
    2.4JStorm的其它優勢
    2.5與flume、S四、AKKA、Spark的比較

三、JStorm的性能優化點 四、JStorm的常見問題 五、TODO List 六、參考連接

 

一、What——JStorm是什麼? 

概述:

  JStorm 是一個分佈式實時計算引擎,相似Hadoop MapReduce的系統, 用戶按照規定的編程規範實現一個任務,而後將這個任務遞交給JStorm系統,Jstorm將這個任務跑起來,而且按7 * 24小時運行起來,一旦中間一個worker 發生意外故障, 調度器當即分配一個新的worker替換這個失效的worker。所以,從應用的角度,JStorm 應用是一種遵照某種編程規範的分佈式應用。從系統角度,JStorm一套相似MapReduce的調度系統。從數據的角度,是一套基於流水線的消息處理機制。實時計算如今是大數據領域中最火爆的一個方向,由於人們對數據的要求愈來愈高,實時性要求也愈來愈快,傳統的 Hadoop Map Reduce,逐漸知足不了需求,所以在這個領域需求不斷。java

  在Storm和JStorm出現之前,市面上出現不少實時計算引擎,但自storm和JStorm出現後,基本上能夠說一統江湖,git

其優勢:

  1. 開發很是迅速: 接口簡單,容易上手,只要遵照Topology,Spout, Bolt的編程規範便可開發出一個擴展性極好的應用,底層rpc,worker之間冗餘,數據分流之類的動做徹底不用考慮。
  2. 擴展性極好:當一級處理單元速度,直接配置一下併發數,便可線性擴展性能
  3. 健壯:當worker失效或機器出現故障時, 自動分配新的worker替換失效worker;調度器Nimbus採用主從備份,支持熱切
  4. 數據準確性: 能夠採用Acker機制,保證數據不丟失。 若是對精度有更多一步要求,採用事務機制,保證數據準確。

應用場景:
  JStorm處理數據的方式是基於消息的流水線處理, 所以特別適合無狀態計算,也就是計算單元的依賴的數據所有在接受的消息中能夠找到, 而且最好一個數據流不依賴另一個數據流。github

  1. 日誌分析:從日誌中分析出特定的數據,並將分析的結果存入外部存儲器如數據庫。目前,主流日誌分析技術就使用JStorm或Storm
  2. 管道系統: 將一個數據從一個系統傳輸到另一個系統, 好比將數據庫同步到Hadoop
  3. 消息轉化器: 將接受到的消息按照某種格式進行轉化,存儲到另一個系統如消息中間件
  4. 統計分析器: 從日誌或消息中,提煉出某個字段,而後作count或sum計算,最後將統計值存入外部存儲器。中間處理過程可能更復雜。
  5. ......

JStorm架構:

  JStorm 從設計的角度,就是一個典型的調度系統web

  在這個系統中,數據庫

    • Nimbus是做爲調度器角色
    • Supervisor 做爲worker的代理角色,負責殺死worker和運行worker
    • Worker是task的容器
    • Task是真正任務的執行者
    • ZK 是整個系統中的協調者

 具體參考下圖:apache

jiagou

 

來自阿里的流處理框架:JStorm

  關於流處理框架,在先前的文章彙總已經介紹過Strom,今天學習的是來自阿里的的流處理框架JStorm。簡單的概述JStorm就是:JStorm 比Storm更穩定,更強大,更快,Storm上跑的程序,一行代碼不變能夠運行在JStorm上。直白的講JStorm是阿里巴巴的團隊基於Storm的二次開發產物,至關於他們的Tengine是基於Nginx開發的同樣。如下爲阿里巴巴團隊放棄直接使用Storm選擇自行開發JStorm的緣由:編程

jstorm

二、Why——爲何啓動JStorm項目?___與storm的區別

阿里擁有本身的實時計算引擎

  1. 相似於hadoop 中的MR
  2. 開源storm響應太慢
  3. 開源社區的速度徹底跟不上Ali的需求
  4. 下降將來運維成本
  5. 提供更多技術支持,加快內部業務響應速度

現有Storm沒法知足一些需求

  1. 現有storm調度太簡單粗暴,沒法定製化
  2. Storm 任務分配不平衡
  3. RPC OOM(OOM - Out of Memory,內存溢出 ——俗稱雪崩問題)一直沒有解決
  4. 監控太簡單
  5. 對ZK 訪問頻繁

現狀

  在整個阿里巴巴集團,1000+的物理機上運行着Storm,一淘(200+),CDO(200+),支付寶(150+),B2B(50+),阿里媽媽(50+),共享事業羣(50+),其餘等。安全

WHY之一句話概述:JStorm比Storm更穩定,功能更強大,更快!(Storm上跑的程序能夠一行代碼不變運行在JStorm上)性能優化

  JStorm相比Storm更穩定

  1. Nimbus 實現HA:當一臺nimbus掛了,自動熱切到備份nimbus ——Nimbus HA
  2. 原生Storm RPC:Zeromq 使用堆外內存,致使OS 內存不夠,Netty 致使OOM;JStorm底層RPC 採用netty + disruptor,保證發送速度和接受速度是匹配的,完全解決雪崩問題
  3. 現有Strom,在添加supervisor或者supervisor shutdown時,會觸發任務rebalance;提交新任務時,當worker數不夠時,觸發其餘任務作rebalance。——在JStorm中不會發生,使得數據流更穩定
  4. 新上線的任務不會衝擊老的任務:新調度從cpu,memory,disk,net 四個角度對任務進行分配;已經分配好的新任務,無需去搶佔老任務的cpu,memory,disk和net ——任務之間影響小
  5. Supervisor主線 ——more catch
  6. Spout/Bolt 的open/prepare ——more catch
  7. 全部IO, 序列化,反序列化 ——more catch
  8. 減小對ZK的訪問量:去掉大量無用的watch;task的心跳時間延長一倍;Task心跳檢測無需全ZK掃描。

  JStorm相比Storm調度更強大

  1. 完全解決了storm 任務分配不均衡問題
  2. 從4個維度進行任務分配:CPU、Memory、Disk、Net
  3. 默認一個task,一個cpu slot。當task消耗更多的cpu時,能夠申請更多cpu slot 
    • 解決新上線的任務去搶佔老任務的cpu
    • 一淘有些task內部起不少線程,單task消耗太多cpu
  4. 默認一個task,一個memory slot。當task須要更多內存時,能夠申請更多內存slot
    • 先海狗項目中,slot task 須要8G內存,並且其餘任務2G內存就夠了
  5. 默認task,不申請disk slot。當task 磁盤IO較重時,能夠申請disk slot
    • 海狗/實時同步項目中,task有較重的本地磁盤讀寫操做
  6. 能夠強制某個component的task 運行在不一樣的節點上
    • 聚石塔,海狗項目,某些task提供web Service服務,爲了端口不衝突,所以必須強制這些task運行在不一樣節點上
  7. 能夠強制topology運行在單獨一個節點上
    • 節省網絡帶寬
    • Tlog中大量小topology,爲了減小網絡開銷,強制任務分配到一個節點上
  8. 能夠自定義任務分配:提早預定任務分配到哪臺機器上,哪一個端口,多少個cpu slot,多少內存,是否申請磁盤
    • 海狗項目中,部分task指望分配到某些節點上
  9. 能夠預定上一次成功運行時的任務分配:上次task分配了什麼資源,此次仍是使用這些資源
    • CDO不少任務期待重啓後,仍使用老的節點,端口

  Task內部異步化

  1. Worker內部全流水線模式
  2. Spout nextTuple和ack/fail運行在不一樣線程
    • EagleEye中,在nextTuple作sleep和wait操做不會block ack/fail動做  

  JStorm相比Storm性能更好

  JStorm 0.9.0 性能很是的好,使用netty時單worker 發送最大速度爲11萬QPS,使用zeromq時,最大速度爲12萬QPS。

  • JStorm 0.9.0 在使用Netty的狀況下,比Storm 0.9.0 使用netty狀況下,快10%, 而且JStorm netty是穩定的而Storm 的Netty是不穩定的
  • 在使用ZeroMQ的狀況下, JStorm 0.9.0 比Storm 0.9.0 快30%

爲何更快、性能提高的緣由:

  1. Zeromq 減小一次內存拷貝
  2. 增長反序列化線程
  3. 重寫採樣代碼,大幅減小採樣影響
  4. 優化ack代碼
  5. 優化緩衝map性能
  6. Java 比clojure更底層

附註:和storm編程方式的改變:

  編程接口改變:當topology.max.spout.pending 設置不爲1時(包括topology.max.spout.pending設置爲null),spout內部將額外啓動一個線程單獨執行ack或fail操做, 從而nextTuple在單獨一個線程中執行,所以容許在nextTuple中執行block動做原生的storm,nextTuple/ack/fail 都在一個線程中執行,當數據量不大時,nextTuple當即返回,而ack、fail一樣也容易沒有數據,進而致使CPU 大量空轉,白白浪費CPU, 而在JStorm中, nextTuple能夠以block方式獲取數據,好比從disruptor中或BlockingQueue中獲取數據,當沒有數據時,直接block住,節省了大量CPU。

  但所以帶來一個問題, 處理ack/fail 和nextTuple時,必須當心線程安全性

  附屬: 當topology.max.spout.pending爲1時, 恢復爲spout一個線程,即nextTuple/ack/fail 運行在一個線程中。

JStorm的其餘優化點

  1. 資源隔離。不一樣部門,使用不一樣的組名,每一個組有本身的Quato;不一樣組的資源隔離;採用cgroups 硬隔離
  2. Classloader。解決應用的類和Jstorm的類發生衝突,應用的類在本身的類空間中
  3. Task 內部異步化。Worker 內部全流水線模式,Spout nextTuple和ack/fail運行在不一樣線程

JStorm與其它產品的比較:

  Flume 是一個成熟的系統,主要focus在管道上,將數據從一個數據源傳輸到另一個數據源, 系統提供大量現成的插件作管道做用。固然也能夠作一些計算和分析,但插件的開發沒有Jstorm便捷和迅速。

  S4 就是一個半成品,健壯性還能夠,但數據準確性較糟糕,沒法保證數據不丟失,這個特性讓S4 大受限制,也致使了S4開源不少年,但發展一直不是很迅速。

  AKKA 是一個actor模型,也是一個不錯的系統,在這個actor模型基本上,你想作任何事情都沒有問題,但問題是你須要作更多的工做,topology怎麼生成,怎麼序列化。數據怎麼流(隨機,仍是group by)等等。

  Spark 是一個輕量的內存MR, 更偏重批量數據處理。

三、JStorm性能優化:

  1. 選型:

    按照性能來講, trident < transaction < 使用ack機制普通接口 < 關掉ack機制的普通接口, 所以,首先要權衡一下應該選用什麼方式來完成任務。

    若是「使用ack機制普通接口」時, 能夠嘗試關掉ack機制,查看性能如何,若是性能有大幅提高,則預示着瓶頸不在spout, 有多是Acker的併發少了,或者業務處理邏輯慢了。

  2. 增長併發:能夠簡單增長併發,查看是否可以增長處理能力
  3. 讓task分配更加均勻:當使用fieldGrouping方式時,有可能形成有的task任務重,有的task任務輕,所以讓整個數據流變慢, 儘可能讓task之間壓力均勻。
  4. 使用MetaQ或Kafka時:對於MetaQ和Kafka, 一個分區只能一個線程消費,所以有可能簡單的增長併發沒法解決問題, 能夠嘗試增長MetaQ和Kafka的分區數

四、常見問題:

4.1 性能問題

  參考上面3中JStorm性能優化

4.2 資源不夠

  當報告 」No supervisor resource is enough for component 「, 則意味着資源不夠 若是是僅僅是測試環境,能夠將supervisor的cpu 和memory slot設置大,

  在jstorm中, 一個task默認會消耗一個cpu slot和一個memory slot, 而一臺機器上默認的cpu slot是(cpu 核數 -1), memory slot數(物理內存大小 * 75%/1g), 若是一個worker上運行task比較多時,須要將memory slot size設小(默認是1G), 好比512M, memory.slot.per.size: 535298048

1 #if it is null, then it will be detect by system
2  supervisor.cpu.slot.num: null
3 
4  #if it is null, then it will be detect by system
5  supervisor.mem.slot.num: null
6 
7 # support disk slot
8 # if it is null, it will use $(storm.local.dir)/worker_shared_data
9  supervisor.disk.slot: null

4.3 序列化問題

  全部spout,bolt,configuration, 發送的消息(Tuple)都必須實現Serializable, 不然就會出現序列化錯誤.

  若是是spout或bolt的成員變量沒有實現Serializable時,但又必須使用時, 能夠對該變量申明時,增長transient 修飾符, 而後在open或prepare時,進行實例化

seriliazble_error

4.4 Log4j 衝突

  0.9.0 開始,JStorm依舊使用Log4J,但storm使用Logbak,所以應用程序若是有依賴log4j-over-slf4j.jar, 則須要exclude 全部log4j-over-slf4j.jar依賴,下個版本將自定義classloader,就不用擔憂這個問題。

 1 SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. 
 2 SLF4J: See also 
 3 http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
 4 Exception in thread "main" java.lang.ExceptionInInitializerError
 5         at org.apache.log4j.Logger.getLogger(Logger.java:39)
 6         at org.apache.log4j.Logger.getLogger(Logger.java:43)
 7         at com.alibaba.jstorm.daemon.worker.Worker.<clinit>(Worker.java:32)
 8 Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also 
 9 http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
10         at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
11         ... 3 more
12 Could not find the main class: com.alibaba.jstorm.daemon.worker.Worker.  Program will exit.

4.5 類衝突

  若是應用程序使用和JStorm相同的jar 但版本不同時,建議打開classloader, 修改配置文件

1 topology.enable.classloader: true 

  或者

1 ConfigExtension.setEnableTopologyClassLoader(conf, true); 

  JStorm默認是關掉classloader,所以JStorm會強制使用JStorm依賴的jar

4.6 提交任務後,等待幾分鐘後,web ui始終沒有顯示對應的task

  有3種狀況:

  4.6.1用戶程序初始化太慢

  若是有用戶程序的日誌輸出,則代表是用戶的初始化太慢或者出錯,查看日誌便可。 另外對於MetaQ 1.x的應用程序,Spout會recover ~/.meta_recover/目錄下文件,能夠直接刪除這些消費失敗的問題,加速啓動。

  4.6.2一般是用戶jar衝突或初始化發生問題

    打開supervisor 日誌,找出啓動worker命令,單獨執行,而後檢查是否有問題。相似下圖:

fail_start_worker

  4.6.3檢查是否是storm和jstorm使用相同的本地目錄

    檢查配置項 」storm.local.dir「, 是否是storm和jstorm使用相同的本地目錄,若是相同,則將兩者分開

4.7 提示端口被綁定

  有2種狀況:

  4.7.1多個worker搶佔一個端口

    假設是6800 端口被佔, 能夠執行命令 「ps -ef|grep 6800」 檢查是否有多個進程, 若是有多個進程,則手動殺死他們

  4.7.2系統打開太多的connection

    Linux對外鏈接端口數限制,TCP client對外發起鏈接數達到28000左右時,就開始大量拋異常,須要

1 # echo "10000 65535" > /proc/sys/net/ipv4/ip_local_port_range

 

五、TODO list

  1. Quato,每一個group配額
  2. Storm on yarn
  3. 應用自定義Hook
  4. 權限管理 
  5. logview
  6. classloader
  7. upgrade Netty to netty4

 

參考連接

Github源碼:https://github.com/alibaba/jstorm/

中文文檔:https://github.com/alibaba/jstorm/wiki/JStorm-Chinese-Documentation

相關文章
相關標籤/搜索