Download: flume.apache.org/download.ht…
Unpack the installation archive:
[hadoop@hadoop01 apps]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz
[hadoop@hadoop01 apps]$ cd apache-flume-1.8.0-bin/
[hadoop@hadoop01 apache-flume-1.8.0-bin]$ ll
總用量 148
drwxr-xr-x. 2 hadoop hadoop 62 1月 21 14:31 bin
-rw-r--r--. 1 hadoop hadoop 81264 9月 15 20:26 CHANGELOG
drwxr-xr-x. 2 hadoop hadoop 127 1月 21 14:31 conf
-rw-r--r--. 1 hadoop hadoop 5681 9月 15 20:26 DEVNOTES
-rw-r--r--. 1 hadoop hadoop 2873 9月 15 20:26 doap_Flume.rdf
drwxr-xr-x. 10 hadoop hadoop 4096 9月 15 20:48 docs
drwxr-xr-x. 2 hadoop hadoop 8192 1月 21 14:31 lib
-rw-r--r--. 1 hadoop hadoop 27663 9月 15 20:26 LICENSE
-rw-r--r--. 1 hadoop hadoop 249 9月 15 20:26 NOTICE
-rw-r--r--. 1 hadoop hadoop 2483 9月 15 20:26 README.md
-rw-r--r--. 1 hadoop hadoop 1588 9月 15 20:26 RELEASE-NOTES
drwxr-xr-x. 2 hadoop hadoop 68 1月 21 14:31 tools
[hadoop@hadoop01 apache-flume-1.8.0-bin]$
Create a symlink so Flume can be referenced from a fixed path:
[root@hadoop01 bin]# ln -s /home/hadoop/apps/apache-flume-1.8.0-bin /usr/local/flume
Edit /etc/profile and add the following:
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${HIVE_HOME}/bin:${FLUME_HOME}/bin
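For the new FLUME_HOME and PATH entries to take effect in the current shell, reload the profile (or log in again):
[root@hadoop01 bin]# source /etc/profile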
Start an instance using the example.conf configuration file shown below:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
The startup command is as follows:
[root@hadoop01 conf]# pwd
/home/hadoop/apps/apache-flume-1.8.0-bin/conf
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
After a successful start, the console output looks like the following:
... (output omitted)
18/01/27 18:17:25 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
18/01/27 18:17:25 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@20470f counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/01/27 18:17:25 INFO node.Application: Starting Channel c1
18/01/27 18:17:25 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
18/01/27 18:17:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/01/27 18:17:25 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/01/27 18:17:26 INFO node.Application: Starting Sink k1
18/01/27 18:17:26 INFO node.Application: Starting Source r1
18/01/27 18:17:26 INFO source.NetcatSource: Source starting
18/01/27 18:17:26 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
Send data using telnet:
[root@hadoop01 apps]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Are you OK ?
OK
The console prints the received event:
18/01/27 18:21:00 INFO sink.LoggerSink: Event: { headers:{} body: 41 72 65 20 79 6F 75 20 4F 4B 20 3F 0D Are you OK ?. }
If telnet is not available, install it first:
[root@hadoop01 apps]# yum -y install telnet
Source | Type | Purpose |
---|---|---|
Avro Source | avro | Starts an Avro server; can be connected to an upstream Agent |
HTTP Source | http | Starts an HTTP server |
Exec Source | exec | Runs a Unix command and captures its standard output, e.g. tail -f |
Taildir Source | TAILDIR | Monitors directories or files |
Spooling Directory Source | spooldir | Monitors a directory for newly added files |
Kafka Source | org.apache.flume.source.kafka.KafkaSource | Reads data from Kafka |
JMS Source | jms | Reads data from a JMS source |
# avrosource.conf
avroagent.sources = r1
avroagent.channels = c1
avroagent.sinks = k1
avroagent.sources.r1.type = avro
avroagent.sources.r1.bind = 192.168.43.20
avroagent.sources.r1.port = 8888
avroagent.sources.r1.threads= 3
avroagent.sources.r1.channels = c1
avroagent.channels.c1.type = memory
avroagent.channels.c1.capacity = 10000
avroagent.channels.c1.transactionCapacity = 1000
avroagent.sinks.k1.type = logger
avroagent.sinks.k1.channel = c1
Start the avroagent:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file avrosource.conf --name avroagent -Dflume.root.logger=INFO,console
A successful start looks like this:
... (output omitted)
18/01/27 18:46:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/01/27 18:46:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/01/27 18:46:36 INFO node.Application: Starting Sink k1
18/01/27 18:46:36 INFO node.Application: Starting Source r1
18/01/27 18:46:36 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 192.168.43.20, port: 8888 }...
18/01/27 18:46:37 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
18/01/27 18:46:37 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
18/01/27 18:46:37 INFO source.AvroSource: Avro source r1 started
Next, configure an Exec Source (execsource.conf) that tails a log file and forwards events through an Avro Sink to the Avro Source above:
execagent.sources = r1
execagent.channels = c1
execagent.sinks = k1
execagent.sources.r1.type = exec
execagent.sources.r1.command = tail -F /home/hadoop/apps/flume/execsource/exectest.log
execagent.sources.r1.channels = c1
execagent.channels.c1.type = memory
execagent.channels.c1.capacity = 10000
execagent.channels.c1.transactionCapacity = 1000
execagent.sinks.k1.type = avro
execagent.sinks.k1.channel = c1
execagent.sinks.k1.hostname = 192.168.43.20
execagent.sinks.k1.port = 8888
Start the execagent:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file execsource.conf --name execagent
A successful start looks like this:
18/01/27 18:58:43 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
18/01/27 18:58:43 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: 192.168.43.20, port: 8888
18/01/27 18:58:43 INFO sink.AvroSink: Attempting to create Avro Rpc client.
18/01/27 18:58:43 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
18/01/27 18:58:44 INFO sink.AbstractRpcSink: Rpc sink k1 started.
Write content to the file monitored by the execagent and check whether the avroagent receives the changes:
[root@hadoop01 execsource]# echo 222 > exectest.log
[root@hadoop01 execsource]# echo 5555 >> exectest.log
[root@hadoop01 execsource]# cat exectest.log
222
5555
The avroagent console shows the received events:
18/01/27 18:58:50 INFO sink.LoggerSink: Event: { headers:{} body: 31 32 33 123 }
18/01/27 18:59:55 INFO sink.LoggerSink: Event: { headers:{} body: 35 35 35 35 5555 }
This confirms that the two chained agents pass events successfully.
Note: the property prefix used in the avroagent configuration file must match the agent name passed with --name when starting the service.
Next, a Spooling Directory Source example (spooldirsource.conf):
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/hadoop/apps/flume/spoolDir
a1.sources.r1.fileHeader = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
/home/hadoop/apps/flume/spoolDir must already exist and be readable and writable by the user running the agent.
Start the Spooling Directory Source agent:
[hadoop@hadoop01 conf]$ flume-ng agent --conf conf --conf-file spooldirsource.conf --name a1 -Dflume.root.logger=INFO,console
Create a file under spoolDir, write some content to it, and watch the console output:
18/01/28 17:06:54 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/apps/flume/spoolDir/test to /home/hadoop/apps/flume/spoolDir/test.COMPLETED
18/01/28 17:06:55 INFO sink.LoggerSink: Event: { headers:{file=/home/hadoop/apps/flume/spoolDir/test} body: 32 32 32 222 }
This shows that the Spooling Directory Source agent detects the new file.
Note that the Spooling Directory Source does not monitor files in subdirectories, nor does it pick up updates to files it has already processed.
Next, a Kafka Source example (kafkasource.conf):
kafkasourceagent.sources = r1
kafkasourceagent.channels = c1
kafkasourceagent.sinks = k1
kafkasourceagent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
kafkasourceagent.sources.r1.channels = c1
kafkasourceagent.sources.r1.batchSize = 100
kafkasourceagent.sources.r1.batchDurationMillis = 1000
kafkasourceagent.sources.r1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092
kafkasourceagent.sources.r1.kafka.topics = flumetopictest1
kafkasourceagent.sources.r1.kafka.consumer.group.id = flumekafkagroupid
kafkasourceagent.channels.c1.type = memory
kafkasourceagent.channels.c1.capacity = 10000
kafkasourceagent.channels.c1.transactionCapacity = 1000
kafkasourceagent.sinks.k1.type = logger
kafkasourceagent.sinks.k1.channel = c1
First start the Kafka service on the three Kafka nodes; run the following on each node to launch it in the background:
[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
On a Kafka node, create the topic flumetopictest1 referenced in the configuration:
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic flumetopictest1
Created topic "flumetopictest1".
Once the topic is created, start the Kafka Source agent:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkasource.conf --name kafkasourceagent -Dflume.root.logger=INFO,console
Create a Kafka producer and send some messages:
[root@hadoop03 bin]# ./kafka-console-producer.sh --broker-list 192.168.43.22:9092,192.168.43.23:9092 --topic flumetopictest1
After sending messages from the producer, the agent console shows the events consumed from Kafka:
18/02/03 20:36:57 INFO sink.LoggerSink: Event: { headers:{topic=flumetopictest1, partition=2, timestamp=1517661413068} body: 31 32 33 31 33 32 32 31 12313221 }
18/02/03 20:37:09 INFO sink.LoggerSink: Event: { headers:{topic=flumetopictest1, partition=1, timestamp=1517661428930} body: 77 69 20 61 69 79 6F 75 08 08 08 wi aiyou... }
The Taildir Source monitors a directory or files, matching the source files to tail with regular expressions. It persists the read position of each monitored file to a position file, which lets it resume after a restart without reading duplicate data. The configuration (taildirsource.conf):
taildiragent.sources=r1
taildiragent.channels=c1
taildiragent.sinks=k1
taildiragent.sources.r1.type=TAILDIR
taildiragent.sources.r1.positionFile=/home/hadoop/apps/flume/taildir/position/taildir_position.json
taildiragent.sources.r1.filegroups=f1 f2
taildiragent.sources.r1.filegroups.f1=/home/hadoop/apps/flume/taildir/test1/test.log
taildiragent.sources.r1.filegroups.f2=/home/hadoop/apps/flume/taildir/test2/.*log.*
taildiragent.sources.r1.channels=c1
taildiragent.channels.c1.type=memory
taildiragent.channels.c1.transactionCapacity=1000
taildiragent.sinks.k1.type=logger
taildiragent.sinks.k1.channel=c1
Start the Taildir Source agent:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file taildirsource.conf --name taildiragent -Dflume.root.logger=INFO,console
Start writing to files under test1 and test2 and watch the agent receive the events; a quick test is sketched below.
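A quick test, assuming the test1 and test2 directories already exist (the file names below are only illustrative; any file under test2 matching .*log.* should be picked up):
[root@hadoop01 taildir]# echo "hello taildir" >> test1/test.log
[root@hadoop01 taildir]# echo "hello regex" >> test2/app.log.1
The read offsets are persisted as JSON in the position file, roughly of the form [{"inode":...,"pos":...,"file":"/home/hadoop/apps/flume/taildir/test1/test.log"},...], which is what allows the source to resume where it left off after a restart.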
Memory Channel key parameters (a tuning sketch follows the list):
type: channel type, memory
capacity: maximum number of events stored in the channel, default 100
transactionCapacity: maximum number of events written or read in one transaction, default 100
keep-alive: timeout for a write to or read from the channel to complete, default 3 seconds
byteCapacityBufferPercentage: percentage of the channel's byte capacity (byteCapacity) reserved for event header information, default 20 (percent)
byteCapacity: maximum amount of memory the channel may occupy, default 80% of the Flume JVM heap
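A sketch of how these parameters might be tuned on the memory channel used in the earlier examples; the values are illustrative, not recommendations:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# wait up to 5 seconds for space/events instead of the default 3
a1.channels.c1.keep-alive = 5
# cap event body memory at roughly 100 MB, with 20% reserved for headers
a1.channels.c1.byteCapacity = 104857600
a1.channels.c1.byteCapacityBufferPercentage = 20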
File Channel key parameters:
type: channel type, file
checkpointDir: checkpoint directory; by default it is created under the home directory of the user that started Flume, but a dedicated disk path is recommended
useDualCheckpoints: whether to keep a backup checkpoint, default false; enabling it is recommended. If the agent fails unexpectedly and the checkpoint file is corrupted, the backup checkpoint is used to replay the data into memory when the File Channel restarts; without a backup checkpoint, a corrupted checkpoint forces a full replay of all data files, which is very time-consuming
backupCheckpointDir: backup checkpoint directory; preferably not on the same disk as the checkpoint directory
checkpointInterval: interval between checkpoint writes, default 30000 ms
dataDirs: disk paths for the data files; configuring multiple paths on different disks is recommended, since parallel writes across disks improve File Channel performance. Separate multiple paths with commas
transactionCapacity: maximum number of events written or read in one transaction, default 10000
maxFileSize: maximum size of each data file, default 2146435071 bytes
minimumRequiredSpace: minimum free space on the disk path; if free space drops below this value, no more data is written
capacity: maximum number of events the File Channel can hold
keep-alive: timeout for a write to or read from the channel to complete, default 3 seconds
Configure a File Channel; the contents of filechannel.conf are as follows:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /home/hadoop/apps/flume/filechannel/data
a1.channels.c1.checkpointDir = /home/hadoop/apps/flume/filechannel/checkpoint
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /home/hadoop/apps/flume/filechannel/backup
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Start an agent with the File Channel:
[root@hadoop01 bin]# flume-ng agent --conf conf --conf-file filechannel.conf --name a1 -Dflume.root.logger=INFO,console
Send data to port 44444 defined in the configuration and watch how the channel records it:
telnet localhost 44444
The console output then shows the event and the channel's checkpoint activity:
18/02/04 21:15:44 INFO sink.LoggerSink: Event: { headers:{} body: 61 64 66 61 64 66 61 64 66 61 73 66 0D adfadfadfasf. }
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/hadoop/apps/flume/filechannel/checkpoint/checkpoint, elements to sync = 1
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1517749968978, queueSize: 0, queueHead: 0
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Attempting to back up checkpoint.
18/02/04 21:15:48 INFO file.Serialization: Skipping in_use.lock because it is in excludes set
18/02/04 21:15:48 INFO file.Serialization: Deleted the following files: , checkpoint, checkpoint.meta, inflightputs, inflighttakes.
18/02/04 21:15:48 INFO file.Log: Updated checkpoint for file: /home/hadoop/apps/flume/filechannel/data/log-2 position: 170 logWriteOrderID: 1517749968978
18/02/04 21:15:49 INFO file.EventQueueBackingStoreFile: Checkpoint backup completed.
Kafka Channel: uses the distributed message queue Kafka as the channel. Compared with the Memory Channel and the File Channel it offers larger storage capacity and stronger fault tolerance, making up for the weaknesses of the other two; used well, Kafka's performance goes a long way. Key parameters:
type: Kafka Channel class, org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers: list of Kafka brokers in the form ip1:port1,ip2:port2…; configuring multiple brokers, separated by commas, is recommended for better fault tolerance
kafka.topic: topic name, default "flume-channel"
kafka.consumer.group.id: consumer group id, globally unique
parseAsFlumeEvent: whether events are written to the Kafka Channel in Avro FlumeEvent format; default true, in which case both the event headers and the event body are written to Kafka
pollTimeout: poll timeout, default 500 ms
kafka.consumer.auto.offset.reset: earliest starts consuming from the earliest offset, latest from the latest offset, and none throws an exception if no previously committed offset is found for the consumer group
Configure a Kafka Channel; the contents of kafkachannel.conf are as follows:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092
a1.channels.c1.kafka.topic = flumechannel2
a1.channels.c1.kafka.consumer.group.id = flumecgtest1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Start the Kafka service and create a Kafka topic:
[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic flumechannel2
List topics to confirm it was created:
[root@hadoop03 bin]# ./kafka-topics.sh --list --zookeeper 192.168.43.20:2181
__consumer_offsets
flumechannel2
topicnewtest1
Start the Kafka Channel agent and send data with telnet:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkachannel.conf --name a1 -Dflume.root.logger=INFO,console
[root@hadoop01 flume]# clear
[root@hadoop01 flume]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc
OK
The console shows the received event:
18/02/04 21:39:33 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 0D abc. }
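Optionally, the underlying Kafka topic can be inspected directly with a console consumer (same zookeeper address as above). Note that with the default parseAsFlumeEvent=true the records are Avro-encoded FlumeEvents rather than plain text:
[root@hadoop03 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.43.20:2181 --from-beginning --topic flumechannel2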
Avro Sink key parameters (a compression sketch follows the list):
type: sink type, avro.
hostname: host name or IP of the target Avro Source to connect to
port: port of the target Avro Source
batch-size: number of events sent per batch, default 100
compression-type: whether to compress; set to "deflate" to enable compression. If the Avro Sink enables compression, the receiving Avro Source must be configured with the same compression type. zlib compression is currently supported; default none
compression-level: compression level; 0 means no compression, and from 1 to 9 larger numbers compress better; default 6
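A sketch of matching compression settings on the chained pair from the earlier execagent/avroagent example; the deflate setting must be applied on both ends, and the values shown are illustrative:
# sending side (Avro Sink in execsource.conf)
execagent.sinks.k1.type = avro
execagent.sinks.k1.hostname = 192.168.43.20
execagent.sinks.k1.port = 8888
execagent.sinks.k1.compression-type = deflate
execagent.sinks.k1.compression-level = 6
# receiving side (Avro Source in avrosource.conf) must use the same compression type
avroagent.sources.r1.type = avro
avroagent.sources.r1.bind = 192.168.43.20
avroagent.sources.r1.port = 8888
avroagent.sources.r1.compression-type = deflate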
HDFS Sink key parameters:
type: sink type, hdfs.
hdfs.path: HDFS output path; supports partitioning by date and time.
hdfs.filePrefix: file name prefix for files written to HDFS, default FlumeData
hdfs.fileSuffix: file name suffix for files written to HDFS
hdfs.inUsePrefix: prefix for in-progress (temporary) files
hdfs.inUseSuffix: suffix for in-progress (temporary) files, default .tmp
hdfs.rollInterval: interval after which the HDFS file is rolled, default 30 seconds; 0 means files are not rolled based on time
Configure an hdfssink.conf file with the following contents:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /data/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = hdfssink
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.callTimeout = 60000
Start the HDFS sink agent:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file hdfssink.conf --name a1 -Dflume.root.logger=INFO,console
Use telnet to send data to port 44444 and observe what gets written:
[hadoop@hadoop01 root]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc
OK
2323444
OK
The console then shows a temporary file being created on HDFS and later rolled:
18/02/04 22:41:52 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/02/04 22:41:52 INFO hdfs.BucketWriter: Creating /data/flume/20180204/hdfssink.1517755312242.tmp
18/02/04 22:42:24 INFO hdfs.BucketWriter: Closing /data/flume/20180204/hdfssink.1517755312242.tmp
18/02/04 22:42:24 INFO hdfs.BucketWriter: Renaming /data/flume/20180204/hdfssink.1517755312242.tmp to /data/flume/20180204/hdfssink.1517755312242
18/02/04 22:42:24 INFO hdfs.HDFSEventSink: Writer callback called.
Note: run the agent and send the messages as the hadoop user, otherwise permission problems may prevent the files from being written to HDFS. The result can be verified as sketched below.
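To confirm the write, the generated file can be listed and read back with the HDFS CLI (as the hadoop user; the exact file name depends on the timestamp of your run):
[hadoop@hadoop01 root]$ hdfs dfs -ls /data/flume/20180204
[hadoop@hadoop01 root]$ hdfs dfs -cat /data/flume/20180204/hdfssink.1517755312242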
Flume writes events to a specified Kafka topic through the Kafka Sink. Key parameters:
type: sink type, the KafkaSink class path org.apache.flume.sink.kafka.KafkaSink.
kafka.bootstrap.servers: broker list in host:port form, multiple brokers separated by commas. The list is used by the producer to discover the cluster; one entry is enough, but configuring several is recommended so that a single broker failure does not break the connection.
kafka.topic: Kafka topic name, default flume-topic.
flumeBatchSize: number of messages the producer sends in one batch; adjust it to the environment. Larger batches improve throughput to some extent, but also increase latency and the risk of losing data on the producer side. Default 100.
kafka.producer.acks: whether the producer waits for the broker to acknowledge delivery. 0 means the producer does not wait for any acknowledgement after sending; highest throughput, but data may be lost. 1 means the broker acknowledges as soon as the message is written to its local log, without waiting for all followers to replicate it; a balance between throughput and the risk of data loss. all (or -1) means the broker acknowledges only after the message is written to its local log and all followers have replicated it; no data loss, but the worst performance. Default 1.
useFlumeEventFormat: default false, in which case the Kafka Sink sends only the event body to the topic. If set to true, events sent to the Kafka topic retain the producer-side header information as well.
Configure kafkasink.conf with the following contents:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kafka.topic = FlumeKafkaSinkTopic1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1
Start the Kafka brokers on nodes 22 and 23:
[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
Create the topic referenced in the configuration:
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic FlumeKafkaSinkTopic1
Created topic "FlumeKafkaSinkTopic1".
Start the Kafka sink agent in the background:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkasink.conf --name a1 >/dev/null 2>&1 &
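Because this agent runs in the background without a console logger, one way to verify it is to send a line with telnet and read it back with a Kafka console consumer (using the same zookeeper-based invocation as elsewhere in this article):
[root@hadoop01 conf]# telnet localhost 44444
[root@hadoop03 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.43.20:2181 --from-beginning --topic FlumeKafkaSinkTopic1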
Replicating Channel Selector: one source copies each event to multiple channels; different sinks consume different channels, so the same event is delivered to different destinations. Configuration file: replicating_selector.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# define the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# set the replicating selector
a1.sources.r1.selector.type = replicating
# set the required channels
a1.sources.r1.channels = c1 c2
# configure channel c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# configure channel c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 1000
# configure the kafka sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = FlumeSelectorTopic1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.23.103:9092
a1.sinks.k1.kafka.flumeBatchSize = 5
a1.sinks.k1.kafka.producer.acks = 1
# configure the file_roll sink
a1.sinks.k2.channel = c2
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /home/hadoop/apps/flume/selector
a1.sinks.k2.sink.rollInterval = 60
The events are written to Kafka and to a file respectively.
Create the topic FlumeSelectorTopic1:
bin/kafka-topics.sh --create --zookeeper 192.168.183.100:2181 --replication-factor 1 --partitions 3 --topic FlumeSelectorTopic1
Start the Flume agent:
bin/flume-ng agent --conf conf --conf-file conf/replicating_selector.conf --name a1
Send data with telnet:
telnet localhost 44444
Check the data under /home/hadoop/apps/flume/selector, then check the data in the Kafka topic FlumeSelectorTopic1:
bin/kafka-console-consumer.sh --zookeeper 192.168.183.100:2181 --from-beginning --topic FlumeSelectorTopic1
The Multiplexing Channel Selector decides which channel an event should be written to based on the value of a key in the event headers. Key parameters:
selector.type: channel selector type, multiplexing
selector.header: the header name to inspect
selector.default: the default channel list to write to
selector.mapping.*: the channel list each value of the header maps to
selector.optional: optional channel list
The example uses the configuration files multiplexing_selector.conf, avro_sink1.conf, avro_sink2.conf and avro_sink3.conf. Data is sent to the agents defined by the different avro_sink configuration files; each of them uses a static interceptor to write a different static value into the event headers, and the multiplexing selector routes events to different destinations according to that value.
multiplexing_selector.conf
a3.sources = r1
a3.channels = c1 c2 c3
a3.sinks = k1 k2 k3
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.183.100
a3.sources.r1.port = 8888
a3.sources.r1.threads= 3
# set the multiplexing selector
a3.sources.r1.selector.type = multiplexing
a3.sources.r1.selector.header = logtype
# route by the value of the logtype header key
a3.sources.r1.selector.mapping.ad = c1
a3.sources.r1.selector.mapping.search = c2
a3.sources.r1.selector.default = c3
a3.sources.r1.channels = c1 c2 c3
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 1000
a3.channels.c2.type = memory
a3.channels.c2.capacity = 10000
a3.channels.c2.transactionCapacity = 1000
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000
# three sinks with different output directories
a3.sinks.k1.type = file_roll
a3.sinks.k1.channel = c1
a3.sinks.k1.sink.directory = /home/hadoop/apps/flume/multiplexing/k11
a3.sinks.k1.sink.rollInterval = 60
a3.sinks.k2.channel = c2
a3.sinks.k2.type = file_roll
a3.sinks.k2.sink.directory = /home/hadoop/apps/flume/multiplexing/k12
a3.sinks.k2.sink.rollInterval = 60
a3.sinks.k3.channel = c3
a3.sinks.k3.type = file_roll
a3.sinks.k3.sink.directory = /home/hadoop/apps/flume/multiplexing/k13
a3.sinks.k3.sink.rollInterval = 60
avro_sink1.conf
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = localhost
agent1.sources.r1.port = 44444
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = logtype
agent1.sources.r1.interceptors.i1.value = ad
agent1.sources.r1.interceptors.i1.preserveExisting = false
agent1.sources.r1.channels = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 10000
agent1.channels.c1.transactionCapacity = 1000
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hostname = 192.168.183.100
agent1.sinks.k1.port = 8888
avro_sink2.conf
agent2.sources = r1
agent2.channels = c1
agent2.sinks = k1
agent2.sources.r1.type = netcat
agent2.sources.r1.bind = localhost
agent2.sources.r1.port = 44445
agent2.sources.r1.interceptors = i1
agent2.sources.r1.interceptors.i1.type = static
agent2.sources.r1.interceptors.i1.key = logtype
agent2.sources.r1.interceptors.i1.value = search
agent2.sources.r1.interceptors.i1.preserveExisting = false
agent2.sources.r1.channels = c1
agent2.channels.c1.type = memory
agent2.channels.c1.capacity = 10000
agent2.channels.c1.transactionCapacity = 1000
agent2.sinks.k1.type = avro
agent2.sinks.k1.channel = c1
agent2.sinks.k1.hostname = 192.168.183.100
agent2.sinks.k1.port = 8888
avro_sink3.conf
agent3.sources = r1
agent3.channels = c1
agent3.sinks = k1
agent3.sources.r1.type = netcat
agent3.sources.r1.bind = localhost
agent3.sources.r1.port = 44446
agent3.sources.r1.interceptors = i1
agent3.sources.r1.interceptors.i1.type = static
agent3.sources.r1.interceptors.i1.key = logtype
agent3.sources.r1.interceptors.i1.value = other
agent3.sources.r1.interceptors.i1.preserveExisting = false
agent3.sources.r1.channels = c1
agent3.channels.c1.type = memory
agent3.channels.c1.capacity = 10000
agent3.channels.c1.transactionCapacity = 1000
agent3.sinks.k1.type = avro
agent3.sinks.k1.channel = c1
agent3.sinks.k1.hostname = 192.168.183.100
agent3.sinks.k1.port = 8888
Create the k11, k12 and k13 directories under /home/hadoop/apps/flume/multiplexing, then start the agents:
bin/flume-ng agent --conf conf --conf-file conf/multiplexing_selector.conf --name a3 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/avro_sink1.conf --name agent1 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink2.conf --name agent2 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink3.conf --name agent3 >/dev/null 2>&1 &
Send data with telnet, for example telnet localhost 44444; a check across all three ports is sketched below.
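A quick check across the three ports (each maps, via the static interceptors above, to a different output directory):
# logtype=ad -> /home/hadoop/apps/flume/multiplexing/k11
telnet localhost 44444
# logtype=search -> /home/hadoop/apps/flume/multiplexing/k12
telnet localhost 44445
# logtype=other is not mapped, so it falls through to the default channel -> /home/hadoop/apps/flume/multiplexing/k13
telnet localhost 44446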
Load Balancing Sink Processor, key parameters (a configuration sketch follows the list):
sinks: the child sinks in the sink group, separated by spaces
processor.type: set to load_balance
processor.backoff: when true, a sink that fails at runtime is put into a cooldown pool; default false
processor.selector.maxTimeOut: maximum time a failed sink stays in the cooldown pool, default 30000 ms
processor.selector: load-balancing selection algorithm; round_robin, random, or a custom implementation that extends AbstractSinkSelector
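A minimal sketch of a sink group using these load-balancing parameters, assuming two Avro sinks k1 and k2 pointing at two downstream collector agents (host names and ports are illustrative):
a1.sinks = k1 k2
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
# both sinks drain the same channel c1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.43.20
a1.sinks.k1.port = 8888
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = 192.168.43.21
a1.sinks.k2.port = 8888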
Failover Sink Processor, key parameters (a configuration sketch follows the list):
sinks: the child sinks in the sink group, separated by spaces
processor.type: set to failover
processor.priority.<sinkName>: priority of each child sink in the group; larger numbers mean higher priority
processor.maxpenalty: maximum time to wait for a failed sink to recover, default 30000 ms
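A corresponding failover sketch for the same two sinks, where k1 is primary and k2 only receives events while k1 is down (the priority values are illustrative):
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# k1 has the higher priority and is used first; k2 takes over on failure
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000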