Flume是Cloudera提供的一個高可用的,高可靠的,分佈式的海量日誌採集、聚合和傳輸的系統,Flume支持在日誌系統中定製各種數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(可定製)的能力。html
Flume最先是Cloudera提供的日誌收集系統,目前是Apache下的一個孵化項目,Flume支持在日誌系統中定製各種數據發送方,用於收集數據。node
Flume提供對數據進行簡單處理,並寫到各類數據接受方(可定製)的能力, Flume提供了從console(控制檯)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日誌系統,支持TCP和UDP2種模式),exec(命令執行)等數據源上收集數據的能力。apache
Flume-og採用了多Master的方式。爲了保證配置數據的一致性,Flume引入了ZooKeeper,用於保存配置數據,ZooKeeper自己可保證配置數據的一致性和高可用,另外,在配置數據發生變化時,ZooKeeper能夠通知Flume Master節點。Flume Master間使用gossip協議同步數據。json
Flume-ng最明顯的改動就是取消了集中管理配置的 Master 和 Zookeeper,變爲一個純粹的傳輸工具。Flume-ng另外一個主要的不一樣點是讀入數據和寫出數據如今由不一樣的工做線程處理(稱爲 Runner)。 在 Flume-og 中,讀入線程一樣作寫出工做(除了故障重試)。若是寫出慢的話(不是徹底失敗),它將阻塞 Flume 接收數據的能力。這種異步的設計使讀入線程能夠順暢的工做而無需關注下游的任何問題。服務器
若是使用Apache-Flume的話只要上傳安裝包到服務器,而後解壓,再配置一下環境變量便可。本文使用CDH-5.12.2安裝Flume。異步
Flume的主要工做就是啓動一個Agent,一個Agent由三個部分組成,Source、Sink和Channel。Source表示數據從哪裏來,Sink表示數據收集到哪裏去,Channel是一個緩衝區。更詳細介紹能夠參看官方文檔。tcp
對於Source和Sink則根據不一樣的需求有不少種寫法。例如Source能夠直接從一個文件/目錄中去取數據,也能夠別人直接給你傳數據。因此,總的來講Source只有兩類,一類是主動的Source、一類是被動的Source。分佈式
主動的Source就是將Source配置成主動的到別人那裏去拿,主動的Source與被動的Source不一樣,它不是一個服務。主動的source有Exec Source、Spooling Directory Source、Taildir Source、Kafka Source等。ide
被動的Source就是別人給發過來,前提是Flume機器的source必定是一個服務,能夠是Http協議、tcp協議、Rcp協議的服務,規定是哪一種協議的服務,B機器就按那種協議去發。總之,被動的Source就是一種服務,至於使用什麼協議就看實際需求。被動的source有Avro Source、Thrift Source、JMS Source、NetCat Source等。 工具
通常來講,Sink沒有主動和被動之分。若是要將收集到的數據放到HDFS上的話,那麼Sink就是Hdfs的客戶端;若是放到Kafka中,那麼Sink就是Kafka的客戶端。總之,Sink通常都是作客戶端的。
對於Channel。只有兩種狀況,一種緩衝在內存中,一種緩衝在磁盤中。
相比於Spooldir Source,Taildir Source作了一些優化。Spooldir Source讀取目錄時,文件在很短的時間內不能修改,不然會報錯,致使Flume終止。而咱們常常須要上傳較大文件,當文件達到幾MB或者十幾MB,Flume就會報錯。固然,能夠對Flume的源代碼進行修改,來解決這個問題(可參見Flume Spooldir 源的一些問題)。Flume 1.7以後增長了Taildir Source,這個Source也能夠解決這個問題。
其中,channels,type,filegroups,filegroups.<filegroupName>是必配屬性。type=TAILDIR;filegroups是給若干個目錄取別名,例如 g1 g2;filegroups.<filegroupName>是設置對應目錄(g1或g2)下的文件匹配規則。
配置基本屬性。其中,channel、type、hdfs.path是必配屬性。type=hdfs;hdfs.path=hdfs://namenode:rpc端口/path。
設置寫文件的方式。hdfs.rollInterval,間隔時間,每間隔多少秒寫一個文件;hdfs.rollSize,寫入一個文件的最大大小;hdfs.rollCount,設置往一個文件中寫數據的最大次數。HDFS Sink寫數據到HDFS中,有三種不一樣的方式來寫文件。第一種是按設置的hdfs.rollInterval間隔時間來寫,達到這麼長時間就寫一個文件;第二種是根據文件的大小來生成文件,當文件達到hdfs.rollSize設置的大小以後從新寫一個文件;第三種是按照設置的hdfs.rollCount每次寫的次數,當達到這個極限時關閉流,生成一個文件。若是三個都配了,不管哪一個屬性先知足設置,就會關閉流,生成一個文件。若是不考慮某種寫文件的方式,就將其屬性值設置爲0。另外,hdfs.idleTimeout,設置超時時間,能夠設置超過多少秒都沒有數據過來,不管是否知足寫文件三個方式的設置,都會關閉流。
配置生成的文件名屬性。hdfs.filePrefix,生成的文件的前綴。hdfs.fileSuffix,生成文件的後綴。hdfs.inUsePrefix,是否使用用戶的前綴。
配置動態目錄屬性。須要用到的屬性爲hdfs.rund,hdfs.roundValue,hdfs.roundUnit,hdfs.useLocalTimeStamp等。例如,本文的flume.conf最後四行配置,能夠在hdfs上動態生成目錄,每隔10分鐘生成一個。
以下圖所示,可使用這裏的變量來設置一些動態的sink目錄,例如按照不一樣的時間(日期)來做爲不一樣的sink目錄。
a1.sources=s a1.channels=c a1.sinks=k a1.sources.s.type=TAILDIR a1.sources.s.filegroups=g1 a1.sources.s.filegroups.g1=/data/.*csv.* a1.sources.s.positionFile=/tmp/myflume/taildir_position.json a1.sources.s.channels=c a1.sources.s.fileHeader=true a1.channels.c.type=memory a1.channels.c.capacity=100 a1.channels.c.transactionCapacity=100 a1.channels.c.keep-alive=10 a1.channels.c.byteCapacity=0 a1.sinks.k.type=hdfs a1.sinks.k.channel=c a1.sinks.k.hdfs.path=hdfs://cdh01:8020/flume/%Y-%m-%d/%H%M a1.sinks.k.hdfs.rollInterval=60 a1.sinks.k.hdfs.rollSize=20971520 a1.sinks.k.hdfs.rollCount=0 a1.sinks.k.hdfs.idleTimeout=60 a1.sinks.k.hdfs.fileType=DataStream a1.sinks.k.hdfs.round=true a1.sinks.k.hdfs.roundValue=10 a1.sinks.k.hdfs.roundUnit=minute a1.sinks.k.hdfs.useLocalTimeStamp=true