Flume做爲一個日誌收集工具,很是輕量級,基於一個個Flume Agent,可以構建一個很複雜很強大的日誌收集系統,它的靈活性和優點,主要體如今以下幾點:
1)模塊化設計:在其Flume Agent內部能夠定義三種組件:Source、Channel、Sink;
2)組合式設計:能夠在Flume Agent中根據業務須要組合Source、Channel、Sink三種組件,構建相對複雜的日誌流管道;
3)插件式設計:能夠經過配置文件來編排收集日誌管道的流程,減小對Flume代碼的侵入性;
4)可擴展性:咱們能夠根據本身業務的須要來定製實現某些組件(Source、Channel、Sink)
支持集成各類主流系統和框架:像Hadoop、HBase、Hive、Kafka、ElasticSearch、Thrift、Avro等,都可以很好的和Flume集成;
5)高級特性:Failover、Load balancing、Interceptor等。
前端
爲何要對Flume日誌收集系統進行分層設計
基於Flume設計實現分層日誌收集系統,到底有什麼好處呢?咱們能夠先看一下,若是不分層,會帶來哪些問題:
1)若是須要經過Kafka去緩衝上游基於Flume收集而構建的日誌流,對於數據平臺內部服務器產生的數據還好,可是若是日誌數據是跨業務組,甚至是跨部門,那麼就須要將Kafka相關信息暴露給外部,這樣對Kafka的訪問便不是數據平臺內部可控的;
2)若是是外部日誌進入平臺內部HDFS,這樣若是須要對Hadoop系統進行升級或例行維護,這種直連的方式會影響到上游部署Flume的日誌流的始端日誌收集服務;
3)若是數據平臺內部某些系統,如Kafka集羣、HDFS集羣所在節點的機房位置變動,數據遷移,會使得依賴日誌數據的外部系統受到不一樣程度的影響,外部系統須要相關開發或運維人員參與進來;
4)因爲收集日誌的數據源端多是外部一些服務器(多個單個的節點),一些業務集羣(相互協做的多節點組),也多是內部一些提供收集服務的服務節點,這些全部的服務器上部署的Flume Agent都處於一層中,比較難於分組管理;
5)因爲全部數據源端Flume Agent收集的日誌進入數據平臺的時候,沒有一個統一的相似總線的組件,很難由於某些業務擴展而獨立地去升級數據平臺內部的接收層服務節點,可能爲了升級數據平臺內部某個系統或服務而致使影響了其餘的接收層服務節點。
nginx
經過下圖咱們能夠看出,這種單層日誌收集系統設計,存在太多的問題,並且系統或服務越多致使整個日誌收集系統越難以控制:json
上圖中,不管是外部仍是內部,只要部署了Flume Agent的節點,都直接同內部的Kafka集羣和Hadoop集羣相連,因此在數據平臺內部只能儘可能保持Kafka和Hadoop集羣正常穩定運行,也要爲外部日誌收集Flume Agent的數據流量的陡增和異常變化作好防控準備。再者,如需停機維護或者升級某一個集羣,可能都須要通知外部全部Flume Agent所在節點的業務方,作好應對(停機)準備。
接着看,若是咱們基於Flume使用分層的方式來設計日誌收集系統,又有哪些優點,以下圖所示:
後端
上圖中,Flume日誌收集系統採用兩層架構設計:第一層(L1)是日誌收集層,第二層(L2)是數據平臺緩衝層(匯聚層)。經過這種方式,使得日誌收集系統有以下特色:
1)針對數據平臺外部的業務系統,根據須要分析的數據業務類型進行分組,屬於同一種類型的業務日誌,在數據平臺前端增長了一個Flume匯聚層節點組,該組節點隻影響到它對應的L1層的業務數據;
2)若是Hadoop集羣、Kafka須要停機維護或升級,對外部L1層Flume Agent沒有影響,只須要在L2層作好數據的接收與緩衝便可,待維護或升級結束,繼續將L2層緩存的數據導入到數據存儲系統;
3)若是外部某個類型的業務日誌數據節點須要擴容,直接在L1層將數據流指向數據平臺內部與之相對應的L2層Flume Agent節點組便可,可以對外部因業務變化發生的新增日誌收集需求,進行快速地響應和部署;
4)對於數據平臺內部,由於收集日誌的節點很是可控,能夠直接經過L1層Flume Agent使日誌數據流入HDFS或Kafka,固然爲了架構統一和管理,最好也是經過L2層Flume Agent節點組來匯聚/緩衝L1層Flume Agent收集的日誌數據。
緩存
經過上面分析可見,分層無非是爲了使的日誌數據源節點的Flume Agent服務與數據平臺的存儲系統(Kafka/HDFS)進行解耦,同時可以更好地對同類型業務多節點的日誌流進行一個聚合操做,並分離開獨立管理。另外,能夠根據實際業務須要,適當增長Flume系統分層,知足日誌流數據的匯聚須要。
bash
應用總體架構
咱們看一下,Flume日誌收集系統,在咱們這個示例應用中處於一個什麼位置,我簡單畫了一下圖,加了一些有關數據處理和分析的節點/組件,以下圖所示:服務器
這裏,簡單瞭解一下上圖便可,因爲日誌收集在整個應用系統中是很重要的一個環節,因此必須保證日誌收集系統設計的可靠、可用、靈活、穩定,經過上面在日誌收集系統收集日誌以後,數據平臺所作的大量分析處理,來凸顯日誌收集系統的重要性,這裏其餘內容不作過多說明。
架構
Flume分層架構實踐
這裏,咱們主要以實時收集日誌爲例,說明如何構建一個相對複雜的Flume分層日誌收集系統。首先,簡要說明一下日誌收集需求:
1)手機客戶端上報的用戶行爲事件(App User Event),經過數據平臺內部定義好的接口格式,從Nginx日誌裏面實時流入數據平臺,這對應於Flume日誌收集系統L1層;
2)經過組織各類活動,來推廣某些App的產品特性,會定向向用戶推送通知,單獨使用推送點擊(Push Click)Agent來收集這些點擊行爲數據;
3)App所依賴的一些基礎內容,會以服務的形式開放給外部第三方調用,對於由第三方App帶來的用戶的行爲點擊事件(Thirdparty Click),單獨使用L1層Flume Agent進行收集;
4)第三方會在App中根據不一樣的內容,投放廣告(Ad),對於廣告曝光/點擊行爲的數據,與上述提到的數據收集單獨分離出來,由於該日誌數據後期可能會大規模推廣,會有爆發性增加,在L1層進行收集;
5)在L2層主要是匯聚或緩衝L1層流入的日誌數據;
6)同時,爲了防止L2層Flume Agent由於故障或例行停機維護等,因此使用了Flume的Failover特性,亦即L1層每個Sink同時指向L2層的2個相同的Flume Agent;
7)L1層的Flume Agent在收集日誌的過程當中應該不容許在Channel中累積過多數據(可是還要防止數據流速過慢致使內存Channel數據溢出),還要可以儘可能下降讀寫磁盤的開銷,因此使用內存類型的Channel;
8)L2層爲了保證數據可以可靠地緩衝(在容許的一段時間內累積保存數據),如Hadoop或Kafka故障停機或停機維護升級,採用文件類型的Channel,還要儘可能調大容量,也不能由於多應用共享磁盤而形成數據處理延遲,因此對於不一樣的Channel分別使用獨立的磁盤。app
詳細分層設計以下圖所示:
負載均衡
上圖是從實際的整個數據平臺中拿出來一部分,簡單便於解釋說明。有關上圖中所涉及到的Flume Agent的配置詳情,下面會根據Flume分層的結構(L1層、L2層)來詳細配置說明。因爲L1層的10.10.1.101和10.10.1.102節點上部署的Flume Agent是對稱的,因此下面只拿出其中一個來講明配置,不一樣的是,這兩個節點上Flume Agent的Sink使用Failover功能,分別交叉指向L2層Flume Agent,也可以起到必定的負載均衡的做用。
上游Flume日誌收集層
下面,分別針對10.10.1.101節點上的3個Flume Agent的配置內容,分別進行說明以下:
L1層:App用戶行爲事件(App User Event)日誌收集
Flume Agent名稱爲a1,使用Exec Source、Memory Channel、Avro Sink,這裏咱們的Nginx日誌文件始終指向/data/nginx/logs/app_user_events.log,即便日切或小時切文件,使用tail -F就能保證日誌內容都被收集。具體配置內容以下所示:
a1.sources = s1 a1.channels = mc1 a1.sinks = k1 k2 # Configure source a1.sources.s1.channels = mc1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /data/nginx/logs/app_user_events.log # Configure channel a1.channels.mc1.type = memory a1.channels.mc1.transactionCapacity = 50000 a1.channels.mc1.capacity = 100000 # Configure sinks a1.sinks.k1.channel = mc1 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 10.10.1.122 a1.sinks.k1.port = 44446 a1.sinks.k2.channel = mc1 a1.sinks.k2.type = avro a1.sinks.k2.hostname = 10.10.1.121 a1.sinks.k2.port = 44446 # Configure failover a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 9 a1.sinkgroups.g1.processor.priority.k2 = 7 a1.sinkgroups.g1.processor.maxpenalty = 10000
L1層:推送點擊事件(Push Click Event)日誌收集
a2.sources = s2 a2.channels = mc2 a2.sinks = k3 k4 # Configure source a2.sources.s2.channels = mc2 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /data/nginx/logs/push_click_events.log # Configure channel a2.channels.mc2.type = memory a2.channels.mc2.capacity = 50000 a2.channels.mc2.transactionCapacity = 100000 # Configure sinks a2.sinks.k3.channel = mc2 a2.sinks.k3.type = avro a2.sinks.k3.hostname = 10.10.1.121 a2.sinks.k3.port = 44447 a2.sinks.k4.channel = mc2 a2.sinks.k4.type = avro a2.sinks.k4.hostname = 10.10.1.122 a2.sinks.k4.port = 44447 # Configure failover a2.sinkgroups = g2 a2.sinkgroups.g2.sinks = k3 k4 a2.sinkgroups.g2.processor.type = failover a2.sinkgroups.g2.processor.priority.k3 = 9 a2.sinkgroups.g2.processor.priority.k4 = 7 a2.sinkgroups.g2.processor.maxpenalty = 10000
L1層:第三方點擊事件(Thirdparty Click Event)日誌收集
第三方點擊事件經過統一的接口上傳數據,那麼配置起來也比較容易,以下所示:
a3.sources = s3 a3.channels = mc3 a3.sinks = k5 k6 # Configure source a3.sources.s3.channels = mc3 a3.sources.s3.type = exec a3.sources.s3.command = tail -F /data/nginx/logs/thirdparty_click_events.log # Configure channel a3.channels.mc3.type = memory a3.channels.mc3.transactionCapacity = 50000 a3.channels.mc3.capacity = 100000 # Configure sinks a3.sinks.k5.channel = mc3 a3.sinks.k5.type = avro a3.sinks.k5.hostname = 10.10.1.121 a3.sinks.k5.port = 44446 a3.sinks.k6.channel = mc3 a3.sinks.k6.type = avro a3.sinks.k6.hostname = 10.10.1.122 a3.sinks.k6.port = 44446 # Configure failover a3.sinkgroups = g3 a3.sinkgroups.g3.sinks = k5 k6 a3.sinkgroups.g3.processor.type = failover a3.sinkgroups.g3.processor.priority.k5 = 9 a3.sinkgroups.g3.processor.priority.k6 = 7 a3.sinkgroups.g3.processor.maxpenalty = 10000
L1層:廣告點擊事件(Ad Click Event)日誌收集
廣告點擊事件日誌收集配置,以下所示:
a4.sources = s4 a4.channels = mc4 a4.sinks = k7 k8 # Configure source a4.sources.s4.channels = mc4 a4.sources.s4.type = exec a4.sources.s4.command = tail -F /data/nginx/logs/ad.log # Configure channel a4.channels.mc4.type = memory a4.channels.mc4.transactionCapacity = 50000 a4.channels.mc4.capacity = 100000 # Configure sinks a4.sinks.k7.channel = mc4 a4.sinks.k7.type = avro a4.sinks.k7.hostname = 10.10.1.121 a4.sinks.k7.port = 44448 a4.sinks.k8.channel = mc4 a4.sinks.k8.type = avro a4.sinks.k8.hostname = 10.10.1.122 a4.sinks.k8.port = 44448 # Configure failover a4.sinkgroups = g4 a4.sinkgroups.g4.sinks = k7 k8 a4.sinkgroups.g4.processor.type = failover a4.sinkgroups.g4.processor.priority.k7 = 10 a4.sinkgroups.g4.processor.priority.k8 = 8 a4.sinkgroups.g4.processor.maxpenalty = 10000
下游Flume日誌收集匯聚層
L2層:App用戶事件+推送點擊事件日誌合併收集
這種業務需求是:把App用戶事件和推送點擊事件合併寫入文件,最後都會寫入HDFS,從而進一步在Hive中進行離線分析;同時又要使這兩種事件分別獨立地走實時計算的流程,App用戶事件實時計算流程須要實時統計用戶使用App過程當中行爲特徵,而推送點擊事件實時計算須要針對某一次活動來實時分析和展現用戶的參與狀況。
具體配置內容,以下所示:
a1.sources = s1 s2 a1.channels = fc1 fc2 fc3 a1.sinks = kk1 fk2 kk3 # Configure source: # Configure app user event source: s1 -> fc1+fc2 a1.sources.s1.channels = fc1 fc2 a1.sources.s1.type = avro a1.sources.s1.bind = 10.10.1.121 a1.sources.s1.port = 44446 a1.sources.s1.threads = 8 # Configure source # Configure push click event source: s2 -> fc2+fc3 a1.sources.s2.channels = fc2 fc3 a1.sources.s2.type = avro a1.sources.s2.bind = 10.10.1.122 a1.sources.s2.port = 44447 a1.sources.s2.threads = 4 # Configure file channel(/data1) # Configure app user event channel: fc1 ->kk1 a1.channels.fc1.type = file a1.channels.fc1.checkpointDir = /data1/flume/channels/app_user_event/checkpoint a1.channels.fc1.useDualCheckpoints = true a1.channels.fc1.backupCheckpointDir = /data1/flume/channels/app_user_event/backup a1.channels.fc1.dataDirs = /data1/flume/channels/app_user_event/data a1.channels.fc1.transactionCapacity = 100000 a1.channels.fc1.capacity = 500000 a1.channels.fc1.checkpointInterval = 60000 a1.channels.fc1.keep-alive = 5 a1.channels.fc1.maxFileSize = 5368709120 # Configure file channel(/data2) # Configure app user event + push click event: fc2 - > fk2 a1.channels.fc2.type = file a1.channels.fc2.checkpointDir = /data2/flume/channels/offline_file_event/checkpoint a1.channels.fc2.useDualCheckpoints = true a1.channels.fc2.backupCheckpointDir = /data2/flume/channels/offline_file_event/backup a1.channels.fc2.dataDirs = /data2/flume/channels/offline_file_event/data a1.channels.fc2.transactionCapacity = 100000 a1.channels.fc2.capacity = 500000 a1.channels.fc2.checkpointInterval = 60000 a1.channels.fc2.keep-alive = 5 a1.channels.fc2.maxFileSize = 5368709120 # Configure file channel(/data3) # Configure push click channel: fc3 ->kk3 a1.channels.fc3.type = file a1.channels.fc3.checkpointDir = /data3/flume/channels/push_click_event/checkpoint a1.channels.fc3.useDualCheckpoints = true a1.channels.fc3.backupCheckpointDir = /data3/flume/channels/push_click_event/backup a1.channels.fc3.dataDirs = /data3/flume/channels/push_click_event/data a1.channels.fc3.transactionCapacity = 100000 a1.channels.fc3.capacity = 500000 a1.channels.fc3.checkpointInterval = 60000 a1.channels.fc3.keep-alive = 5 a1.channels.fc3.maxFileSize = 5368709120 # Configure sink: RealtimeMessageSink(app user event) a1.sinks.kk1.type = org.shirdrn.flume.sink.RealtimeMessageSink a1.sinks.kk1.channel = fc1 a1.sinks.kk1.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092 a1.sinks.kk1.topic = json_user_event a1.sinks.kk1.serializer.class = kafka.serializer.StringEncoder a1.sinks.kk1.producer.type = async a1.sinks.kk1.message.send.max.retries = 3 a1.sinks.kk1.client.id = flume_app_user_event_2_1 a1.sinks.kk1.event.decoder.count = 8 a1.sinks.kk1.output.stat.event.batch.size = 2000 a1.sinks.kk1.event.decoder.queue.size = 1000 # Configure sink: RichRollingFileSink a1.sinks.fk2.type = org.shirdrn.flume.sink.RichRollingFileSink a1.sinks.fk2.channel = fc2 a1.sinks.fk2.batchSize = 100 a1.sinks.fk2.serializer = TEXT a1.sinks.fk2.sink.rollInterval = 60 a1.sinks.fk2.sink.directory = /data/flume/rolling_files a1.sinks.fk2.sink.file.prefix = event a1.sinks.fk2.sink.file.suffix = .log a1.sinks.fk2.sink.file.pattern = yyyyMMddHHmmss # Configure sink: RealtimeMessageSink(push click) a1.sinks.kk3.type = org.shirdrn.flume.sink.RealtimeMessageSink a1.sinks.kk3.channel = fc3 a1.sinks.kk3.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092 a1.sinks.kk3.topic = json_push_click_event a1.sinks.kk3.serializer.class = kafka.serializer.StringEncoder a1.sinks.kk3.producer.type = async a1.sinks.kk3.message.send.max.retries = 3 a1.sinks.kk3.client.id = flume_push_click_2_1 a1.sinks.kk3.event.decoder.count = 4 a1.sinks.kk3.output.stat.event.batch.size = 2000 a1.sinks.kk3.event.decoder.queue.size = 1000
上面,能夠看到咱們本身實現的org.shirdrn.flume.sink.RealtimeMessageSink,該Sink主要是使Flume收集的日誌寫入Kafka中,在Flume 1.5.0版本中尚未內置實現,因此咱們本身實現了,並在其中加入了適合咱們業務的處理邏輯,好比,將Nginx日誌記錄行解析,而後根據實時計算須要,過濾掉不須要進入Kafka(最終在Storm集羣中處理)事件數據,最後轉成JSON字符串的格式,寫入到Kafka中的Topic裏。經過上面的配置也能夠看出,能夠配置不少參數,例如解析線程數、隊列大小等。
因爲咱們須要將寫入本地文件系統的文件按照咱們本身的方式來定義,因此基於Flume內置的file_roll實現進行修改,實現了本身的org.shirdrn.flume.sink.RichRollingFileSink,該Sink主要是對文件名字符串進行格式化,可以經過文件名來獲取到文件生成的時間(人類可讀格式)。
L2層:廣告點擊事件日誌收集
上面的圖中,L1層能夠根據須要擴展到更多的服務器節點,在L2層根據須要進行匯聚/緩衝,具體配置內容以下所示:
a2.sources = s3 a2.channels = fc4 a2.sinks = kk4 # Configure source: s3 -> fc4 a2.sources.s3.channels = fc4 a2.sources.s3.type = avro a2.sources.s3.bind = 10.10.1.121 a2.sources.s3.port = 44448 a2.sources.s3.threads = 2 # Configure channel(/data4) # Configure Ad channel: fc4 ->kk4 a2.channels.fc4.type = file a2.channels.fc4.checkpointDir = /data4/flume/channels/ad/checkpoint a2.channels.fc4.useDualCheckpoints = true a2.channels.fc4.backupCheckpointDir = /data4/flume/channels/ad/backup a2.channels.fc4.dataDirs = /data4/flume/channels/ad/data a2.channels.fc4.transactionCapacity = 100000 a2.channels.fc4.capacity = 500000 a2.channels.fc4.checkpointInterval = 60000 a2.channels.fc4.keep-alive = 5 a2.channels.fc1.maxFileSize = 5368709120 # Configure sinks: RealtimeAdKafkaSink a2.sinks.kk4.type = org.shirdrn.flume.sink.RealtimeAdKafkaSink a2.sinks.kk4.channel = fc4 a2.sinks.kk4.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092 a2.sinks.kk4.topic = json_ad_event a2.sinks.kk4.serializer.class = kafka.serializer.StringEncoder a2.sinks.kk4.producer.type = async a2.sinks.kk4.message.send.max.retries = 3 a2.sinks.kk4.client.id = flume_ad_2_1 a2.sinks.kk4.event.decoder.count = 4 a2.sinks.kk4.output.stat.event.batch.size = 2500 a2.sinks.kk4.event.decoder.queue.size = 5000
實踐總結
這裏咱們簡單總結一些內容,以下所示:
1)Flume監控
簡單一點的監控,直接在啓動的時候,開啓一個Web端口,經過端口來獲取Flume Agent服務的一些相關數據,命令相似:
bin/flume-ng agent -n a1 -c conf -f conf/config.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
這樣即可以在Flume Agent服務節點上,瀏覽Web端口34545來查看,數據以JSON格式表示,比較重要的一些元數據,如channel容量、當前使用量等等,經過這些數據能夠了解當前Flume的工做狀態,是否須要升級擴容等等。
另外,也能夠經過Ganglia來收集並分析Flume Agent服務運行狀態,可以更加詳細地展現Flume Agent服務的狀態,由於Ganglia配置相對複雜,這裏就不作過多解釋,感興趣能夠嘗試一下。
2)Flume內存調優
由於Flume使用Java實現的,因此就會遇到有關JVM調優的問題,這個也比較容易。默認狀況下,Flume Agent進程的堆內存設置比較小,在日誌數據量比較大的狀況下就須要修改並調試這些參數,以知足業務須要。設置JVM相關參數,能夠修改conf/flume-env.sh文件(也能夠直接在啓動Flume Agent服務時指定JVM選項參數),例如修改JAVA_OPTS變量,示例以下所示:
JAVA_OPTS="-server -Xms1024m -Xmx4096m -Dcom.sun.management.jmxremote -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/data/flume/logs/gc-ad.log"
這樣,能夠方便地修改GC策略,通常因爲Flume實時收集日誌比較注重實時性,但願可以快速地響應,儘可能減小GC致使暫停業務線程被掛起的時間,因此能夠將GC設置爲ParNew+CMS策略。將GC日誌輸出,在必定程度上可以更加方便地觀察Flume Agent服務運行過程當中JVM GC的詳細狀況,經過診斷來優化服務運行。
3)下游L2層接收消息調優
一般,在開始部署Flume日誌收集系統時,上游L1層服務節點比較少,在L2層匯聚時使用默認的配置可能效果也會不錯,可是若是L1層Flume Agent愈來愈多,就能看到L2層處理速度慢下來。L2層的Flume Agent服務通常會遠遠小於L1層Flume Agent服務數,這種狀況下,若是L2層Flume Agent服務使用Avro Source,能夠調大Avro接收線程數,示例以下:
a1.sources.s1.type = avro a1.sources.s1.bind = 10.10.1.121 a1.sources.s1.port = 44446 a1.sources.s1.threads = 8
上面默認狀況下threads參數的值1,能夠將該值調大,不然的話,L1層就會堆積日誌記錄,嚴重可能致使數據丟失。
4)Flume處理業務邏輯約束Flume的易擴展性使得咱們能夠根據本身的業務特色來實現一些組件,那麼咱們在將實際業務邏輯摻雜進Flume中時,須要考慮是否非得必須這麼作?若是這麼作是否會影響Flume實時傳輸日誌的速度和效率?Flume做爲一個輕量級的日誌收集工具,我的認爲最好將相對複雜的業務邏輯(尤爲是須要與一些存儲系統,如MySQL、Redis交互時)後移,放在Storm集羣中去處理,或者本身實現的業務處理集羣中,而Flume就讓它去作其擅長的事情——路由消息。固然,有些業務場景可能必須在Flume日誌收集層去作,如根據原始非結構化的消息,沒法控制不一樣類型的消息路由到不一樣的目的地,那麼可能須要在收集層作一個簡單的解析或格式化,實際上這是在Flume層作了一個簡單的日誌分發。不管如何,若是想在Flume層插入業務邏輯處理,儘可能避免過於複雜的處理而影響整個日誌傳輸速度,若是後端有實時推薦需求,日誌中事件的實時性大大延遲,就會影響實施個性化推薦。