實時海量日誌分析系統的架構設計、實現以及思考

1 序

對ETL系統中數據轉換和存儲操做的相關日誌進行記錄以及實時分析有助於咱們更好的觀察和監控ETL系統的相關指標(如單位時間某些操做的處理時間),發現系統中出現的缺陷和性能瓶頸。html

因爲須要對日誌進行實時分析,因此Storm是咱們想到的首個框架。Storm是一個分佈式實時計算系統,它能夠很好的處理流式數據。利用storm咱們幾乎能夠直接實現一個日誌分析系統,可是將日誌分析系統進行模塊化設計能夠收到更好的效果。模塊化的設計至少有兩方面的優勢:java

  1. 模塊化設計可使功能更加清晰。整個日誌分析系統能夠分爲「數據採集-數據緩衝-數據處理-數據存儲」四個步驟。Apache項目下的flumeng框架能夠很好的從多源目標收集數據,因此咱們用它來從ETL系統中收集日誌信息;因爲採集數據與處理數據的速度可能會出現不一致,因此咱們須要一個消息中間件來做爲緩衝,kafka是一個極好的選擇;而後對流式數據的處理,咱們將選擇大名鼎鼎的storm了,同時爲了更好的對數據進行處理,咱們把drools與storm進行了整合,分離出了數據處理規則,這樣更有利於管理規則;最後,咱們選擇redis做爲咱們處理數據的存儲工具,redis是一個內存數據庫,能夠基於健值進行快速的存取。
  2. 模塊化設計以後,storm和前兩個步驟之間就得到了很好的解耦,storm集羣若是出現問題,數據採集以及數據緩衝的操做還能夠繼續運行,數據不會丟失。

2 相關框架的介紹和安裝

2.1 flumeng

2.1.1 原理介紹

Flume是一個高可用、高可靠、分佈式的海量日誌採集、聚合和傳輸系統。Flume支持在日誌系統中定製日誌發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接收方的能力。它擁有一個簡單的、可擴展的流式數據流架構,以下圖所示:mysql

flume

日誌收集系統就是由一個或者多個agent(代理)組成,每一個agent由source、channel、sink三部分組成,source是數據的來源,channel是數據進行傳輸的通道,sink用於將數據傳輸到指定的地方。咱們能夠把agent看作一段水管,source是水管的入口,sink是水管的出口,數據流就是水流。 Agent本質上是一個jvm進程,agent各個組件之間是經過event來進行觸發和協調的。linux

2.1.2 flumeng的安裝

  1. 從官方網站下載apache-flume-1.4.0-bin.tar.gz壓縮包
  2. 解壓縮,並在conf目錄下面新建一個文件flume-conf.properties,內容以下:git

  3. 啓動代理。flume-ng agent –n a1 –f flume-conf.propertiesgithub

2.2 kafka

2.2.1 原理介紹

Kafka是linkedin用於日誌處理的分佈式消息隊列。Kafka的架構以下圖所示:web

kafka架構

Kafka的存儲策略有一下幾點:redis

  1. kafka以topic來進行消息管理,每一個topic包括多個partition,每一個partition包括一個邏輯log,由多個segment組成。
  2. 每一個segment中存儲多條消息,消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
  3. 每一個partition在內存中對應一個index,記錄每一個segment中的第一條消息的偏移。
  4. 發佈者發到某個topic的消息會被均勻的分佈到多個partition上(隨機或根據用戶指定的回調函數進行分佈),broker收到發佈消息往對應partition的最後一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發佈時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到,segment達到必定的大小後將不會再往該segment寫數據,broker會建立新的segment。

2.2.2 kafka集羣的搭建

Kafka集羣的搭建須要依賴zookeeper來進行負載均衡,因此咱們須要在安裝kafka以前搭建zookeeper集羣。算法

  1. zookeeper集羣的搭建,本系統用到了兩臺機器。具體搭建過程見http://blog.csdn.net/itleochen/article/details/17453881
  2. 分別下載kafka_2.9.2-0.8.1的安裝包到兩臺機器,並解壓該安裝包。
  3. 打開conf/server.properties文件,修改配置項broker.id、zookeeper.connect、partitions以及host.name爲相應的值。
  4. 分別啓動kafka即完成了集羣的搭建。

2.3 storm

2.3.1 原理介紹

Storm是一個分佈式的、高容錯的實時計算系統。Storm對於實時計算的的意義至關於Hadoop對於批處理的意義。hadoop爲咱們提供了Map和Reduce原語,使咱們對數據進行批處理變的很是的簡單和優美。一樣,Storm也對數據的實時計算提供了簡單Spout和Bolt原語。sql

Strom集羣裏面有兩種節點,控制節點和工做節點,控制節點上面運行一個nimbus(相似於hadoop中的JobTracker)後臺程序,Nimbus負責在集羣裏面分佈代碼,分配工做給機器, 而且監控狀態。每個工做節點上面運行一個叫作Supervisor(相似Hadoop中的TaskTracker)的節點。Supervisor會監聽分配給它那臺機器的工做,根據須要啓動/關閉工做進程。每個工做進程執行一個Topology(相似hadoop中的Job)的一個子集;一個運行的Topology由運行在不少機器上的不少工做進程 Worker(相似Hadoop中的Child)組成。結構以下圖所示:

storm架構

Stream是storm裏面的關鍵抽象。一個stream是一個沒有邊界的tuple序列。storm提供一些原語來分佈式地、可靠地把一個stream傳輸進一個新的stream。好比: 你能夠把一個tweets流傳輸到熱門話題的流。

storm提供的最基本的處理stream的原語是spout和bolt。你能夠實現Spout和Bolt對應的接口以處理你的應用的邏輯。

Spout是流的源頭。好比一個spout可能從Kestrel隊列裏面讀取消息而且把這些消息發射成一個流。一般Spout會從外部數據源(隊列、數據庫等)讀取數據,而後封裝成Tuple形式,以後發送到Stream中。Spout是一個主動的角色,在接口內部有個nextTuple函數,Storm框架會不停的調用該函數。

Bolt能夠接收任意多個輸入stream。Bolt處理輸入的Stream,併產生新的輸出Stream。Bolt能夠執行過濾、函數操做、Join、操做數據庫等任何操做。Bolt是一個被動的角色,其接口中有一個execute(Tuple input)方法,在接收到消息以後會調用此函數,用戶能夠在此方法中執行本身的處理邏輯。

spout和bolt所組成一個網絡會被打包成topology, topology是storm裏面最高一級的抽象(相似 Job), 你能夠把topology提交給storm的集羣來運行。Topology的結構以下圖所示:

topology

2.3.2 storm集羣的搭建

Storm集羣的搭建也要依賴於zookeeper,本系統中storm與kafka共用一樣一個zookeeper集羣。

  1. 下載安裝包storm-0.9.0.1.tar.gz,並對該包進行解壓。
  2. 配置nimbus。 修改storm的conf/storm.yaml文件以下:

    注意:在每一個配置項前面必須留有空格,不然會沒法識別。storm.messaging.* 部分是Netty的配置。若是沒有該部分。那麼Storm默認仍是使用ZeroMQ。

  3. 配置supervisor 修改storm的conf/storm.yaml文件以下:

注意

  1. nimbus.host是nimbus的IP或hostname
  2. supervisor.slots.ports 是配置slot的ip地址。配了幾個地址,就有幾個slot,即幾個worker。若是嘗試提交的topology所聲明的worker數超過當前可用的slot,該topology提交會失敗。
  3. storm.messaging 部分是Netty的配置。

2.4 drools

Drools是一個基於Java的、開源的規則引擎,能夠將複雜多變的規則從硬編碼中解放出來,以規則腳本的形式存放在文件中,使得規則的變動不須要修正代碼重啓機器就能夠當即在線上環境生效。 日誌分析系統中,drools的做用是利用不一樣的規則對日誌信息進行處理,以得到咱們想要的數據。可是,Drools自己不是一個分佈式框架,因此規則引擎對log的處理沒法作到分佈式。咱們的策略是將drools整合到storm的bolt中去,這就就解決了drools沒法分佈式的問題。這是由於bolt能夠做爲task分發給多個worker來處理,這樣drools中的規則也天然被多個worker處理了。

2.5 redis

Redis是key-value存儲系統,它支持較爲豐富的數據結構,有String,list,set,hash以及zset。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。 Redis是內存數據庫,因此有很是快速的存取效率。日誌分析系統數據量並非特別大,可是對存取的速度要求較高,因此選擇redis有很大的優點。

3 各個框架的整合

3.1 ETL系統整合flumeng

Flume如何收集ETL系統中的日誌是我須要考慮的第一個問題。log4j2提供了專門的Appender-FlumeAppender用於將log信息發送到flume系統,並不須要咱們來實現。咱們在log4j2的配置文件中配置了ETL系統將log信息發送到的目的地,即avro服務器端。該服務器端咱們在flume的配置文件中進行了配置。配置信息以下所示:

3.2 flumeng與kafka的整合

咱們從ETL系統中得到了日誌信息,將該信息不做任何處理傳遞到sink端,sink端發送數據到kafka。這個發送過程須要咱們編寫代碼來實現,咱們的實現代碼爲KafkaSink類。主要代碼以下所示:

該類中,咱們讀取了一些配置信息,這些配置信息咱們在flumeng的flume-conf.properties文件中進行了定義,定義內容以下:

將上面的KafkaSink類打包成flumeng-kafka.jar,並將該jar包以及kafka_2.9.2-0.8.1.jar、metrics-annotation-2.2.0.jar、metrics-core-2.2.0.jar、Scala-compiler.jar、scala-library.jar、zkclient-0.3.jar放到flume的lib目錄下,啓動flume,咱們就能夠將ETL系統中產生的日誌信息發送到kafka中的fks1這個topic中去了。

3.3 kafka與storm的整合

Storm中的spout如何主動消費kafka中的消息須要咱們編寫代碼來實現,httpsgithub.comwurstmeisterstorm-kafka-0.8-plus實現了一個kafka與storm整合的插件,下載該插件,將插件中的jar包以及metrics-core-2.2.0.jar、scala-compiler2.9.2.jar放到storm的lib目錄下。利用插件中的StormSpout類,咱們就能夠消費kafka中的消息了。主要代碼以下所示:

3.4

storm中bolt與drools的整合 Drools能夠將storm中處理數據的規則提取到一個drl文件中,該文件就成了惟一處理規則的文件。任什麼時候候規則出現變化,咱們只須要修改該drl文件,而不會改變其它的代碼。Bolt與drools的整合代碼以下所示:

相關文章
相關標籤/搜索