Deploying and Configuring Flume-NG in a Distributed Environment (Part 3)

1. Flume Sink Processors test

#Failover Sink Processor

Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that so long as one is available, events will be processed (delivered). The sink with the highest priority value is used first; maxpenalty caps the backoff (in milliseconds) applied to a failed sink.

#Configuration file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
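#Start the agent before sending test traffic. A minimal sketch following the same pattern used in the interceptor sections later in this article (failover_sink.conf is a hypothetical filename for the config above; the other tests in this article are started the same way with their respective config files):

cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f failover_sink.conf -n a1 -Dflume.root.logger=INFO,console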


#Generate a test log entry

echo "<37>test1 failover" | nc localhost 5140


#The log appears on sink2; sink1, having the lower priority, produces nothing

2013-06-05 00:10:51,194 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }


#Manually stop sink2 and generate another test log entry

echo "<37>test2 failover" | nc localhost 5140


#sink1 now logs both test1 and test2

2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }
2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 66 61 69 6C 6F 76 65 72 test2 failover }


#Bring sink2 back up; by priority, events flow to sink2 again

echo "<37>test4 failover" | nc localhost 5140
echo "<37>test5 failover" | nc localhost 5140


2013-06-05 00:12:33,071 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 66 61 69 6C 6F 76 65 72 test4 failover }
2013-06-05 00:12:55,088 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 66 61 69 6C 6F 76 65 72 test5 failover }


#Load balancing Sink Processor test

Load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed, and supports distributing the load via round_robin or random selection mechanisms.


#Configuration file. Note: with the load_balance processor type, the sinks must share the same channel (here both k1 and k2 drain c1), otherwise load balancing does not take effect

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


#Generate 4 test log entries

[root@cc-staging-loginmgr2 ~]# echo "<37>test2 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test3 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test4 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test5 loadbalance" | nc localhost 5140


#Check the sink output to confirm the round-robin distribution

Sink1:
2013-06-06 01:36:03,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 6C 6F 61 64 62 61 6C 61 6E 63 test2 loadbalanc }
2013-06-06 01:36:09,769 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 6C 6F 61 64 62 61 6C 61 6E 63 test4 loadbalanc }

Sink2:
2013-06-06 01:36:05,809 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 33 20 6C 6F 61 64 62 61 6C 61 6E 63 test3 loadbalanc }
2013-06-06 01:36:37,057 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 6C 6F 61 64 62 61 6C 61 6E 63 test5 loadbalanc }



2. Event Serializers test

Body Text Serializer

Alias: text. This serializer writes the body of the event to an output stream without any transformation or modification (the event body is written out as plain text).


#Configuration file

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false


#Generate test log entries

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST1 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST2 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST3 BODY TEXT"}]' http://localhost:5140


#Check the text content in the file roll output file

cat /var/log/flume/1370675739270-1
TEST1 BODY TEXT
TEST2 BODY TEXT
TEST3 BODY TEXT


#Avro Event Serializer

Alias: avro_event. This serializer turns Flume events into an Avro container file (each Flume event becomes a record in the resulting Avro file).
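No configuration is shown here in the original; a minimal sketch, reusing the file_roll sink from the text serializer test above (syncIntervalBytes and compressionCodec are the optional tuning properties documented for this serializer; the values are illustrative):

# Hypothetical file_roll sink writing Avro container files
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = avro_event
# Approximate bytes between Avro sync markers (default 2048000)
a1.sinks.k1.sink.serializer.syncIntervalBytes = 2048000
# Optional Avro compression codec, e.g. snappy (default: none)
a1.sinks.k1.sink.serializer.compressionCodec = snappy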



3. Flume Interceptors test

Timestamp Interceptor

This interceptor inserts into the event headers the time in millis at which it processes the event. It inserts a header with key timestamp whose value is the relevant timestamp.

Host Interceptor

This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host (or a configured key) whose value is the hostname or IP address of the host.


#Configuration file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}.

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


#Start the agent

cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console


#Generate a test log entry

echo "<37>test dynamic interceptor" | nc localhost 5140


#Check the files generated on HDFS. Both the timestamp and hostname headers are now present on each event: the %Y-%m-%d/%H%M escapes in hdfs.path are resolved from the timestamp header, and %{hostname} in hdfs.filePrefix from the host interceptor's header, so directories and file prefixes follow the custom format

./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/
Found 1 items
-rw-r--r--   3 root supergroup        140 2013-06-16 23:32 /user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118
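To inspect the file contents, something like the following should work (a sketch; note the HDFS sink's default hdfs.fileType is SequenceFile, so the event body appears wrapped in SequenceFile framing rather than as plain text):

./hadoop dfs -cat hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118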


Static Interceptor

Static interceptor allows the user to append a static header with a static value to all events.


#Configuration file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


#Start the agent

cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console


#Generate a test log entry

echo "<37>test1 static interceptor" | nc localhost 5140


#Check the console output

2013-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK} body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }



4. Monitoring Flume with Zabbix

#JVM performance monitoring

Young GC count:
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $6}'

Full GC count:
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $8}'

JVM total memory usage (bytes):
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $3}'

JVM total instance count:
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $2}'
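The awk column indices depend on the jstat -gcutil header layout; on JDK 7 it is S0 S1 E O P YGC YGCT FGC FGCT GCT, so $6 is the young GC count and $8 the full GC count. A quick way to confirm the layout on your JDK (pgrep java|head -1 guards against multiple java processes, as in the Zabbix items below):

/usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1) | head -1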


#Flume application metrics monitoring

#Start the agent with the JSON reporting parameters, so the metrics can be queried at http://localhost:34545/metrics

flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545


#Generate some data

for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done


#Pretty-print the JSON output with a sed one-liner

[root@cc-staging-loginmgr2 conf]# curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'


CHANNEL.c1:
EventPutSuccessCount:100
ChannelFillPercentage:0.0
Type:CHANNEL
StopTime:0
EventPutAttemptCount:100
ChannelSize:0
StartTime:1371709073310
EventTakeSuccessCount:100
ChannelCapacity:1000
EventTakeAttemptCount:115


#Script file used to monitor Flume metrics

[root@cc-staging-loginmgr2 conf]# cat /opt/scripts/monitor_flume.sh
curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'|grep $1|awk -F: '{print $2}'
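For example, querying one of the metrics shown above (the value follows from the sample output, where EventPutSuccessCount is 100):

/bin/bash /opt/scripts/monitor_flume.sh EventPutSuccessCount
100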


#Deploy the items in the Zabbix agent configuration file

cat /etc/zabbix/zabbix_agentd/zabbix_agentd.userparams.conf
UserParameter=ygc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $6}'
UserParameter=fgc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $8}'
UserParameter=jvm.memory.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $3}'
UserParameter=jvm.instances.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $2}'
UserParameter=flume.monitor[*],/bin/bash /opt/scripts/monitor_flume.sh $1
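The items can then be verified from the Zabbix server with zabbix_get (a sketch; 172.25.4.23 stands in for whichever host runs the Zabbix agent):

zabbix_get -s 172.25.4.23 -k 'flume.monitor[EventPutSuccessCount]'
zabbix_get -s 172.25.4.23 -k 'ygc.counts'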
