Flume is a distributed, highly available system for collecting, aggregating, and moving large volumes of log data. It supports a variety of configurable destinations (e.g., Kafka, HDFS), which makes data collection convenient. Its core is the agent, a Java process that runs on the log-collection node.
An agent contains three core components: the source, the channel, and the sink.
The source component collects logs and can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, legacy, and custom sources. After collecting the data, the source places it temporarily in the channel.
The channel component provides temporary storage inside the agent; it can be backed by memory, jdbc, file, or a custom implementation. Data in the channel is deleted only after the sink has delivered it successfully.
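Note that a memory channel loses any buffered events if the agent process dies. Where durability matters, the same channel definition can be switched to a file channel; a minimal sketch (the checkpoint and data directories are illustrative paths, not from this setup):

```
a1.channels.c1.type = file
# where the channel keeps its checkpoint metadata (illustrative path)
a1.channels.c1.checkpointDir = /home/hadoop/flume/checkpoint
# one or more directories for the event data files (illustrative path)
a1.channels.c1.dataDirs = /home/hadoop/flume/data
```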
The sink component sends data on to its destination, which can be hdfs, logger, avro, thrift, ipc, file, null, hbase, solr, or a custom sink.
Throughout the transfer, the unit that flows is the event, and transactions are guaranteed at the event level. Flume also supports chaining agents across multiple tiers, with both fan-in and fan-out topologies.
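To make the three components concrete, here is a minimal single-agent configuration wiring a netcat source through a memory channel to a logger sink (the file name netcat-logger.conf is just an illustrative choice):

```
# conf/netcat-logger.conf -- minimal source -> channel -> sink pipeline
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# netcat source: every line sent to the port becomes one event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# in-memory buffer between source and sink
a1.channels.c1.type = memory

# logger sink: prints each event to the agent's log
a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

Start it with `flume-ng agent -n a1 -c conf -f conf/netcat-logger.conf -Dflume.root.logger=INFO,console`, then `telnet localhost 44444` and type a line: the sink logs it as an event.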
1) A Hadoop cluster (I'm using version 2.7.3 with 6 nodes; see http://www.cnblogs.com/qq503665965/p/6790580.html)
2) Flume cluster plan:
| Host | Role | Mode | Path |
| --- | --- | --- | --- |
| hadoop01 | agent | spooldir | /home/hadoop/logs |
| hadoop05 | collector | HDFS | /logs |
| hadoop06 | collector | HDFS | /logs |
The official site describes the basic architecture in more detail, so I'll just copy its diagram here:
1) System environment variables
```
export FLUME_HOME=/home/hadoop/apache-flume-1.7.0-bin
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$FLUME_HOME/bin
```
Remember to run source /etc/profile afterwards.
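To confirm the variables took effect, flume-ng should now resolve from any directory:

```
source /etc/profile
flume-ng version    # should print the Flume 1.7.0 build info
```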
2) Flume JDK environment
```
mv flume-env.sh.template flume-env.sh
vim flume-env.sh
# add the JDK installation path:
export JAVA_HOME=/usr/jdk1.7.0_60
```
3) Flume configuration on hadoop01
在conf目錄增長配置文件 flume-client ,內容爲:
```
# agent1 component names
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

# sink group name
agent1.sinkgroups = g1

# set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = spooldir
# log source directory
agent1.sources.r1.spoolDir = /home/hadoop/logs

agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp

# sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
# host sink1 sends to
agent1.sinks.k1.hostname = hadoop05
agent1.sinks.k1.port = 52020

# sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
# host sink2 sends to
agent1.sinks.k2.hostname = hadoop06
agent1.sinks.k2.port = 52020

# sink group containing sink1 and sink2
agent1.sinkgroups.g1.sinks = k1 k2

# failover processor for high availability
agent1.sinkgroups.g1.processor.type = failover
# priorities: the higher number wins
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
```
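The failover processor keeps hadoop06 as a hot standby: all events go to the higher-priority k1 until it fails. If you instead wanted both collectors handling traffic at the same time, a sink group also supports a load-balancing processor; a sketch of that alternative:

```
# alternative: spread events across both collectors instead of failing over
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.selector = round_robin
# temporarily blacklist a failed sink rather than retrying it immediately
agent1.sinkgroups.g1.processor.backoff = true
```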
4) hadoop05 configuration (saved as flume-server in the conf directory, to match the start command below)
```
# agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# avro source receiving events from the agent tier
a1.sources.r1.type = avro
# bind to this host
a1.sources.r1.bind = hadoop05
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = hadoop05
a1.sources.r1.channels = c1

# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = TEXT
# roll a new file every second
a1.sinks.k1.hdfs.rollInterval = 1
a1.sinks.k1.channel = c1
# file name prefix (the %Y-%m-%d escapes need a timestamp header,
# which the agent's timestamp interceptor supplies)
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
```
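One caveat: rollInterval = 1 closes and reopens an HDFS file every second, which floods HDFS with tiny files. For anything beyond a demo you would normally roll on time or size instead; a sketch (the values here are illustrative, not from this setup):

```
# roll every 10 minutes, or at ~128 MB, whichever comes first
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
# 0 disables rolling by event count
a1.sinks.k1.hdfs.rollCount = 0
# alternative to the agent-side timestamp interceptor: stamp events at the sink
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```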
5) hadoop06 configuration
```
# agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# avro source receiving events from the agent tier
a1.sources.r1.type = avro
# bind to this host
a1.sources.r1.bind = hadoop06
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = hadoop06
a1.sources.r1.channels = c1

# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = TEXT
a1.sinks.k1.hdfs.rollInterval = 1
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
```
1) Start the collectors, i.e. hadoop05 and hadoop06:
```
flume-ng agent -n a1 -c conf -f flume-server -Dflume.root.logger=DEBUG,console
```
2) Start the agent, i.e. hadoop01 (note that the -n value must match the agent name agent1 used in flume-client):
```
flume-ng agent -n agent1 -c conf -f flume-client -Dflume.root.logger=DEBUG,console
```
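With -Dflume.root.logger=DEBUG,console each process ties up a terminal. Once everything works, you would typically background the agents instead; a sketch using plain nohup (the log file names are illustrative):

```
nohup flume-ng agent -n a1 -c conf -f flume-server > collector.log 2>&1 &
nohup flume-ng agent -n agent1 -c conf -f flume-client > agent.log 2>&1 &
```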
Once the agent is up, the consoles on hadoop05 and hadoop06 print output like the following:
1) Start the zookeeper cluster (skip this step if you haven't set up zookeeper)
2) Start HDFS: start-dfs.sh
3) Simulate website logs; I just threw together some arbitrary test data
4) Upload it to /home/hadoop/logs
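"Uploading" here just means copying files into the spooling directory; the spooldir source picks up any new file and, by default, renames it with a .COMPLETED suffix once it has been fully ingested. A sketch (the file name and log line are made-up test data):

```
# made-up test data -- any line-oriented text works
echo "192.168.1.101 - GET /index.html 200" > access_test.log
cp access_test.log /home/hadoop/logs/
# after ingestion the source renames the file:
ls /home/hadoop/logs/        # -> access_test.log.COMPLETED
```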
Output on hadoop01:
Output on hadoop05:
Because hadoop05 is configured with a higher priority than hadoop06, no logs are written through hadoop06.
Now let's check whether the log files were successfully written to HDFS:
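You can check from the command line as well; the file names carry the %Y-%m-%d prefix configured above:

```
hdfs dfs -ls /logs          # list the rolled files
hdfs dfs -cat /logs/2017*   # print their contents (the date glob is illustrative)
```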
Since I gave hadoop05 a higher priority than hadoop06, hadoop05 did all the collecting above rather than hadoop06. Now let's kill the higher-priority hadoop05 and see whether hadoop06 takes over the log collection.
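"Killing" hadoop05 only requires stopping its Flume process; in jps output a Flume agent shows up under its main class as "Application":

```
jps                      # the Flume agent appears as "Application"
kill <pid-of-Application>
```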
Add another test log file to the log source:
Output on hadoop06:
Check HDFS:
That's it for Flume cluster configuration and log collection. Next time I'll walk through cleaning these logs with MapReduce and storing the results in HBase.