Prerequisites: 1. Install telnet first
rpm -qa telnet-server
rpm -qa xinetd
yum list |grep telnet
yum install telnet-server.x86_64
yum install telnet.x86_64
yum list |grep xinetd
yum install xinetd.x86_64
systemctl enable xinetd.service
systemctl enable telnet.socket
systemctl start telnet.socket
systemctl start xinetd
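To confirm the service actually came up, a quick local check helps (23 is the default telnet port; adjust if yours differs):
systemctl status telnet.socket
telnet localhost 23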
2. Download the package
wget http://mirror.bit.edu.cn/apache/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
tar zxvf apache-flume-1.6.0-bin.tar.gz
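Flume needs a JDK before anything will run; a minimal environment sketch, assuming the archive was unpacked under /usr/local and Java lives at /usr/java/default (both paths are illustrative):
# illustrative paths; adjust to your layout
export FLUME_HOME=/usr/local/apache-flume-1.6.0-bin
export PATH=$PATH:$FLUME_HOME/bin
echo 'export JAVA_HOME=/usr/java/default' >> $FLUME_HOME/conf/flume-env.sh
cd $FLUME_HOME
bin/flume-ng version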
vim conf/flume-netcat.conf
# Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

# Describe/configure the source
agent.sources.r1.type = netcat
agent.sources.r1.bind = 127.0.0.1
agent.sources.r1.port = 44444

# Describe the sink
agent.sinks.k1.type = logger

# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
# Verify
# Server
bin/flume-ng agent --conf conf --conf-file conf/flume-netcat.conf --name agent -Dflume.root.logger=INFO,console
# Client
telnet master 44444
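If telnet is missing on the client machine, nc talks to the netcat source just as well; every line typed should show up as an event in the agent console:
nc master 44444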
vim conf/flume-exec.conf
# Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

# Describe/configure the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /data/hadoop/flume/test.txt

# Describe the sink
agent.sinks.k1.type = logger

# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
# Verify
# Server
bin/flume-ng agent --conf conf --conf-file conf/flume-exec.conf --name agent -Dflume.root.logger=INFO,console
# Client
while true; do echo $(date) >> /data/hadoop/flume/test.txt; sleep 1; done
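Note that tail -f exits immediately when the file does not exist yet, so create it before starting the agent:
mkdir -p /data/hadoop/flume
touch /data/hadoop/flume/test.txt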
vim conf/flume-avro.conf
# Define a memory channel called c1 on agent
agent.channels.c1.type = memory

# Define an avro source called r1 on agent and tell it where to listen
agent.sources.r1.channels = c1
agent.sources.r1.type = avro
agent.sources.r1.bind = 127.0.0.1
agent.sources.r1.port = 44444

# Describe/configure the HDFS sink
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://master:9000/flume_data_pool
agent.sinks.k1.hdfs.filePrefix = events-
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 600000
agent.sinks.k1.hdfs.rollInterval = 600

# Declare the components on this agent
agent.channels = c1
agent.sources = r1
agent.sinks = k1
# Verify
# Server
bin/flume-ng agent --conf conf --conf-file conf/flume-avro.conf --name agent -Dflume.root.logger=DEBUG,console
# Client: the avro source speaks Avro RPC, so use the bundled avro-client rather than telnet
echo 'hello' > /tmp/avro_test.txt    # /tmp/avro_test.txt is just an illustrative input file
bin/flume-ng avro-client --conf conf -H 127.0.0.1 -p 44444 -F /tmp/avro_test.txt
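Once a few events have been sent, the sink output can be checked directly in HDFS (the path matches hdfs.path above):
hdfs dfs -ls /flume_data_pool
hdfs dfs -cat /flume_data_pool/events-*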
A variant that tails a file into time-partitioned HDFS directories:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## exec means Flume runs the given command and pulls data from its output
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /3_flume/text.txt
a1.sources.r1.channels = c1

# Describe the sink
## sink to HDFS; the type determines which parameters below apply
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
## where the HDFS sink writes; the path is not hard-coded but expands dynamically, so the output directory name varies over time
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
## prefix for the generated files
a1.sinks.k1.hdfs.filePrefix = events-
## whether to roll to a new directory when the time boundary is reached; true means yes
a1.sinks.k1.hdfs.round = true
## roll the directory every 1 (unit set below)
a1.sinks.k1.hdfs.roundValue = 1
## the unit for directory rounding is minutes
a1.sinks.k1.hdfs.roundUnit = minute
## roll to a new file once 3 seconds have passed
a1.sinks.k1.hdfs.rollInterval = 3
## also roll once the current file exceeds 20 bytes
a1.sinks.k1.hdfs.rollSize = 20
## also roll after 5 events have been written
a1.sinks.k1.hdfs.rollCount = 5
## how many events to batch up per append to HDFS
a1.sinks.k1.hdfs.batchSize = 10
## use the local timestamp rather than an event-header timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# generated file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Verify:
echo '123456' >> /3_flume/text.txt
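If everything is wired correctly the events land under minute-stamped directories; listing the sink path recursively should show them:
hdfs dfs -ls -R /flume/tailout/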
Failover setup. On master:
# agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

# set group
agent1.sinkgroups = g1

# set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /3_flume/text.txt

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = slave1
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = slave2
agent1.sinks.k2.port = 52020

# set sink group
agent1.sinkgroups.g1.sinks = k1 k2

# set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
On slave1:
# agent1 name
a1.channels = c1
a1.sources = r1
a1.sinks = k1

# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# other node, slave to master
a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 52020

# sink to logger; swap in the commented hdfs settings to write to HDFS instead
a1.sinks.k1.type = logger
# a1.sinks.k1.type = hdfs
# a1.sinks.k1.hdfs.path = /flume_data_pool
# a1.sinks.k1.hdfs.fileType = DataStream
# a1.sinks.k1.hdfs.writeFormat = TEXT
# a1.sinks.k1.hdfs.rollInterval = 1
# a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
On slave2:
# agent1 name
a1.channels = c1
a1.sources = r1
a1.sinks = k1

# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# other node, slave to master
a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 52020

# sink to logger; swap in the commented hdfs settings to write to HDFS instead
a1.sinks.k1.type = logger
# a1.sinks.k1.type = hdfs
# a1.sinks.k1.hdfs.path = /flume_data_pool
# a1.sinks.k1.hdfs.fileType = DataStream
# a1.sinks.k1.hdfs.writeFormat = TEXT
# a1.sinks.k1.hdfs.rollInterval = 1
# a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
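Start the two collectors before the master so the avro sinks have live endpoints to connect to; a launch sketch, assuming the configs above were saved as conf/flume-collector.conf on each slave and conf/flume-master.conf on master (file names are illustrative):
# on slave1 and slave2 (agent name a1, matching the configs above)
bin/flume-ng agent --conf conf --conf-file conf/flume-collector.conf --name a1 -Dflume.root.logger=INFO,console
# on master (agent name agent1)
bin/flume-ng agent --conf conf --conf-file conf/flume-master.conf --name agent1 -Dflume.root.logger=INFO,console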
To switch from failover to load balancing, replace this block in the master configuration:
# set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
with:
# load balancing; keep the agent1 prefix so it applies to the same agent
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.selector = round_robin
Test:
for i in $(seq 1 100); do echo '==============' $i >> /3_flume/text.txt; done
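Under round_robin the two collector consoles should each log roughly half of the 100 events; stopping one collector mid-run (Ctrl-C) shifts the remainder to the survivor. If each collector's console output was redirected to a file, the split is easy to count (agent.log is an illustrative name):
# run on each slave
grep -c '==============' agent.log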
Interceptor (Timestamp):
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.host = master
a1.sources.r1.port = 52020
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = badou.
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
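Save this as, say, conf/flume-http.conf (an illustrative name) and start it the same way as the earlier agents:
bin/flume-ng agent --conf conf --conf-file conf/flume-http.conf --name a1 -Dflume.root.logger=INFO,console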
Test:
curl -X POST -d '[{"headers":{"flume":"nihao"},"body":"hello"}]' http://master:52020
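Because the timestamp interceptor stamps each event header, the HDFS sink can expand %Y-%m-%d/%H%M; the posted event should land under today's directory:
hdfs dfs -ls -R /flume/$(date +%Y-%m-%d)/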