運營商手機視頻流量包業務日誌ETL及統計分析

本身作過的項目在這裏作一個記錄,不然就感受不是本身的了.一是由於過去時間已經很長了,二是由於當時作得有點粗糙,最後還不了了之了.html

話很少說,先大體介紹一下項目背景.之前各大手機視頻 App 通常都有運營商的流量包套餐.當用戶產生這樣的業務行爲時,運營商便獲取了一系列的用戶行爲日誌.mysql

這條日誌是一條獲取視頻用戶手機號碼的日誌.日誌的類型不少,當時作的主要工做是對這個類型的日誌作一系列的抽取,清洗,過濾,轉換及轉存工做.最後,對實時的日誌以10分鐘爲一個時間窗口作簡要的統計分析.linux

 

要認識這樣的項目,最重要的是要釐清整個數據的流向.web

首先,原始業務日誌不管是實時的仍是歷史的都保存在服務器日誌目錄內的 log 文件中,咱們使用 Apache Flume 這個工具將原始數據抽取出來.建立一個 Flume Agent ,配置 tail source ,memory channel 以及 kafka sink ,將原始數據先導入咱們的大數據集羣環境中.把數據首先弄到大數據環境中,這是全部大數據數據處理的開端.通常來講,kafka 集羣是大數據環境的門面,全部的集羣外部數據通常都是先導入到kafka中.sql

這個過程當中有一個問題,運營商的服務器在運營商的機房內網,跟咱們的 hdp 集羣環境網絡不是互通的.因此咱們用了一臺機器作網絡中轉,這臺機器分別跟 服務器 及咱們的 hdp 集羣環境網絡相通.apache

因此總共配置了3個Flume 的 Agent , 第一個爲服務器採集端的 agent:  tail log文件 source , memory channel , avro sink 到中起色器編程

第二個爲中起色器的agent : avro source ,memory channel,avro sink 到集羣環境中的一臺機器緩存

第三個爲集羣環境的某臺機器的agent : avro source, memory channel ,kafka sink 到 kafka集羣的brokers.服務器

這個流程調試了一段時間,比較糾結,整個流程運行了一段時間後會偶爾掛掉,而後整個數據的傳輸就斷了.網絡

第一個緣由是我本身的緣由,對 linux 系統還不是很熟練.我ssh到hdp集羣上的機器上,run 第三個 flume agent 的時候沒有意識到要讓進程在後臺可靠的運行.這樣,當ssh斷了的時候,這個agent的運行也會跟着中止.第三個agent掛掉了,前面的兩個agent會因爲memory channel滿了也依次掛掉.因此爲了不ssh的問題,須要在run agent命令的時候,要注意使用 nohup 及 & 標識 .這樣 ssh 斷掉以後,agent也能繼續運行.

第二個緣由是溝通的問題.採集端 agent 和中轉端 agent 是由業務方的開發人員管理的.我負責管理hdp集羣中的這個agent.整個流程3個agent , 3個source 和 3個sink.中間某一段不通暢就會致使agent的掛掉,掛掉某個agent的話,整個傳輸流程就斷了.當時出現問題的時候,屢次溝通都沒有溝通清楚,反反覆覆作了不少無用功,其實緣由就擺在那裏.主要是網絡的問題,由於第一個agent是運營商的專用網,而且仍是北京異地的機房,它到中轉服務器的sink網路不穩定,致使sink沒有發出去.sink中止了的話,memory channel容量逐漸變滿,致使source 無法正常工做,最終這個agent掛掉了.

最後的表現是每一個agent都會拋出一個錯誤,中止運行.因爲溝通的問題,一直沒有找到引起錯誤的源頭agent.

還有一個潛在的性能問題.中轉的 agent 沒有作 load balance .因爲採集端的數據量很是大,並且後期針對不一樣的日誌類型,開了好幾個不一樣的agent.數據一會兒所有集中到了中起色器的這個agent上,它的memory channel 是很容易跑滿的.

不過這個項目的實際狀況取決於網絡情況,整個流程中,第一個agent的sink 到第二個agent的source是主要瓶頸(取決於網絡情況),當這個瓶頸的問題解決了以後,第二個agent的性能就成了主要瓶頸.這個時候就須要構建flume sink的負載均衡了(詳見官網:http://flume.apache.org/FlumeUserGuide.html#load-balancing-sink-processor).

 

如今日誌數據已經成功弄到hdp集羣環境中了,能夠開始下一步了.不過,在正式開始下一步以前,還要提一點,那就是kafka. 上面最終是將日誌數據導入到kafka的某個topic中,.對於kafka的topic,要作的是設定好topic的partiions 和 replication-factor. 這個項目用的數值分別是6和2 ,不過這是拍腦殼肯定,沒有細緻研究.主要緣由是因爲 kafka 的性能實在是太優越了,kafka這裏的數據流程基本上不可能成爲瓶頸.(kafka很強大,之後也會愈來愈重要,值得細緻研究)

 

當數據採集到 kafka 集羣,也即hdp集羣環境中以後,就可使用大數據的集羣環境及相關技術生態對數據進行進一步的處理了. 因爲處理數據的起點是kafka,因此所有采用的是流式處理的方式,也即 data pipline,從 kafka 中讀取記錄,進行逐條的處理.  這裏,就要跟你們介紹一個很是很是優秀的開源數據處理及分發系統 ,Apache Nifi .由於後續的大部分工做是使用nifi完成的.

Apache Nifi 是由美國NSA 捐給Apache基金會,最先是NSA內部使用的一個數據處理工具.nifi可以保證數據的可靠性,nifi也可以保證數據的即時性,nifi逐條處理數據的速度很是快.最厲害的是,niifi提供了一個很是簡單直觀的基於web的用戶使用界面.nifi官網上的一句話準確概況了nifi的特色: Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic . 這個system mediation logic 說得很是到位,可以使用nifi畫出很是複雜的數據流動邏輯.沒錯, 是畫出來的. 

下面這張圖是nifi用戶界面裏面畫出來的, 它不只僅是流程圖, 實際上它就是整個處理過程.右鍵點擊start,日誌數據的etl就開始了,很是強大.

 

上面每個方塊表明對記錄的處理過程,每個箭頭表明記錄的流向.日誌記錄就是一條一條的在這個有向無環圖裏面流動,從起點開始,通過一個個的處理器通過處理,一直流到目的地. 看起來這個圖看起來比較複雜,但實際上作的事情,無外乎就是對日誌數據的抽取,過濾,路由,轉換,更新,分發這幾個事.

圖畫得這麼多,主要是因爲業務的須要.

其一,日誌文件的格式不統一,即便是同類型的 log 文件.格式也會有細微的差別.這也是不可能統一的.由於日誌文件的時間跨度大,由不一樣的開發人員,按照不一樣的業務人員需求和當時的約定和口徑來開發的.有時候,一樣的字段,會有不一樣格式的值,甚至還要基於記錄某個字段的值去作判斷.客觀上,日誌文件的格式是不統一的,甚至有時候不規範.這個時候,就須要很是靈活的處理流程了.nifi的易用性及擴展性就體如今這裏了,針對不一樣的日誌文件格式,只須要更改這個流程圖就行.加幾個處理器,連幾個箭頭.所有都是在 nifi 的這個界面上經過拖拽完成的.

其二,業務需求的變更性.對於用戶行爲的日誌文件,對於同一份日誌記錄,同時會有各類不一樣的業務需求.要作即席查詢,要作實時聚合,要作批量查詢.要備份表記錄.還要觸發特定業務處理.因此整個流程圖確定會隨着業務需求的增多愈來愈複雜.可以讓流程圖隨着業務需求的變更而跟着變更自己就是一個很是厲害的事了,不管來了怎樣的業務需求,我只須要連箭頭,增長新的處理流程,也就是 data flow 就能夠了.

 

接下來,詳細看數據的流向.起點是從 kafka 集羣中讀取日誌數據,批量讀進來若干條日誌記錄以後,使用nifi的處理器逐條對日誌記錄進行處理.讓記錄在這個圖中流動起來.

首先作驗證,而後過濾格式錯誤記錄,而後路由不一樣的日誌類型. nifi能作到這些的關鍵在於它的 flowfile 這個概念. 每一條數據記錄進入到nifi中就叫flowfile. 每個flowfile 由兩部分組成,一個是content, 文件內容. 一個是 attributes ,文件屬性.  在 nifi 中, 咱們能夠對文件屬性進行增刪改查等操做,甚至咱們可使用 nifi 提供的DSL,特定領域語言 對 attributes 進行編程. 這樣的設定使得能夠對數據記錄進行任何想要的邏輯處理. 因此,通常是先把日誌記錄的內容轉換到 flow file 的屬性值當中去,而後進行後續的不一樣處理.  以下圖:

 

接着繼續判斷,對於經過網絡獲取手機號碼成功的日誌,將原始的日誌記錄保存到 hbase 中,以後供業務方作即席查詢. 以下圖:

 

 對於獲取手機號碼失敗的日誌,手動去查用戶的地址和所屬運營商信息.這裏是強業務相關的,由於屬於其餘運營商,因此是獲取不到號碼的.這裏的處理真正體現了nifi的強大和靈活.

由於對於失敗的日誌,其實是缺乏必備字段的.缺乏了字段, 這在日誌文件批量處理中是多麼坑爹的事情.然而使用 nifi 卻能很輕鬆的解決這個問題.直接拿這條記錄的用戶ip 去調 一個內部的服務化查詢接口,把字段查出來.並把值賦給flow file 相應的attribute. 把這樣的日誌記錄變成正常的日誌記錄後,再匯入處處理的主流程當中,接着流動下去.

 

 

 

 接下的處理流程主要就是分發及轉存了.對於同一條日誌記錄,一份數據要提取字段,將之轉成 hive表結構對應的 csv 格式 ,保存到 hdfs 中, 也就是將全部處理好的數據落地到hdp集羣環境中去.這是數據清洗後一大終點,也是結果.以後能夠拿來直接作其餘處理了,好比作批量查詢,供其餘工具使用(好比 kylin 等),也能夠用來作模型訓練.由於這裏就算是乾淨的數據了.

 

 

另外一份數據(假定歷史數據都已經處理完成,如今都是實時的數據)要提取業務須要的字段,導入到kafka的topic 中. 供業務須要的即時統計分析使用.

 

 

至此,整個 nifi 的數據處理流程都走完了. 歸功了 nifi 的強大 ,數據從起點到終點,雖然處理流程多,但流向很是清晰.用 nifi 拖拽幾下,一套特定業務日誌的處理系統就完成了. 右鍵點擊 start ,系統就跑起來,你能夠在界面看到數據的流動,能夠監控,能夠暫停,能夠調試某段,能夠查看中間結果.這些都是能夠在界面上完成的.  用很簡單的使用方式,去作很複雜的事情,最牛逼的工具莫過於此了.固然, nifi 也有不少高級的用法. 甚至能夠 搭一個 nifi 的集羣 ,來處理更加海量的數據,這裏就不細說了.

固然,nifi中一個很長的數據處理流程是須要花時間觀察,調試及驗證的.從一個處理器到另外一個處理器是否通暢.數據是否阻塞在了流程圖中的某一段等問題.須要調試單個處理器的併發量及run schedule等信息.以及處理器之間的緩存隊列容量和大小等信息.這些須要耐心的調試,就像一個流動的人羣同樣,慢慢疏導.

 

最後,附上實時數據統計的代碼,這是這個項目寫的惟一代碼了,使用的是 spark 技術棧 ,spark streaming 和 spark sql . 關鍵是這個group by 函數.把聚合後的數據保存mysql 裏面,供業務應用查詢.這裏後來把時間窗口改成了一小時,由於當時聚合後的數據量也還蠻大,2天存了35萬條到mysql裏面,這樣的量放mysql 裏面不太合適.

可能一開始的方案就不正確,後來也沒繼續跟進了.

 

以上,就是這個項目的一個完整記錄,作得有點粗糙,最後也沒有繼續跟進.不少細節沒有考慮到 ,好比因爲網絡的問題,在整個流程中,是否會有數據的丟失?當因爲某種緣由致使流程中斷,是否會有數據重複.最後的結果,怎樣去校驗數據的完整性.這些問題理論上是聚焦於各個組件,好比sqoop, kafka ,nifi的機制上的,但實際跑起來以後,會有什麼樣的問題.當時並無過細的研究.

因此,若是本文中,有什麼不當的,須要補充以及錯誤的地方,歡迎指正,共同窗習,一塊兒進步.歡迎一塊兒探討關於nifi的使用,最近在碼的是一個基於nifi的數據交換平臺。

相關文章
相關標籤/搜索