【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 實時系統搭建

一直以來都想接觸Storm實時計算這塊的東西,最近在羣裏看到上海一哥們羅寶寫的Flume+Kafka+Storm的實時日誌流系統的搭建文檔,本身也跟着整了一遍,以前羅寶的文章中有一些要注意點沒提到的,之後一些寫錯的點,在這邊我會作修正;內容應該說絕大部分引用羅寶的文章的,這裏要謝謝羅寶兄弟,還有寫這篇文章@晨色星空J2EE也給了我很大幫助,這裏也謝謝@晨色星空J2EEhtml


以前在弄這個的時候,跟羣裏的一些人討論過,有的人說,直接用storm不就能夠作實時處理了,用不着那麼麻煩;其實否則,作軟件開發的都知道模塊化思想,這樣設計的緣由有兩方面:java

一方面是能夠模塊化,功能劃分更加清晰,從「數據採集--數據接入--流失計算--數據輸出/存儲」mysql

1.數據採集git

負責從各節點上實時採集數據,選用clouderaflume來實現github

2.數據接入sql

因爲採集數據的速度和數據處理的速度不必定同步,所以添加一個消息中間件來做爲緩衝,選用apachekafka數據庫

3.流式計算apache

對採集到的數據進行實時分析,選用apachestorm編程

4.數據輸出ubuntu

對分析後的結果持久化,暫定用mysql

另外一方面是模塊化以後,加入當Storm掛掉了以後,數據採集和數據接入仍是繼續在跑着,數據不會丟失,storm起來以後能夠繼續進行流式計算;


那麼接下來咱們來看下總體的架構圖


詳細介紹各個組件及安裝配置:

操做系統:ubuntu

Flume

FlumeCloudera提供的一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的日誌收集系統,支持在日誌系統中定製各種數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(可定製)的能力。

下圖爲flume典型的體系結構:

Flume數據源以及輸出方式:

Flume提供了從console(控制檯)RPC(Thrift-RPC)text(文件)tail(UNIX tail)syslog(syslog日誌系統,支持TCPUDP2種模式)exec(命令執行)等數據源上收集數據的能力,在咱們的系統中目前使用exec方式進行日誌採集。

Flume的數據接受方,能夠是console(控制檯)text(文件)dfs(HDFS文件)RPC(Thrift-RPC)syslogTCP(TCP syslog日誌系統)等。在咱們系統中由kafka來接收。

Flume下載及文檔:

http://flume.apache.org/

Flume安裝:

[plain] view plain copy

  1. $tar zxvf apache-flume-1.4.0-bin.tar.gz/usr/local  

Flume啓動命令:

[plain] view plain copy

  1. $bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console  


Kafka

kafka是一種高吞吐量的分佈式發佈訂閱消息系統,她有以下特性:

  • 經過O(1)的磁盤數據結構提供消息的持久化,這種結構對於即便數以TB的消息存儲也可以保持長時間的穩定性能。

  • 高吞吐量:即便是很是普通的硬件kafka也能夠支持每秒數十萬的消息。

  • 支持經過kafka服務器和消費機集羣來分區消息。

  • 支持Hadoop並行數據加載。

kafka的目的是提供一個發佈訂閱解決方案,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣機來提供實時的消費。

kafka分佈式訂閱架構以下圖:--取自Kafka官網

羅寶兄弟文章上的架構圖是這樣的

其實二者沒有太大區別,官網的架構圖只是把Kafka簡潔的表示成一個Kafka Cluster,而羅寶兄弟的架構圖就相對詳細一些;

Kafka版本:0.8.0

Kafka下載及文檔:http://kafka.apache.org/

Kafka安裝:

[plain] view plain copy

  1. > tar xzf kafka-<VERSION>.tgz  

  2. > cd kafka-<VERSION>  

  3. > ./sbt update  

  4. > ./sbt package  

  5. > ./sbt assembly-package-dependency  

啓動及測試命令:

1 start server

[plain] view plain copy

  1. > bin/zookeeper-server-start.shconfig/zookeeper.properties  

  2. > bin/kafka-server-start.shconfig/server.properties  

這裏是官網上的教程,kafka自己有內置zookeeper,可是我本身在實際部署中是使用單獨的zookeeper集羣,因此第一行命令我就沒執行,這裏只是些出來給你們看下。

配置獨立的zookeeper集羣須要配置server.properties文件,講zookeeper.connect修改成獨立集羣的IP和端口

[plain] view plain copy

  1. zookeeper.connect=nutch1:2181  

2Create a topic

[plain] view plain copy

  1. > bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test  

  2. > bin/kafka-list-topic.sh --zookeeperlocalhost:2181  

3Send some messages

[plain] view plain copy

  1. > bin/kafka-console-producer.sh--broker-list localhost:9092 --topic test  

4Start a consumer

[plain] view plain copy

  1. > bin/kafka-console-consumer.sh--zookeeper localhost:2181 --topic test --from-beginning  

kafka-console-producer.shkafka-console-cousumer.sh只是系統提供的命令行工具。這裏啓動是爲了測試是否能正常生產消費;驗證流程正確性

在實際開發中仍是要自行開發本身的生產者與消費者;

kafka的安裝也能夠參考我以前寫的文章:http://blog.csdn.net/weijonathan/article/details/18075967

Storm

Twitter將Storm正式開源了,這是一個分佈式的、容錯的實時計算系統,它被託管在GitHub上,遵循 Eclipse Public License 1.0。Storm是由BackType開發的實時處理系統,BackType如今已在Twitter麾下。GitHub上的最新版本是Storm 0.5.2,基本是用Clojure寫的。


Storm的主要特色以下:

  1. 簡單的編程模型。相似於MapReduce下降了並行批處理複雜性,Storm下降了進行實時處理的複雜性。

  2. 可使用各類編程語言。你能夠在Storm之上使用各類編程語言。默認支持Clojure、Java、Ruby和Python。要增長對其餘語言的支持,只需實現一個簡單的Storm通訊協議便可。

  3. 容錯性。Storm會管理工做進程和節點的故障。

  4. 水平擴展。計算是在多個線程、進程和服務器之間並行進行的。

  5. 可靠的消息處理。Storm保證每一個消息至少能獲得一次完整處理。任務失敗時,它會負責從消息源重試消息。

  6. 快速。系統的設計保證了消息能獲得快速的處理,使用ØMQ做爲其底層消息隊列。(0.9.0.1版本支持ØMQ和netty兩種模式)

  7. 本地模式。Storm有一個「本地模式」,能夠在處理過程當中徹底模擬Storm集羣。這讓你能夠快速進行開發和單元測試。

因爲篇幅問題,具體的安裝步驟能夠參考我以前寫的文章:http://blog.csdn.net/weijonathan/article/details/17762477

接下來重頭戲開始拉!那就是框架之間的整合啦

flumekafka整合

1.下載flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin

2.提取插件中的flume-conf.properties文件

修改該文件:#source section

producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c

修改全部topic的值改成test

將改後的配置文件放進flume/conf目錄下

在該項目中提取如下jar包放入環境中flumelib下:


注:這裏的flumeng-kafka-plugin.jar這個包,後面在github項目中已經移動到package目錄了。找不到的童鞋能夠到package目錄獲取。

完成上面的步驟以後,咱們來測試下flume+kafka這個流程有沒有走通;

咱們先啓動flume,而後再啓動kafka,啓動步驟按以前的步驟執行;接下來咱們使用kafka的kafka-console-consumer.sh腳本查看是否有flume有沒有往Kafka傳輸數據;


以上這個是個人test.log文件經過flume抓取傳到kafka的數據;說明咱們的flume和kafka流程走通了;

你們還記得剛開始咱們的流程圖麼,其中有一步是經過flume到kafka,還有一步是到hdfs的;而咱們這邊尚未提到如何存入kafka且同時存如hdfs;

flume是支持數據同步複製,同步複製流程圖以下,取自於flume官網,官網用戶指南地址:http://flume.apache.org/FlumeUserGuide.html


怎麼設置同步複製呢,看下面的配置:

[plain] view plain copy

  1. #2個channel和2個sink的配置文件  這裏咱們能夠設置兩個sink,一個是kafka的,一個是hdfs的;  

  2. a1.sources = r1  

  3. a1.sinks = k1 k2  

  4. a1.channels = c1 c2  

具體配置大夥根據本身的需求去設置,這裏就不具體舉例了

kafka和storm的整合

1.下載kafka-storm0.8插件:https://github.com/wurstmeister/storm-kafka-0.8-plus

2.使用maven package進行編譯,獲得storm-kafka-0.8-plus-0.3.0-SNAPSHOT.jar包  --有轉載的童鞋注意下,這裏的包名以前寫錯了,如今改正確了!很差意思!

3.將該jar包及kafka_2.9.2-0.8.0-beta1.jar、metrics-core-2.2.0.jar、scala-library-2.9.2.jar (這三個jar包在kafka項目中能找到)

備註:若是開發的項目須要其餘jar,記得也要放進stormLib中好比用到了mysql就要添加mysql-connector-java-5.1.22-bin.jarstormlib

那麼接下來咱們把storm也重啓下;

完成以上步驟以後,咱們還有一件事情要作,就是使用kafka-storm0.8插件,寫一個本身的Storm程序;

這裏我給大夥附上一個我弄的storm程序,百度網盤分享地址:http://pan.baidu.com/s/1mgp0LLY

先稍微看下程序的建立Topology代碼


數據操做主要在WordCounter類中,這裏只是使用簡單JDBC進行插入處理


這裏只須要輸入一個參數做爲Topology名稱就能夠了!咱們這裏使用本地模式,因此不輸入參數,直接看流程是否走通;

[plain] view plain copy

  1. storm-0.9.0.1/bin/storm jar storm-start-demo-0.0.1-SNAPSHOT.jar com.storm.topology.MyTopology  

先看下日誌,這裏打印出來了往數據庫裏面插入數據了


而後咱們查看下數據庫;插入成功了!


到這裏咱們的整個整合就完成了!

可是這裏還有一個問題,不知道大夥有沒有發現。這個也是@晨色星空J2EE跟我說的,其實我也應該想到的;

因爲咱們使用storm進行分佈式流式計算,那麼分佈式最須要注意的是數據一致性以及避免髒數據的產生;因此我提供的測試項目只能用於測試,正式開發不能這樣處理;

@晨色星空J2EE給的建議是創建一個zookeeper的分佈式全局鎖,保證數據一致性,避免髒數據錄入!

zookeeper客戶端框架大夥可使用Netflix Curator來完成,因爲這塊我還沒去看,因此只能寫到這裏了!

這裏在一次謝謝羅寶和@晨色星空!

相關文章
相關標籤/搜索