操做系統:Centos 5.5java JDK版本:1.7.0_21node Flume版本:1.3.1linux Hadoop版本:0.20.2git 配置1個agent ,2個collector,1個storageapache |
#下載安裝jdk1.7dom http://www.oracle.com/technetwork/java/javase/downloads/index.htmlcurl tar zxvf jdk-7u21-linux-x64.gz -C /usr/local/async #/etc/profile增長環境變量 pathmunge /usr/local/jdk1.7.0_21/bin export JAVA_HOME=/usr/local/jdk1.7.0_21/ export JRE_HOME=/usr/local/jdk1.7.0_21/jre export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar #驗證java java -version #下載安裝flume 1.3.1 Flume的下載地址 http://www.apache.org/dyn/closer.cgi/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz tar zxvf flume-distribution-0.9.4-bin.tar.gz -C /usr/local/ #/etc/profile增長環境變量 export FLUME_HOME=/usr/local/apache-flume-1.3.1-bin export FLUME_CONF_DIR=$FLUME_HOME/conf export PATH=.:$PATH::$FLUME_HOME/bin #驗證 flume # flume-ng version Flume 1.3.1 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 77b5d2885fecb3560a873bd89f49cbac8a010347 Compiled by hshreedharan on Fri Dec 21 22:14:21 PST 2012 From source with checksum 2565bdfd8b6af459dbf85c6960f189a5 |
#設置配置文件 [root@cc-staging-loginmgr2 conf]# cat example.conf # example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 #命令參數說明 -c conf 指定配置目錄爲conf -f conf/example.conf 指定配置文件爲conf/example.conf -n a1 指定agent名字爲a1,須要與example.conf中的一致 -Dflume.root.logger=INFO,console 指定DEBUF模式在console輸出INFO信息 #啓動agent cd /usr/local/apache-flume-1.3.1-bin flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console 2013-05-24 00:00:09,288 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting 2013-05-24 00:00:09,303 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/] #在另外一個終端進行測試 [root@cc-staging-loginmgr2 conf]# telnet 44444 Trying Connected to localhost.localdomain ( Escape character is '^]'. hello world! OK 2013-05-24 00:00:24,306 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D hello world!. } #測試成功,flume能夠正常使用 |
4. Flume Source測試
avro source能夠發送一個給定的文件給Flume,Avro 源使用AVRO RPC機制 #設置avro配置文件 [root@cc-staging-loginmgr2 conf]# cat avro.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = a1.sources.r1.port = 4141 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 #啓動flume agent a1 cd /usr/local/apache-flume-1.3.1-bin/conf flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console #建立指定文件 echo "hello world" > /usr/logs/log.10 #使用avro-client發送文件 flume-ng avro-client -c . -H localhost -p 4141 -F /usr/logs/log.10 2013-05-27 01:11:45,852 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world } Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out #修改的配置文件 [root@cc-staging-loginmgr2 conf]# cat exec.conf # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = cat /usr/logs/log.10 a1.sources.r1.channels = c1 #啓動flume agent a1 cd /usr/local/apache-flume-1.3.1-bin/conf flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console #追加內容到文件 echo "exec test" >> /usr/logs/log.10 #在啓動的終端查看console輸出 2013-05-27 01:50:12,825 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world } 2013-05-27 01:50:12,826 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 exec test } #若是要使用tail命令,必選使得file足夠大才能看到輸出內容 a1.sources.r1.command = tail -F /usr/logs/log.10 #生成足夠多的內容在文件裏 for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done #能夠在console看到output 2013-05-27 19:17:18,157 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)] Exec source starting with command:tail -n 5 -F /usr/logs/log.10 2013-05-27 19:19:50,334 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 37 exec test7 } 測試3: Spooling directory source This source lets you ingest data by dropping files in a spooling directory on disk. Unlike other asynchronous sources, this source avoids data loss even if Flume is restarted or fails. SpoolSource:是監測配置的目錄下新增的文件,並將文件中的數據讀取出來。須要注意兩點:1) 拷貝到spool目錄下的文件不能夠再打開編輯。 2) spool目錄下不可包含相應的子目錄 #修改的配置文件 [root@cc-staging-loginmgr2 conf]# cat spool.conf # Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/logs/flumeSpool a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 #啓動flume agent a1 cd /usr/local/apache-flume-1.3.1-bin/conf flume-ng agent -c . -f spool.conf -n a1 -Dflume.root.logger=INFO,console #追加內容到spool目錄 [root@cc-staging-loginmgr2 ~]# echo "spool test1" > /usr/logs/flumeSpool/spool1.log #在啓動的終端查看console輸出 2013-05-27 22:49:06,098 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:229)] Preparing to move file /usr/logs/flumeSpool/spool1.log to /usr/logs/flumeSpool/spool1.log.COMPLETED 2013-05-27 22:49:06,101 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{file=/usr/logs/flumeSpool/spool1.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 } 測試4 Netcat source 參見第3部分一個簡單的例子 Syslog tcp source #修改的配置文件 [root@cc-staging-loginmgr2 conf]# cat syslog.conf # Describe/configure the source a1.sources.r1.type = syslogtcp a1.sources.r1.port = 5140 a1.sources.r1.host = localhost a1.sources.r1.channels = c1 #啓動flume agent a1 cd /usr/local/apache-flume-1.3.1-bin/conf flume-ng agent -c . -f syslog.conf -n a1 -Dflume.root.logger=INFO,console #測試產生syslog, <37>由於須要wire format數據,不然會報錯」 Failed to extract syslog wire entry」 echo "<37>hello via syslog" | nc localhost 5140 #在啓動的終端查看console輸出 2013-05-27 23:39:10,755 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 73 79 73 6C 6F 67 hello via syslog } #UDP須要修改配置文件 a1.sources.r1.type = syslogudp a1.sources.r1.port = 5140 a1.sources.r1.host = localhost a1.sources.r1.channels = c1 #測試產生syslog echo "<37>hello via syslog" | nc -u localhost 5140 測試6 HTTP source JSONHandler #修改的配置文件 [root@cc-staging-loginmgr2 conf]# cat post.conf # Describe/configure the source a1.sources = r1 a1.channels = c1 a1.sources.r1.type = org.apache.flume.source.http.HTTPSource a1.sources.r1.port = 5140 a1.sources.r1.channels = c1 #啓動flume agent a1 cd /usr/local/apache-flume-1.3.1-bin/conf flume-ng agent -c . -f post.conf -n a1 -Dflume.root.logger=INFO,console #生成JSON 格式的POST request curl -X POST -d '[{ "headers" :{"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "really_random_body"}]' http://localhost:5140 #在啓動的終端查看console輸出 2013-05-28 01:17:47,186 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{namenode=namenode.example.com, datanode=random_datanode.example.com} body: 72 65 61 6C 6C 79 5F 72 61 6E 64 6F 6D 5F 62 6F really_random_bo } |