前言:老劉不敢保證說的有多好,但絕對是很是良心地講述自學大數據開發路上的一些經歷和感悟,保證會講述一些不一樣於別人技術博客的細節。node
老劉如今想寫點有本身特點的東西,講講自學大數據遇到的一些事情,保證講一些別人技術博客裏忽略的知識點。面試
不少自學編程的人都會有一個問題,特別是研二即將找工做的小夥伴,由於立刻就要找工做了,自學時間很少了,因此在自學的路上,經常會忽略不少細小但很重要的知識點,不少夥伴都是直接背一些機構的資料。編程
本身沒有靜下心來好好研究各個知識點,也沒有考慮這些機構寫的知識點的對錯,徹底照搬資料上的知識點,沒有造成本身的理解,這是很是危險的!緩存
從今天開始,老劉就來給你們講講自學大數據開發路上的那些容易被人忽略的細節,讓你們對知識點造成本身的理解。服務器
在解釋什麼是flume這類知識點上,不少機構的資料或者網上的技術博客講的都很差,不少培訓機構的資料是這樣形容的「Flume是一個高可用,高可靠,分佈式的海量日誌採集、聚合和傳輸的系統」。
通常都會以爲這句話沒啥問題,可是好好想一想,這句話是否是至關於這個場景,男女相親,男方說我有車有房,可是沒有說是什麼車,什麼房,自行車也是車,租的房也是房啊!因此在說有車有房的時候,必定要拿出確鑿的證據。網絡
因此呢,當面試的時候,直接說flume是一個高可用,高可靠,分佈式的海量日誌採集、聚合和傳輸的系統是很是沒有說服力的,很是典型的照搬資料,沒有本身的理解。架構
它是如何作到高可用,高可靠,分佈式也須要講一講,這樣才以爲可靠!這就是老劉說的和別人不同的地方,真的良心分享!框架
老劉以爲能夠這樣說在一個完整的離線大數據處理系統中,除了hdfs+mapreduce+hive組成分析系統的核心以外,還須要數據採集、結果數據導出、任務調度等不可或缺的輔助系統,而這些輔助工具在hadoop生態體系中都有便捷的開源框架。分佈式
其中,flume就是一個日誌採集、聚合和傳輸系統的開源框架,它的高可用、高可靠、分佈式這些特色,通常都是經過部署多個服務器,而後在每一個服務器上部署flume agent模式造成的,而且flume經過事務機制保證了數據傳輸的完整性和準確性,flume事務在後面講。工具
flume的概念就講這麼多,這樣說的目的主要是不想讓你們照搬機構資料的內容,本身多想一想,要有本身的理解。
看到這個架構圖,老劉直接先說說flume是怎麼工做的?
外部數據源以特定格式向flume發送events事件,當source接收到events時,它將其存儲到一個或多個channel,channel會一直保存events直到它被sink消費。sink的主要功能是從channel中讀取events,並將其存入外部存儲系統或轉發到下一個source,成功後再從channe移除events。
接着講講各個組件agent、source、channe、sink。
agent
它是一個JVM進程,它以事件的形式將數據從源頭送至目的。
source
它是一個採集組件,用來獲取數據。
channel
它是一個傳輸通道組件,用來緩存數據,用於從將source的數據傳遞到sink。
sink它
是一個下沉組件,它將數據發送給最終的存儲系統或者下一個agent。
flume事務是很是很是重要的,以前就說過經過flume事務,實現了傳輸數據的完整性和準確性。
先看看這張圖:
flume它有兩個事務,分別是put事務、take事務。
put事務的步驟分爲兩步:
doput,它先將此數據寫入到臨時緩衝區putlist裏面;
docommit,它會去檢查channel裏面有沒有空位置,若是有空位置就會傳入數據;若是channel裏面沒有空位置,那就會把數據回滾到putlist裏面。
take事務也分爲兩步:
dotake,它會將數據讀取到臨時緩衝區takelist,並把數據傳到hdfs上;
docommit,它會去判斷數據是否上傳成功,若成功那麼就會清除臨時緩衝區takelist裏的數據;若不成功,好比hdfs發生崩潰啥的,那就會回滾數據到channel裏面。
經過講述兩個事務的步驟,是否是就知道了爲何flume會保證傳輸數據的完整和準確。
老劉總結一下就是,數據在傳輸到下一個節點時,假設接收節點出現異常,好比網絡異常之類的,那就會回滾這一批數據,所以就會致使數據重發。
那在同一個節點內,source寫入數據到channel,數據在一個批次內出現異常,那就會不寫入到channel中,已經接收到的部分數據會被直接拋棄,靠上一個節點重發數據。
經過這兩個事務,flume就提升了數據傳輸的完整性、準確性。
這部分是flume最重要的,做爲一個日誌採集框架,flume的應用比它的概念還要重要,必定要知道flume要怎麼用!老劉最開始壓根就沒看這部分,光看知識點了,如今才發現實戰的重要性!
可是flume實戰案例數不勝數,咱們難道要記住每個案例嗎?
固然不是,這個flume案例咱們能夠根據官網裏的配置文件進行配置,以下圖:
看左下角藍色方框裏的內容,就能夠查詢到相關配置文件。在這裏老劉有句話說,若是想學習一個新的框架,我們的學習資料就是官網,經過官網學習,不只能提高技術,還能提升英語,且不美滋滋!
如今開始講案例,第一個是採集文件到HDFS,需求就是監控一個文件若是有新增的內容就把數據採集到HDFS上。
根據官網資料,flume配置文件開發須要在flume安裝目錄下建立一個文件夾,後期存放flume開發的配置文件。
根據需求的描述,source的配置應該選擇爲exec;爲了保證數據不丟失,channel的配置應該選擇file;sink的配置應該選擇爲hdfs。
這樣雖然已知足需求,可是咱們作數據開發,確定會存在很是多的小文件,必定要作相關的優化。例如,文件小,文件多怎麼解決?文件目錄多怎麼解決?
因此咱們還要選擇一些參數來控制參數和目錄。
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 #配置source #指定source的類型爲exec,經過unix命令來傳輸結果數據 a1.sources.r1.type = exec #監控這個文件,有新的數據產生就不斷採集 a1.sources.r1.command = tail -F /opt/bigdata/flumeData/tail.log #指定source的數據流入到channel中 a1.sources.r1.channels = c1 #配置channel #選擇file,就是保證數據不丟失,即便出現火災或者洪災 a1.channels.c1.type = file #設置檢查點目錄--該目錄是記錄下event在數據目錄下的位置 a1.channels.c1.checkpointDir=/kkb/data/flume_checkpoint #數據存儲所在的目錄 a1.channels.c1.dataDirs=/kkb/data/flume_data #配置sink a1.sinks.k1.channel = c1 #指定sink類型爲hdfs a1.sinks.k1.type = hdfs #指定數據收集到hdfs目錄 a1.sinks.k1.hdfs.path = hdfs://node01:9000/tailFile/%Y-%m-%d/%H%M #指定生成文件名的前綴 a1.sinks.k1.hdfs.filePrefix = events- #是否啓用時間上的」捨棄」 -->控制目錄 a1.sinks.k1.hdfs.round = true #時間上進行「捨棄」的值 # 如 12:10 -- 12:19 => 12:10 # 如 12:20 -- 12:29 => 12:20 a1.sinks.k1.hdfs.roundValue = 10 #時間上進行「捨棄」的單位 a1.sinks.k1.hdfs.roundUnit = minute # 控制文件個數 #60s或者50字節或者10條數據,誰先知足,就開始滾動生成新文件 a1.sinks.k1.hdfs.rollInterval = 60 a1.sinks.k1.hdfs.rollSize = 50 a1.sinks.k1.hdfs.rollCount = 10 #每一個批次寫入的數據量 a1.sinks.k1.hdfs.batchSize = 100 #開始本地時間戳--開啓後就可使用%Y-%m-%d去解析時間 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件類型,默認是Sequencefile,可用DataStream,則爲普通文本 a1.sinks.k1.hdfs.fileType = DataStream
第二個是採集目錄到HDFS,若是一個目錄中不斷產生新的文件,就須要把目錄中的文件不斷地進行數據傳輸到HDFS上。
採集目錄的話,source的配置通常採用spooldir;channel的配置能夠設置爲file,也能夠設置爲別的,通常爲memory;sink的配置仍是設置爲hdfs。
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source ##注意:不能往監控目中重複丟同名文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /opt/bigdata/flumeData/files # 是否將文件的絕對路徑添加到header a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 #配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = hdfs://node01:9000/spooldir/%Y-%m-%d/%H%M a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 60 a1.sinks.k1.hdfs.rollSize = 50 a1.sinks.k1.hdfs.rollCount = 10 a1.sinks.k1.hdfs.batchSize = 100 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件類型,默認是Sequencefile,可用DataStream,則爲普通文本 a1.sinks.k1.hdfs.fileType = DataStream
最後再說一個兩個agent串聯,就是第一個agent負責監控某個目錄中新增的文件進行數據收集,經過網絡發送到第二個agent當中去,第二個agent負責接收第一個agent發送的數據,並將數據保存到hdfs上面去。
雖然是兩個agent串聯,但只要看了官網的配置文件以及通過以前的兩個案例,這兩個agent串聯難度也是很通常的。
首先是agent的配置文件是這樣的:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source ##注意:不能往監控目中重複丟同名文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /opt/bigdata/flumeData/files a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 #配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #配置sink a1.sinks.k1.channel = c1 #AvroSink是用來經過網絡來傳輸數據的,能夠將event發送到RPC服務器(好比AvroSource) a1.sinks.k1.type = avro #node02 注意修改成本身的hostname a1.sinks.k1.hostname = node02 a1.sinks.k1.port = 4141
agent2的配置文件是這樣的:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 #配置source #經過AvroSource接受AvroSink的網絡數據 a1.sources.r1.type = avro a1.sources.r1.channels = c1 #AvroSource服務的ip地址 a1.sources.r1.bind = node02 #AvroSource服務的端口 a1.sources.r1.port = 4141 #配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #配置sink a1.sinks.k1.channel = c1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://node01:9000/avro-hdfs/%Y-%m-%d/%H-%M a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 60 a1.sinks.k1.hdfs.rollSize = 50 a1.sinks.k1.hdfs.rollCount = 10 a1.sinks.k1.hdfs.batchSize = 100 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件類型,默認是Sequencefile,可用DataStream,則爲普通文本 a1.sinks.k1.hdfs.fileType = DataStream
最後運行的時候,先啓動node02上的flume,而後在啓動node01上的flume。
老劉此次講了flume的四個容易被忽略的細節,就是想提醒自學的小夥伴們要注意細節,絕對不能徹底照搬資料上說的內容,對每一個知識點必定要有本身的理解。
最後,若是以爲有哪裏寫的很差或者有錯誤的地方,能夠聯繫公衆號:努力的老劉,進行交流。但願可以對大數據開發感興趣的同窗有幫助,但願可以獲得同窗們的指導。
若是以爲寫的不錯,給老劉點個贊!