主要介紹幾種常見Flume的Sink--匯聚點node
1.Logger Sink web
記錄INFO級別的日誌,通常用於調試。前面介紹Source時候用到的Sink都是這個類型的Sinkapache
必須配置的屬性:json
屬性說明:
!channel –
!type – The component type name, needs to be logger
maxBytesToLog 16 Maximum number of bytes of the Event body to log
要求必須在 --conf 參數指定的目錄下有 log4j的配置文件
能夠經過-Dflume.root.logger=INFO,console在命令啓動時手動指定log4j參數bash
案例:前面的例子都是這種類型的Sinkcurl
2.File Roll Sink分佈式
在本地文件系統中存儲事件。每隔指定時長生成文件保存這段時間內收集到的日誌信息。ide
屬性說明:
!channel –
!type – 類型,必須是"file_roll"
!sink.directory – 文件被存儲的目錄
sink.rollInterval 30 滾動文件每隔30秒(應該是每隔30秒鐘單獨切割數據到一個文件的意思)。若是設置爲0,則禁止滾動,從而致使全部數據被寫入到一個文件。
sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
batchSize 100 oop
案例:測試
編寫配置文件: #命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = http a1.sources.r1.port = 6666 #描述Sink a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = /home/park/work/apache-flume-1.6.0-bin/mysink #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
啓動flume:
1
|
.
/flume-ng
agent --conf ..
/conf
--conf-
file
..
/conf/template7
.conf --name a1 -Dflume.root.logger=INFO,console
|
測試:
經過curl命令向目標主機發送請求,就會發如今指定的文件夾下出現記錄收集日誌的文件
3.Avro Sink
是實現多級流動 和 扇出流(1到多) 扇入流(多到1) 的基礎。很是重要 可是須要多臺機器
必要屬性說明:
!channel –
!type – The component type name, needs to be avro.
!hostname – The hostname or IP address to bind to.
!port – The port # to listen on.
案例1.多級流動 h1流動到h2
配置配置文件: #命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=9988 #描述Sink a1.sinks.k1.type=logger #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 啓動flume: ./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console h1: 配置配置文件 #命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=http a1.sources.r1.port=8888 #描述Sink a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.242.138 a1.sinks.k1.port=9988 #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.chafile:///C:/Users/park/Desktop/Day01_Flume/%E6%96%87%E6%A1%A3/Flume%201.6.0%20User%20Guide%20%E2%80%94%20Apache%20Flume.htm#irc-sinknnels=c1 a1.sinks.k1.channel=c1
啓動flume
發送http請求到h1:
1
|
curl -X POST -d
'[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]'
http:
//192
.168.242.133:8888
|
稍等幾秒後,發現h2最終收到了這條消息
案例2:扇出流(h1扇出到h2,h3)
h2 h3: 配置配置文件: #命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=9988 #描述Sink a1.sinks.k1.type=logger #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 啓動flume: ./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console h1: 配置配置文件 #命名Agent組件 a1.sources=r1 a1.sinks=k1 k2 a1.channels=c1 c2 #描述/配置Source a1.sources.r1.type=http a1.sources.r1.port=8888 #描述Sink a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.242.138 a1.sinks.k1.port=9988 a1.sinks.k2.type=avro a1.sinks.k2.hostname=192.168.242.135 a1.sinks.k2.port=9988 #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 a1.channels.c2.type=memory a1.channels.c2.capacity=1000 a1.channels.c2.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 c2 a1.sinks.k1.channel=c1 a1.sinks.k2.channel=c2
案例3:扇入流()
m3: 編寫配置文件: #命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=4141 #描述Sink a1.sinks.k1.type=logger #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 啓動flume: ./flume-ng agent --conf ../conf --conf-file ../conf/template.conf --name a1 -Dflume.root.logger=INFO,console m一、m2: 編寫配置文件: #命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=http a1.sources.r1.port=8888 #描述Sink a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.242.135 a1.sinks.k1.port=4141 #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 啓動flume: ./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console m1經過curl發送一條http請求,因爲默認使用的是jsonHandler,數據格式必須是指定的json格式: [root@localhost conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888 m2經過curl發送一條http請求,因爲默認使用的是jsonHandler,數據格式必須是指定的json格式: [root@localhost conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888 發現m3均能正確收到消息
四、HDFS Sink
此Sink將事件寫入到Hadoop分佈式文件系統HDFS中。
目前它支持建立文本文件和序列化文件。對這兩種格式都支持壓縮。 這些文件能夠分卷,按照指定的時間或數據量或事件的數量爲基礎。
它還經過相似時間戳或機器屬性對數據進行 buckets/partitions 操做
HDFS的目錄路徑能夠包含將要由HDFS替換格式的轉移序列用以生成存儲事件的目錄/文件名。
使用這個Sink要求hadoop必須已經安裝好,以便Flume能夠經過hadoop提供的jar包與HDFS進行通訊。
注意,此版本hadoop必須支持sync()調用。
必要屬性說明:
!channel –
!type – 類型名稱,必須是「HDFS」
!hdfs.path – HDFS 目錄路徑 (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Flume在目錄下建立文件的名稱前綴
hdfs.fileSuffix – 追加到文件的名稱後綴 (eg .avro - 注: 日期時間不會自動添加)
hdfs.inUsePrefix – Flume正在處理的文件所加的前綴
hdfs.inUseSuffix .tmp Flume正在處理的文件所加的後綴
案例:
#命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=http a1.sources.r1.port=8888 #描述Sink a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.path=hdfs://0.0.0.0:9000/ppp #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
啓動flume:
1
|
.
/flume-ng
agent --conf ..
/conf
--conf-
file
..
/conf/template9
.conf --name a1 -Dflume.root.logger=INFO,console
|
測試:經過利用curl給目的主機發送命令,會發如今HDFS中會生成相應的記錄文件。