1. Flume sink tests
#Test 1: HDFS sink
Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Add the following to /usr/local/apache-flume-1.3.1-bin/conf/flume-env.sh:
export HADOOP_HOME=/usr/local/hadoop

#Modify the configuration file
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

#Start flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f hdfs.conf -n a1 -Dflume.root.logger=INFO,console

#Generate a test syslog message
echo "<37>hello via syslog to hdfs testing one" | nc -u localhost 5140

#Watch the console output in the terminal where the agent was started; the file is created successfully
2013-05-29 00:53:58,078 (hdfs-k1-call-runner-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected//Syslog.1369814037714.tmp
2013-05-29 00:54:28,220 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714.tmp to hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714

#Check the file on Hadoop
./hadoop dfs -cat hdfs://172.25.4.35:9000/user/hadoop/flume/collected/Syslog.1369814037714
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^;>Gv$hello via syslog to hdfs testing one

#Modify the config file so directories are generated automatically from the event time
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = Syslog.%{host}
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

#Generate a JSON-format POST request. If the timestamp header is in the wrong format it cannot be parsed; a 13-digit timestamp, i.e. one that includes milliseconds, is needed for the correct time to be resolved (date +%s alone yields only 10 digits).
#Generate the current time as a 13-digit Unix timestamp on Linux
date +%s%N | awk '{print substr($0,1,13)}'
curl -X POST -d '[{ "headers":{"timestamp":"1369818213654","host":"cc-staging-loginmgr2"},"body": "hello via post"}]' http://localhost:5140

#Watch the console output in the terminal where the agent was started; the file is created successfully
2013-05-29 02:03:38,646 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp
2013-05-29 02:04:08,714 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp to hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614

#Check the file on Hadoop
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-05-29/0203
Found 1 items
-rw-r--r-- 3 root supergroup 129 2013-05-29 02:04 /user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614

#Test 2: logger sink
Logs events at INFO level. Typically useful for testing/debugging purposes.
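The notes don't show a config for this test; a minimal sketch, assuming the same syslogtcp source r1 and memory channel c1 as in test 1:

#Logger sink config (sketch; source and channel reused from test 1)
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

With -Dflume.root.logger=INFO,console on the flume-ng command line, each event is then printed to the console in the same format as the Avro source output in test 3 below.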
#Test 3: Avro sink
Flume events sent to this sink are turned into Avro events and sent to the configured hostname/port pair.

#Avro source config file
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4545

#Avro sink config file
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

#First start the agent with the Avro source, listening on the port
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console

#Then start the agent with the Avro sink
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro_sink.conf -n a1 -Dflume.root.logger=INFO,console

#The connection has been established
2013-06-02 19:23:00,237 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7a0e28bf, /172.25.4.32:14894 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:14894

#Generate a test log on the Avro sink host
echo "<37>hello via avro sink" | nc localhost 5140

#On the Avro source the event shows up in the log
2013-06-02 19:24:13,740 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 61 76 72 6F 20 73 hello via avro s }

#Test 4: File Roll Sink
Stores events on the local filesystem.

#Modify the config file
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

#Start the agent with the file_roll config file
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f file_roll.conf -n a1 -Dflume.root.logger=INFO,console

#Generate test logs
echo "<37>hello via file roll" | nc localhost 5140
echo "<37>hello via file roll 2" | nc localhost 5140

#Check whether files are generated under /var/log/flume; by default a new file is started every 30 seconds
-rw-r--r-- 1 root root 20 Jun 2 19:44 1370227443397-1
-rw-r--r-- 1 root root 0 Jun 2 19:44 1370227443397-2
-rw-r--r-- 1 root root 22 Jun 2 19:45 1370227443397-3
cat 1370227443397-1 1370227443397-3
hello via file roll
hello via file roll 2
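The 30-second rotation comes from the file_roll sink's sink.rollInterval property, which defaults to 30; a one-line tweak, with 3600 as an illustrative value for hourly files (0 disables time-based rolling):

a1.sinks.k1.sink.rollInterval = 3600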
2. Flume Channels tests
#Memory Channel
The events are stored in an in-memory queue with configurable max size. It's ideal for flows that need high throughput and are prepared to lose the staged data in the event of agent failures.

#Replicating Channel Selector test (copy each event to every channel)
#Config file with 2 channels and 2 sinks
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

#Check that both connections have been established
2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] BOUND: /172.25.4.23:4545
2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:55518
2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] BOUND: /172.25.4.33:4545
2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] CONNECTED: /172.25.4.32:23731

#Generate a test log
echo "<37>hello via channel selector" | nc localhost 5140

#Check that both sinks received the data
2013-06-04 00:02:06,479 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }
2013-06-04 00:02:09,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }
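A related setting worth knowing: the replicating selector can mark channels as optional, so that a failed write to an optional channel does not fail the source's transaction. A sketch (marking c2 optional here is purely for illustration; check that your Flume release's user guide lists selector.optional):

a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.optional = c2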
#Flume channel selectors: Multiplexing Channel Selector test (route events by header value)
#Config file with 2 channels and 2 sinks
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

#Generate test POST requests whose state header matches the config
curl -X POST -d '[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "US"},"body" : "TEST2"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "SH"},"body" : "TEST3"}]' http://localhost:5140

#Check that the data each sink receives matches the config: CZ routes to c1/sink1, US to c2/sink2, and the unmapped SH falls back to the default channel c1
Sink1:
2013-06-04 23:45:35,296 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=CZ} body: 54 45 53 54 31 TEST1 }
2013-06-04 23:45:50,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=SH} body: 54 45 53 54 33 TEST3 }
Sink2:
2013-06-04 23:45:42,293 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=US} body: 54 45 53 54 32 TEST2 }
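A mapping can also fan one header value out to several channels; a hypothetical addition (the state value ALL is made up for illustration) that would copy matching events to both channels:

a1.sources.r1.selector.mapping.ALL = c1 c2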