1. Flume Sink Processors Test
# Failover Sink Processor

Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that so long as one is available, events will be processed (delivered).

# Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# maxpenalty caps the back-off period for a failed sink (in millis)
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Generate a test log
echo "<37>test1 failover" | nc localhost 5140

# The log is produced on sink2; sink1, whose priority is lower, produces nothing
2013-06-05 00:10:51,194 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }

# Shut down sink2 manually, then generate another test log
echo "<37>test2 failover" | nc localhost 5140

# sink1 now delivers both test1 and test2 (the replicating selector had already put a copy of test1 into channel c1, where it sat buffered while sink1 was on standby)
2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }
2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 66 61 69 6C 6F 76 65 72 test2 failover }

# Bring sink2 back up; logs return to sink2 according to priority
echo "<37>test4 failover" | nc localhost 5140
echo "<37>test5 failover" | nc localhost 5140

2013-06-05 00:12:33,071 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 66 61 69 6C 6F 76 65 72 test4 failover }
2013-06-05 00:12:55,088 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 66 61 69 6C 6F 76 65 72 test5 failover }
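The Avro sinks in this test (and in the load-balancing test below) point at downstream collector agents; the LoggerSink lines in the test output come from them. The original does not show that collector's configuration, so the following is only a minimal sketch, assuming an agent named a2 runs on each of 172.25.4.23 and 172.25.4.33:

# Hypothetical downstream collector (an assumption; not shown in the original):
# receive events over avro on port 4545 and log them to the console
a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4545
a2.sources.r1.channels = c1

a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

It would be started the same way as a1, e.g. flume-ng agent -c . -f collector.conf -n a2 -Dflume.root.logger=INFO,console (the file name collector.conf is hypothetical).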
# Load balancing Sink Processor Test

Load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed.

# Configuration file. Note: with the load_balance processor type, the sinks must all read from one and the same channel, otherwise load balancing does not take effect
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Generate 4 test logs
[root@cc-staging-loginmgr2 ~]# echo "<37>test2 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test3 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test4 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test5 loadbalance" | nc localhost 5140

# Check the sink output to confirm round-robin distribution (the logger sink prints only the first 16 bytes of the body, hence "loadbalanc")
Sink1:
2013-06-06 01:36:03,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 6C 6F 61 64 62 61 6C 61 6E 63 test2 loadbalanc }
2013-06-06 01:36:09,769 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 6C 6F 61 64 62 61 6C 61 6E 63 test4 loadbalanc }

Sink2:
2013-06-06 01:36:05,809 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 33 20 6C 6F 61 64 62 61 6C 61 6E 63 test3 loadbalanc }
2013-06-06 01:36:37,057 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 6C 6F 61 64 62 61 6C 61 6E 63 test5 loadbalanc }
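Besides round_robin, the load_balance processor also supports a random selector, and with backoff = true a failing sink is temporarily blacklisted with an exponentially growing cool-down instead of being retried immediately. A one-line variation on the configuration above (not exercised in the original test):

# Variation (an assumption, not part of the original test): pick a sink at random
a1.sinkgroups.g1.processor.selector = random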
2. Event Serializers Test
Body Text Serializer. Alias: text. This serializer writes the body of the event to an output stream without any transformation or modification (the event body is written out as plain text).

# Configuration file
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

# Generate test logs
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST1 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST2 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST3 BODY TEXT"}]' http://localhost:5140

# View the text content of the file roll output
cat /var/log/flume/1370675739270-1
TEST1 BODY TEXT TEST2 BODY TEXT TEST3 BODY TEXT

# Avro Event Serializer. Alias: avro_event. This serializer serializes Flume events into an Avro container file (the events become records inside an Avro container file).
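The original stops at that description, so the following is only a minimal sketch of how the avro_event serializer might be dropped into the same file_roll sink; the directory and the avro-tools jar path are assumptions, not part of the original test:

# Hypothetical variation of the file_roll sink using the Avro container serializer
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = avro_event

# Inspect the resulting Avro container file (the jar path is hypothetical)
java -jar avro-tools.jar tojson /var/log/flume/1370675739270-1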
3. Flume Interceptors Test
Timestamp Interceptor: this interceptor inserts into the event headers the time in millis at which it processes the event. It inserts a header with key timestamp whose value is the relevant timestamp.

Host Interceptor: this interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host, or a configured key, whose value is the hostname or IP address of the host.

# Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}.

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Start the agent
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console

# Generate a test log
echo "<37>test dynamic interceptor" | nc localhost 5140

# Check the file created on HDFS: timestamp and hostname have both been placed into the headers, so directories are created according to the custom path format
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/
Found 1 items
-rw-r--r-- 3 root supergroup 140 2013-06-16 23:32 /user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118

Static Interceptor: the static interceptor allows the user to append a static header with a static value to all events.

# Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Start the agent
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console

# Generate a test log
echo "<37>test1 static interceptor" | nc localhost 5140

# Check the console output
2013-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK} body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }
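A common follow-on use of the static header (an assumption, not shown in the original) is to partition sink output by it, just as the first example partitioned by %{hostname}. A sketch reusing the HDFS sink from the timestamp/host test:

# Hypothetical variation: route events into per-datacenter directories by
# referencing the static header in the HDFS path
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/%{datacenter}/%Y-%m-%d
# %{datacenter} expands to NEW_YORK for events tagged by the static interceptor;
# the %Y-%m-%d escapes still require a timestamp header, so keep the timestamp
# interceptor from the first example in the interceptor chain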
4. Monitoring Flume with Zabbix
# JVM performance monitoring
Young GC counts:
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $6}'

Full GC counts:
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $8}'

JVM total memory usage:
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $3}'

JVM total instances usage:
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $2}'

# Flume application metrics monitoring
# Start the agent with the JSON reporting options, after which the metrics can be read at http://localhost:34545/metrics
flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

# Generate some data
for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done

# Pretty-print the JSON output with a shell one-liner
[root@cc-staging-loginmgr2 conf]# curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'
CHANNEL.c1:
EventPutSuccessCount:100
ChannelFillPercentage:0.0
Type:CHANNEL
StopTime:0
EventPutAttemptCount:100
ChannelSize:0
StartTime:1371709073310
EventTakeSuccessCount:100
ChannelCapacity:1000
EventTakeAttemptCount:115

# Script that extracts a single Flume metric
[root@cc-staging-loginmgr2 conf]# cat /opt/scripts/monitor_flume.sh
curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'|grep $1|awk -F: '{print $2}'

# Deploy the user parameters in the zabbix agent configuration file
cat /etc/zabbix/zabbix_agentd/zabbix_agentd.userparams.conf
UserParameter=ygc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $6}'
UserParameter=fgc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $8}'
UserParameter=jvm.memory.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $3}'
UserParameter=jvm.instances.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $2}'
UserParameter=flume.monitor[*],/bin/bash /opt/scripts/monitor_flume.sh $1
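To verify the items from the Zabbix server side, something like the following could be used; the agent IP is hypothetical, and flume.monitor takes any key from the /metrics output, since monitor_flume.sh simply greps for it:

# Query the zabbix agent directly (the IP address is an assumption)
zabbix_get -s 172.25.4.23 -k 'flume.monitor[ChannelSize]'
zabbix_get -s 172.25.4.23 -k ygc.counts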