Flume in Practice

Prerequisites: 1. Install telnet first

rpm -qa telnet-server

rpm -qa xinetd

yum list | grep telnet

yum install telnet-server.x86_64

yum install telnet.x86_64

yum list | grep xinetd

yum install xinetd.x86_64

systemctl enable xinetd.service

systemctl enable telnet.socket

systemctl start telnet.socket

systemctl start xinetd
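
Before moving on, it is worth confirming that the telnet service is actually listening (a quick sanity check; the default telnet port 23 assumes a stock xinetd setup):

systemctl status telnet.socket
ss -tlnp | grep :23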

2. Download the software package

wget http://mirror.bit.edu.cn/apache/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

tar zxvf apache-flume-1.6.0-bin.tar.gz
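Before starting any agent, point Flume at your JDK via conf/flume-env.sh (a minimal sketch; the JAVA_HOME path below is an assumption, adjust to your installation):

cd apache-flume-1.6.0-bin
cp conf/flume-env.sh.template conf/flume-env.sh
echo 'export JAVA_HOME=/usr/java/default' >> conf/flume-env.sh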

Part 1: Standalone

1.NetCat

vim conf/flume-netcat.conf

# Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

# Describe/configure the source
agent.sources.r1.type = netcat
agent.sources.r1.bind = 127.0.0.1
agent.sources.r1.port = 44444

# Describe the sink
agent.sinks.k1.type = logger

# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

# Verification

# Server: bin/flume-ng agent --conf conf --conf-file conf/flume-netcat.conf --name agent -Dflume.root.logger=INFO,console

# Client: telnet 127.0.0.1 44444 (the source is bound to 127.0.0.1, so connect to that address)

2.exec

vim conf/flume-exec.conf

# Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

# Describe/configure the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /data/hadoop/flume/test.txt

# Describe the sink
agent.sinks.k1.type = logger

# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

# Verification

# Server: bin/flume-ng agent --conf conf --conf-file conf/flume-exec.conf --name agent -Dflume.root.logger=INFO,console

# Client: while true; do echo $(date) >> /data/hadoop/flume/test.txt; sleep 1; done
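
Note that the exec source runs tail -f, which fails if the target file does not exist yet, so create it before starting the agent:

mkdir -p /data/hadoop/flume
touch /data/hadoop/flume/test.txt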

3.Avro

vim conf/flume-avro.conf

# Define a memory channel called c1 on agent
agent.channels.c1.type = memory

# Define an avro source called r1 on agent and bind it to channel c1
agent.sources.r1.channels = c1
agent.sources.r1.type = avro
agent.sources.r1.bind = 127.0.0.1
agent.sources.r1.port = 44444

# Describe the sink
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://master:9000/flume_data_pool
agent.sinks.k1.hdfs.filePrefix = events-
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 600000
agent.sinks.k1.hdfs.rollInterval = 600

agent.channels = c1
agent.sources = r1
agent.sinks = k1

# Verification

# Server: bin/flume-ng agent --conf conf --conf-file conf/flume-avro.conf --name agent -Dflume.root.logger=DEBUG,console

# Client: bin/flume-ng avro-client --conf conf --host 127.0.0.1 --port 44444 --filename /data/hadoop/flume/test.txt (an avro source expects Avro-framed events, so use the avro-client rather than telnet)
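
Since this sink writes to HDFS, make sure the target directory exists before starting the agent (assuming the hdfs CLI is on the PATH and master:9000 is the default filesystem):

hdfs dfs -mkdir -p /flume_data_pool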

4. Exec to HDFS (time-partitioned output)

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## exec means Flume runs the given command and reads events from its output
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /3_flume/text.txt
a1.sources.r1.channels = c1

# Describe the sink
## Sink to HDFS; the sink type determines which of the parameters below apply
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
## Where to write in HDFS; the path is not hard-coded but resolved dynamically, so the output directory varies with the event time
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
## Prefix for the generated file names
a1.sinks.k1.hdfs.filePrefix = events-
## Whether to round down the event timestamp when resolving the directory (true = yes)
a1.sinks.k1.hdfs.round = true
## Combined with roundUnit below: switch to a new directory every 1 minute
a1.sinks.k1.hdfs.roundValue = 1
## Unit used for rounding: minutes
a1.sinks.k1.hdfs.roundUnit = minute
## Roll to a new file every 3 seconds
a1.sinks.k1.hdfs.rollInterval = 3
## Roll when the current file exceeds 20 bytes
a1.sinks.k1.hdfs.rollSize = 20
## Roll after 5 events have been written
a1.sinks.k1.hdfs.rollCount = 5
## Number of events to batch before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 10
## Use the local time instead of a timestamp from the event headers
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Output file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Verification: echo '123456' >> /3_flume/text.txt
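
To run this example end to end (the config file name flume-hdfs.conf is an assumption; use whatever name you saved the agent definition under):

bin/flume-ng agent --conf conf --conf-file conf/flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
# then, after appending a line, list the time-partitioned output
hdfs dfs -ls /flume/tailout/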

Part 2: Cluster

1. Failover

master:

# agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set group
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /3_flume/text.txt

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = slave1
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = slave2
agent1.sinks.k2.port = 52020

# set sink group
agent1.sinkgroups.g1.sinks = k1 k2

# set failover (priorities are set per sink name; higher wins)
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000

slave1:

# agent name
a1.channels = c1
a1.sources = r1
a1.sinks = k1

#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# avro source: receives events forwarded from the master agent
a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 52020

# set sink to hdfs
a1.sinks.k1.type = logger
# a1.sinks.k1.type = hdfs
# a1.sinks.k1.hdfs.path=/flume_data_pool
# a1.sinks.k1.hdfs.fileType=DataStream
# a1.sinks.k1.hdfs.writeFormat=TEXT
# a1.sinks.k1.hdfs.rollInterval=1
# a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d

a1.sources.r1.channels = c1
a1.sinks.k1.channel=c1

slave2:

# agent name
a1.channels = c1
a1.sources = r1
a1.sinks = k1

#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# avro source: receives events forwarded from the master agent
a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 52020

# set sink to hdfs
a1.sinks.k1.type = logger
# a1.sinks.k1.type = hdfs
# a1.sinks.k1.hdfs.path=/flume_data_pool
# a1.sinks.k1.hdfs.fileType=DataStream
# a1.sinks.k1.hdfs.writeFormat=TEXT
# a1.sinks.k1.hdfs.rollInterval=1
# a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d

a1.sources.r1.channels = c1
a1.sinks.k1.channel=c1
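
Start the downstream agents before the master so the avro sinks have something to connect to. A sketch (the config file names are assumptions; --name must match the property prefix in each file):

# on slave1 and slave2
bin/flume-ng agent --conf conf --conf-file conf/flume-slave.conf --name a1 -Dflume.root.logger=INFO,console
# on master
bin/flume-ng agent --conf conf --conf-file conf/flume-master.conf --name agent1 -Dflume.root.logger=INFO,console

Killing the agent on slave1 should make the master fail over to slave2; restarting slave1 lets it take over again because of its higher priority (10 vs 1).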

2. Load balancing

In the failover master config, replace

# set failover (priorities are set per sink name; higher wins)
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000

with

agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.selector = round_robin

Test: for i in $(seq 1 100); do echo '==============' $i >> /3_flume/text.txt; done

3. Interceptors and filtering

(Timestamp interceptor)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 52020
a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = badou.
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

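Start the agent before posting events (a minimal sketch; the config file name flume-http.conf is an assumption):

bin/flume-ng agent --conf conf --conf-file conf/flume-http.conf --name a1 -Dflume.root.logger=INFO,console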

Test:

curl -X POST -d '[{"headers":{"flume":"nihao"},"body":"hello"}]' http://master:52020
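
If the event is accepted, a time-stamped directory matching the sink's path pattern should appear (a quick check, assuming the hdfs CLI is on the PATH and the event lands on the current date):

hdfs dfs -ls hdfs://master:9000/flume/$(date +%Y-%m-%d)/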
