這一篇我將給你們介紹Flume相關的技術點及應用。java
Flume(日誌收集框架)是分佈式的,可靠的,用於從不一樣的來源有效收集、彙集和移動大量的日誌數據用以集中式管理的系統。目前是apache的一個頂級項目。node
以下是技術結構圖:web
以下是應用架構圖:正則表達式
jdk6以上,推薦java7。shell
能夠apache官網下載flume的安裝包。數據庫
下載時注意,flume具備兩個版本,0.9.x和1.x兩個版本並不兼容,我在學習研究的時候用的是最新的1.x版本,也叫flume-ng版本。此篇博文也是基於這個版原本進行一些列實驗的。apache
具體版本爲:apache-flume-1.6.0-binjson
解壓到指定目錄便可。windows
Flume事件:被定義爲一個具備有效荷載的字節數據流和可選的字符串屬性集。即一條日誌就是一個Flume Event。數組
當一條日誌進入Flume的時候就會轉換爲Flume Event,Flume Event本質是一個json字符串。
Flume代理:是一個進程承載從外部源事件流到下一個目的地的過程。包含source、channel和sink。
數據源:消耗外部傳遞給他的事件,外部源將數據按照Flume Source能識別的格式將Flume事件發送給Flume Source。一個source和能夠對接多個Channel。
數據通道:是一個被動的存儲,用來保持事件,直到由一個Flume Sink消耗。一個Channel能夠對接多個Source,但只能對接一個Sink。在Sink分組的狀況下一個Channel能夠配置多個Sink,但最終使用的時候仍是組中的某一個Sink(這個後面會講到)。
數據匯聚點:表明外部數據存放位置。發送flume中的事件到指定的外部目標。一個Sink只能對接一個Channel。
Flume容許用戶進行多級流動到最終目的地,也容許扇出流(一到多)、扇入流(多到一)的流動和故障轉移、失敗處理。
事務型的數據傳遞,保證數據的可靠性。
通道能夠之內存或文件的方式實現,內存更快,可是不可恢復,文件比較慢但提供了可恢復性。
首先須要經過一個配置文件來配置Agent。經過flume提供的工具啓動agent就能夠工做了。
example.conf:單節點Flume配置。
Flume根目錄下的conf目錄中,並無提供配置文件的模版,只能手動建立,example.conf文件,如下是文件內容以及格式:
#命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1
#描述/配置Source a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444
#描述Sink a1.sinks.k1.type = logger
#描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
#爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
一個配置文件中能夠配置多個Agent,一個Agent中能夠包含多個Source、Sink、Channel。
一個Source能夠綁定到多個通道,一個Sink只能綁定到一個通道。
文件內容格式總體以下:
#命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 #描述Sink a1.sinks.k1.type = logger #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
經過Flume工具,啓動agent,命令以下:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
簡寫命令:
$bin/flume-ng agent -n a1 -c conf/ -f conf/example.conf -Dflume.root.logger=INFO,console
注意:此時是在conf目錄下啓動的,命令書寫時須要注意,本身所在的目錄。
在windows中經過telnet命令鏈接flume所在機器的44444端口發送數據。發現,flume確實收集到了該信息。
telnet組件在Windows中默認是關閉的,須要找到控制面板中的升級/卸載軟件,進行組件安裝。
監聽AVRO端口來接受來自外部AVRO客戶端的事件流。利用Avro Source能夠實現多級流動、扇出流、扇入流等效果。另外也能夠接受經過flume提供的Avro客戶端發送的日誌信息。
注意:如下標紅的是必備屬性。
channels:綁定的通道,支持多通道綁定,以空格隔開。
type:指定Source類型,類型名稱,"AVRO"。
bind:須要監聽的主機名或IP。
port:要監聽的端口。
threads:工做線程最大線程數
selector.type:選擇器類型。
selector.*
interceptors:空格分隔的攔截器列表
interceptors.*
compression-type none:壓縮類型,能夠是「none」或「default」,這個值必須和AvroSource的壓縮格式匹配。
ssl false :是否啓用ssl加密,默認false,若是啓用還須要配置一個「keystore」和一個「keystore-password」。
keystore:爲SSL提供的java密鑰文件所在路徑。
keystore-password:爲SSL提供的java密鑰文件密碼。
keystore-type JKS:密鑰庫類型能夠是「JKS」或「PKCS12」。
exclude-protocols SSLv3:空格分隔開的列表,用來指定在SSL/TLS協議中排除。SSLv3將老是被排除除了所指定的協議。
ipFilter false:若是須要爲netty開啓ip過濾,將此項設置爲true。
ipFilterRules:配置netty的ip過濾設置表達式規則。
#命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 33333 #描述Sink a1.sinks.k1.type = logger #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
經過如下命令啓動Flume:
徹底命令:
./flume-ng agent --conf ../conf --conf-file ../conf/template2.conf --name a1 -Dflume.root.logger=INFO,console
簡寫命令:
./flume-ng agent -n a1 -c ../conf -f ../conf/template2.conf -Dflume.root.logger=INFO,console
注意:此時是在bin目錄下啓動的。
經過flume提供的avro客戶端向指定機器指定端口發送日誌信息:
./flume-ng avro-client --conf ../conf --host 0.0.0.0 --port 33333 --filename ../mydata/log1.txt
發現確實收集到了日誌。
能夠將命令產生的輸出做爲源。一般能夠用於記錄命令的執行狀況。
channels:綁定的通道,支持多通道綁定,以空格隔開。
type:類型名稱,須要是"exec"。
command:要執行的命令,例如:command = cmd。
shell :A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle 10000:毫秒爲單位的時間,用來聲明等待多久後嘗試重試命令。
restart false:若是命令掛了,是否重啓命令。默認false。
logStdErr false:不管是不是標準錯誤都該被記錄。
batchSize 20:同時發送到通道中的最大行數。
batchTimeout 3000:若是緩衝區沒有滿,通過多長時間發送數據。
selector.type:選擇模式,分爲複製和多路複用。
selector.* :Depends on the selector.type value
interceptors:攔截器,可配置多個攔截器,用空格分隔的攔截器列表。
interceptors.*
#命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 33333 #描述Sink a1.sinks.k1.type = logger #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
./flume-ng agent --conf ../conf --conf-file ../conf/template2.conf --name a1 -Dflume.root.logger=INFO,console
能夠經過tail命令,收集日誌文件中後續追加的日誌。
這個Source容許你將文件將要收集的數據放置到"自動蒐集"目錄中。這個Source將監視該目錄,並將解析新文件的出現。事件處理邏輯是可插拔的,當一個文件被徹底讀入信道,它會被重命名或可選的直接刪除。
要注意的是,放置到自動蒐集目錄下的文件不能修改,若是修改,則flume會報錯。
另外,也不能產生重名的文件,若是有重名的文件被放置進來,則flume會報錯。
channels:綁定的通道,支持多通道綁定,以空格隔開。
type:指定類型,須要指定爲"spooldir"。
spoolDir:讀取文件的路徑,即"蒐集目錄"。
fileSuffix.COMPLETED:對處理完成的文件追加的後綴。
deletePolicy = never:處理完成後是否刪除文件,需是"never"或"immediate"。
fileHeader = false:Whether to add a header storing the absolute path filename.
fileHeaderKey = file:Header key to use when appending absolute path filename to event header.
basenameHeader = false:Whether to add a header storing the basename of the file.
basenameHeaderKey = basename:Header Key to use when appending basename of file to event header.
ignorePattern = ^$:正則表達式指定哪些文件須要忽略。
trackerDir.flumespool:Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
consumeOrder:處理文件的策略,oldest、youngest或random。
maxBackoff = 4000:The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.
batchSize = 100:Granularity at which to batch transfer to the channel.
inputCharset = UTF-8:讀取文件時使用的編碼。
decodeErrorPolicy = FAIL:當在輸入文件中發現沒法處理的字符編碼時如何處理。FAIL:拋出一個異常而沒法解析該文件。REPLACE:用「替換字符」字符,一般是Unicode的U + FFFD更換不可解析角色。忽略:掉落的不可解析的字符序列。
deserializer = LINE:聲明用來將文件解析爲事件的解析器。默認一行爲一個事件。處理類必須實現EventDeserializer.Builder接口。
deserializer.*:Varies per event deserializer.
bufferMaxLines:(Obselete) This option is now ignored.
bufferMaxLineLength = 5000:(Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type = replicating:replicating or multiplexing。
selector.* :Depends on the selector.type value。
interceptors:Space-separated list of interceptors。
interceptors.*
#命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/park/work/apache-flume-1.6.0-bin/mydata #描述Sink a1.sinks.k1.type = logger #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
./flume-ng agent --conf ../conf --conf-file ../conf/template4.conf --name a1 -Dflume.root.logger=INFO,console
向指定目錄中傳輸文件,發現flume收集到了該文件,將文件中的每一行都做爲日誌來處理。
一個NetCat Source用來監聽一個指定端口,並將接收到的數據的每一行轉換爲一個事件。
channels :綁定的通道,支持多通道綁定,以空格隔開。
type:類型名稱,須要被設置爲"netcat"
bind:指定要綁定到的ip或主機名。
port:指定要綁定到的端口號。
max-line-length = 512:單行最大字節數。
ack-every-event = true:對於收到的每個Event是否響應"OK"。
selector.type
selector.*
interceptors
interceptors.*
參見快速入門案例。
序列發生器源
一個簡單的序列發生器,不斷的產生事件,值是從0開始每次遞增1。
主要用來進行測試。
channels :綁定的通道,支持多通道綁定,以空格隔開。
type:類型名稱,必須爲"seq"。
selector.type
selector.*
interceptors
interceptors.*
batchSize 1
#命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = seq #描述Sink a1.sinks.k1.type = logger #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
./flume-ng agent --conf ../conf --conf-file ../conf/template4.conf --name a1 -Dflume.root.logger=INFO,console
發現打印了日誌。
此Source接受HTTP的GET和POST請求做爲Flume的事件。其中GET方式應該只用於試驗。
須要提供一個可插拔的"處理器"來將請求轉換爲事件對象,這個處理器必須實現HTTPSourceHandler接口。
這個處理器接受一個 HttpServletRequest對象,並返回一個Flume Envent對象集合。
從一個HTTP請求中獲得的事件將在一個事務中提交到通道中。thus allowing for increased efficiency on channels like the file channel。
若是處理器拋出一個異常,Source將會返回一個400的HTTP狀態碼。
若是通道已滿,沒法再將Event加入Channel,則Source返回503的HTTP狀態碼,表示暫時不可用。
type:類型,必須爲"HTTP"
port :監聽的端口
bind 0.0.0.0 監聽的主機名或ip
handler org.apache.flume.source.http.JSONHandler 處理器類,須要實現HTTPSourceHandler接口
handler.* – 處理器的配置參數
selector.type
selector.*
interceptors –
interceptors.*
enableSSL false 是否開啓SSL,若是須要設置爲true。注意,HTTP不支持SSLv3。
excludeProtocols SSLv3 空格分隔的要排除的SSL/TLS協議。SSLv3老是被排除的。
keystore 密鑰庫文件所在位置。
keystorePassword Keystore 密鑰庫密碼
#命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = http a1.sources.r1.port = 66666 #描述Sink a1.sinks.k1.type = logger #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
./flume-ng agent --conf ../conf --conf-file ../conf/template6.conf --name a1 -Dflume.root.logger=INFO,console
經過命令發送HTTP請求到指定端口:
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://0.0.0.0:6666
發現flume收集到了日誌。
能夠處理JSON格式的數據,並支持UTF-8 UTF-16 UTF-32字符集
該handler接受Evnet數組,並根據請求頭中指定的編碼將其轉換爲Flume Event
若是沒有指定編碼,默認編碼爲UTF-8.
JSON格式以下:
[{ "headers" : { "timestamp" : "434324343", "host" : "random_host.example.com" }, "body" : "random_body" }, { "headers" : { "namenode" : "namenode.example.com", "datanode" : "random_datanode.example.com" }, "body" : "really_random_body" }]
To set the charset, the request must have content type specified as application/json;charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).
One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method.
Typetype=newTypeToken<List<JSONEvent>>(){}.getType();
BlobHandler是一種將請求中上傳文件信息轉化爲event的處理器。
參數說明:
handler:The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request
自定義源
自定義源是本身實現源接口獲得的。自定義源的類和其依賴包必須在開始時就放置到Flume的類加載目錄下,即lib目錄。
加!爲必須屬性:
!channels –
!type – 類型,必須設置爲本身的自定義處理類的全路徑名
selector.type
selector.*
interceptors –
interceptors.*
匯聚點
記錄指定級別的日誌,一般用於調試。在控制檯輸出接收到的信息。
channel
type : The component type name, needs to be logger
maxBytesToLog 16 Maximum number of bytes of the Event body to log
要求必須在 --conf 參數指定的目錄下有 log4j的配置文件
能夠經過-Dflume.root.logger=INFO,console在命令啓動時手動指定log4j參數
案例:參見入門案例。
在本地文件系統中存儲事件。
每隔指定時長生成文件保存這段時間內收集到的日誌信息。
channel
type :類型,必須是"file_roll"。
sink.directory :文件被存儲的目錄。
sink.rollInterval 30 滾動文件每隔30秒(應該是每隔30秒鐘單獨切割數據到一個文件的意思)。若是設置爲0,則禁止滾動,從而致使全部數據被寫入到一個文件中。
sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
batchSize 100
#命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = http a1.sources.r1.port = 6666 #描述Sink a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = /home/park/work/apache-flume-1.6.0-bin/mysink #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
./flume-ng agent --conf ../conf --conf-file ../conf/template7.conf --name a1 -Dflume.root.logger=INFO,console
是實現多級流動和扇出流(1到多)、扇入流(多到1)的基礎。
channel
type :The component type name, needs to be avro.
hostname : The hostname or IP address to bind to.
port :The port # to listen on.
batch-size 100 number of event to batch together for send.
connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
request-timeout 20000 Amount of time (ms) to allow for requests after the first.
reset-connection-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent.
compression-type none This can be 「none」 or 「deflate」. The compression-type must match the compression-type of matching AvroSource
compression-level 6 The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression
ssl false Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a 「truststore」, 「truststore-password」, 「truststore-type」, and specify whether to 「trust-all-certs」.
trust-all-certs false If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and 「listen in」 on the encrypted connection.
truststore – The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically 「jssecacerts」 or 「cacerts」 in the Oracle JRE) will be used.
truststore-password – The password for the specified truststore.
truststore-type JKS The type of the Java truststore. This can be 「JKS」 or other supported Java truststore type.
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O w
①h2
1)配置配置文件
#命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=9988 #描述Sink a1.sinks.k1.type=logger #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
2)啓動flume
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
②h1
1)配置配置文件
#命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=http a1.sources.r1.port=8888 #描述Sink a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.242.138 a1.sinks.k1.port=9988 #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
2)啓動flume
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
③測試
發送http請求到h1:
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://192.168.242.133:8888
稍等幾秒後,發現h2最終收到了這條消息。
①h二、h3
1)配置配置文件
#命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=9988 #描述Sink a1.sinks.k1.type=logger #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
2)啓動flume
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
②h1
1)配置配置文件
#命名Agent組件 a1.sources=r1 a1.sinks=k1 k2 a1.channels=c1 c2 #描述/配置Source a1.sources.r1.type=http a1.sources.r1.port=8888 #描述Sink a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.242.138 a1.sinks.k1.port=9988 a1.sinks.k2.type=avro a1.sinks.k2.hostname=192.168.242.135 a1.sinks.k2.port=9988 #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 a1.channels.c2.type=memory a1.channels.c2.capacity=1000 a1.channels.c2.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 c2 a1.sinks.k1.channel=c1 a1.sinks.k2.channel=c2
2)啓動flume
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
①h2 h3
1)配置配置文件
#命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=9988 #描述Sink a1.sinks.k1.type=logger #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
2)啓動flume
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
②h1
1)配置配置文件
#配置Agent組件 a1.sources=r1 a1.sinks=k1 k2 a1.channels=c1 c2 #描述/配置Source a1.sources.r1.type=http a1.sources.r1.port=8888 a1.sources.r1.selector.type=multiplexing a1.sources.r1.selector.header=flag a1.sources.r1.selector.mapping.aaa=c1 a1.sources.r1.selector.mapping.bbb=c2 a1.sources.r1.selector.default=c1 #描述Sink a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.242.138 a1.sinks.k1.port=9988 a1.sinks.k2.type=avro a1.sinks.k2.hostname=192.168.242.135 a1.sinks.k2.port=9988 #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 a1.channels.c2.type=memory a1.channels.c2.capacity=1000 a1.channels.c2.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 c2 a1.sinks.k1.channel=c1 a1.sinks.k2.channel=c2
2)啓動flume
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
發送http請求進行測試。發現能夠實現路由效果。
①m3
1)編寫配置文件
#命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=4141 #描述Sink a1.sinks.k1.type=logger #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
2)啓動flume
./flume-ng agent --conf ../conf --conf-file ../conf/template.conf --name a1 -Dflume.root.logger=INFO,console
②m一、m2
1)編寫配置文件
#命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=http a1.sources.r1.port=8888 #描述Sink a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.242.135 a1.sinks.k1.port=4141 #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
2)啓動flume
./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console
③測試
m1經過curl發送一條http請求,因爲默認使用的是jsonHandler,數據格式必須是指定的json格式:
curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
m2經過curl發送一條http請求,因爲默認使用的是jsonHandler,數據格式必須是指定的json格式:
curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
發現m3均能正確收到消息。
此Sink將事件寫入到Hadoop分佈式文件系統HDFS中。目前它支持建立文本文件和序列化文件。對這兩種格式都支持壓縮。
這些文件能夠分卷,按照指定的時間或數據量或事件的數量爲基礎。
它還經過相似時間戳或機器屬性對數據進行 buckets/partitions 操做 It also buckets/partitions data by attributes like timestamp or machine where the event originated.
HDFS的目錄路徑能夠包含將要由HDFS替換格式的轉移序列用以生成存儲事件的目錄/文件名。
使用這個Sink要求hadoop必須已經安裝好,以便Flume能夠經過hadoop提供的jar包與HDFS進行通訊。
注意,此版本hadoop必須支持sync()調用。
channel:
type:類型名稱,必須是「HDFS」
hdfs.path:HDFS 目錄路徑 (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Flume在目錄下建立文件的名稱前綴
hdfs.fileSuffix – 追加到文件的名稱後綴 (eg .avro - 注: 日期時間不會自動添加)
hdfs.inUsePrefix – Flume正在處理的文件所加的前綴
hdfs.inUseSuffix .tmp Flume正在處理的文件所加的後綴
hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events)
hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize 100 number of events written to file before it is flushed to HDFS
hdfs.codeC – Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy
hdfs.fileType SequenceFile File format: currently SequenceFile, DataStream or CompressedStream (1)DataStream will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC
hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas – Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat – Format for sequence file records. One of 「Text」 or 「Writable」 (the default).
hdfs.callTimeout 10000 Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.
hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal – Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab – Kerberos keytab for accessing secure HDFS
hdfs.proxyUser
hdfs.round false 時間戳是否向下取整(若是是true,會影響全部基於時間的轉移序列,除了%T)
hdfs.roundValue 1 舍值的邊界值
hdfs.roundUnit 向下舍值的單位 - second, minute , hour
hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.closeTries 0 Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.
hdfs.retryInterval 180 Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a 」.tmp」 extension.
serializer TEXT Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
#命名Agent組件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述/配置Source a1.sources.r1.type=http a1.sources.r1.port=8888 #描述Sink a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.path=hdfs://0.0.0.0:9000/ppp #描述內存Channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=1000 #爲Channel綁定Source和Sink a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console
目前官方文檔:只建議用做開發測試,不建議用做生產環境。因此這裏也不作介紹了。
自定義接收器,是本身實現的接收器接口Sink來實現的。
自定義接收器的類及其依賴類須在Flume啓動前放置到Flume類加載目錄下。
type :類型,須要指定爲本身實現的Sink類的全路徑名。
選擇器用來控制在扇出時數據分發的方式。選擇器能夠工做在複製、多路複用(路由)模式下。配置在Source中。
這個模式,是將全部的日誌信息,都發給每一個sink的接收點。
selector.type = replicating:類型名稱,必須是replicating。
selector.optional:標誌通道爲可選。
案例:參看avro sink案例
能夠定義本身的規則,往每一個數據消費者發送的數據,均可以根據規則發送。
selector.type = multiplexing:類型,必須是"multiplexing"。
selector.header:指定要監測的頭的名稱。
selector.default:默認匹配,默認去往那個channel。
selector.mapping.* :匹配規則,*表明本身設置的匹配規則。
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4
案例:參看 avro sink案例
Processor:處理器,實現負載均衡或失敗恢復的組件。須要將若干sink組成一個sink group組,爲sink group配置processor,processor能夠基於切換group中的sink來實現負載均衡或失敗恢復。此處說的分組,就是開篇介紹Sink時說的分組。
基於Sink分組進行配置。
sinkgroups:組名。
sinks:用空格分隔的Sink集合。
processor.type = default:類型名稱,必須是default、failover或load_balance。
Default Sink Processor:只接受一個Sink。不要求用戶爲單一Sink建立processor。
Failover Sink Processor:失敗恢復模式,維護一個sink們的優先表。確保只要一個是可用的事件就能夠被處理。
失敗處理原理是,爲失效的sink指定一個冷卻時間,在冷卻時間到達後再從新使用。
sink們能夠被配置一個優先級,數字越大優先級越高。
若是sink發送事件失敗,則下一個最高優先級的sink將會嘗試接着發送事件。
若是沒有指定優先級,則優先級順序取決於sink們的配置順序,先配置的默認優先級高於後配置的。
在配置的過程當中,設置一個group processor ,而且爲每一個sink都指定一個優先級。
優先級必須是惟一的。
另外能夠設置maxpenalty屬性指定限定失敗時間。
sinks:Space-separated list of sinks that are participating in the group
processor.type = default:The component type name, needs to be failover
processor.priority.<sinkName> Priority value. <sinkName> must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority
processor.maxpenalty = 30000: The maximum backoff period for the failed Sink (in millis)
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000
Load balancing Sink processor:負載均衡模式,提供了在多個sink之間實現負載均衡的能力。它維護了一個活動sink的索引列表。它支持輪詢或隨機方式的負載均衡,默認值是輪詢方式,能夠經過配置指定。也能夠經過實現AbstractSinkSelector接口實現自定義的選擇機制。
輪訓模式在數據量很小的時候,可能會按照隨機的方式分發數據。
processor.sinks:Space-separated list of sinks that are participating in the group
processor.type = default:The component type name, needs to be load_balance
processor.backoff = false:Should failed sinks be backed off exponentially.
processor.selector = round_robin:Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut 30000 Used by backoff selectors to limit exponential backoff (in milliseconds)
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random
攔截器
Flume有能力在運行階段修改、刪除Event,這是經過攔截器(Interceptors)來實現的。攔截器須要實現org.apache.flume.interceptor.Interceptor接口。攔截器能夠修改或刪除事件基於開發者在選擇器中選擇的任何條件。
攔截器採用了責任鏈模式,多個攔截器能夠按指定順序攔截。一個攔截器返回的事件列表被傳遞給鏈中的下一個攔截器。
若是一個攔截器須要刪除事件,它只須要在返回的事件集中不包含要刪除的事件便可。若是要刪除全部事件,只需返回一個空列表。
這個攔截器在事件頭中插入以毫秒爲單位的當前處理時間。頭的名字爲timestamp,值爲當前處理的時間戳。若是在以前已經有這個時間戳,則不保留原有的時間戳。
type:類型名稱,必須是timestamp或自定義類的全路徑名
preserveExisting false 若是時間戳已經存在是否保留
這個攔截器插入當前處理Agent的主機名或ip,頭的名字爲host或配置的名稱,值是主機名或ip地址,基於配置。
type:類型名稱,必須是host
preserveExisting false 若是主機名已經存在是否保留
useIP true 若是配置爲true則用IP,配置爲false則用主機名
hostHeader host 加入頭時使用的名稱
此攔截器容許用戶增長靜態頭信息使用靜態的值到全部事件。目前的實現中不容許一次指定多個頭。若是須要增長多個靜態頭能夠指定多個Static interceptors。
type:類型,必須是static
preserveExisting true 若是配置頭已經存在是否應該保留
key key 要增長的透明
value value 要增長的頭值
這個攔截器在全部事件頭中增長一個全局一致性標誌。其實就是UUID。
type:類型名稱,必須是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName id 頭名稱
preserveExisting true 若是頭已經存在,是否保留
prefix 「」 在UUID前拼接的字符串前綴
這個攔截器提供了簡單的基於字符串的正則搜索和替換功能。
type:類型名稱,必須是"search_replace"
searchPattern – 要搜索和替換的正則表達式
replaceString – 要替換爲的字符串
charset UTF-8 字符集編碼,默認utf-8
此攔截器經過解析事件體去匹配給定正則表達式來篩選事件。
所提供的正則表達式便可以用來包含或刨除事件。
type – 類型,必須設定爲regex_filter
regex 」.*」 所要匹配的正則表達式
excludeEvents false 若是是true則刨除匹配的事件,false則包含匹配的事件。
使用指定正則表達式匹配事件,並將匹配到的組做爲頭加入到事件中。
它也支持插件化的序列化器用來格式化匹配到的組在加入他們做爲頭以前。
type:類型,必須是regex_extractor
regex:要匹配的正則表達式
serializers:Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
serializers.<s1>.type default Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
serializers.<s1>.name –
serializers.* – Serializer-specific properties
If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) a1.sources.r1.interceptors.i1.serializers = s1 s2 s3 a1.sources.r1.interceptors.i1.serializers.s1.name = one a1.sources.r1.interceptors.i1.serializers.s2.name = two a1.sources.r1.interceptors.i1.serializers.s3.name = three
The extracted event will contain the same body but the following headers will have been added one=>1, two=>2, three=>3
事件將被存儲在內存中的具備指定大小的隊列中。
很是適合那些須要高吞吐量可是失敗是會丟失數據的場景下。
type :類型,必須是「memory」
capacity 100 事件存儲在信道中的最大數量
transactionCapacity 100 每一個事務中的最大事件數
keep-alive 3 添加或刪除操做的超時時間
byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity see description Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
案例:參看入門案例
事件被持久存儲在可靠的數據庫中。目前支持嵌入式的Derby數據庫。若是可恢復性很是的重要可使用這種方式。
目前官方建議:最好不要使用在生產環境。
性能會比較低下,可是即便程序出錯數據不會丟失。
type:類型,必須是「file」
checkpointDir ~/.flume/file-channel/checkpoint 檢查點文件存放的位置
useDualCheckpoints false Backup the checkpoint. If this is set to true, backupCheckpointDir must be set
backupCheckpointDir – The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory
dataDirs ~/.flume/file-channel/data 逗號分隔的目錄列表,用以存放日誌文件。使用單獨的磁盤上的多個目錄能夠提升文件通道效率。
transactionCapacity 10000 The maximum size of transaction supported by the channel
checkpointInterval 30000 Amount of time (in millis) between checkpoints
maxFileSize 2146435071 一個日誌文件的最大尺寸
minimumRequiredSpace 524288000 Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
capacity 1000000 Maximum capacity of the channel
keep-alive 3 Amount of time (in sec) to wait for a put operation
use-log-replay-v1 false Expert: Use old replay logic
use-fast-replay false Expert: Replay without using queue
checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey – Key name used to encrypt new data
encryption.cipherProvider – Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider – Key provider type, supported types: JCEKSFILE
encryption.keyProvider.keyStoreFile – Path to the keystore file
encrpytion.keyProvider.keyStorePasswordFile – Path to the keystore password file
encryption.keyProvider.keys – List of all keys (e.g. history of the activeKey setting)
encyption.keyProvider.keys.*.passwordFile – Path to the optional key password file
內存溢出通道
事件被存儲在內存隊列和磁盤中。
內存隊列做爲主存儲,而磁盤做爲溢出內容的存儲。
內存存儲經過embedded File channel來進行管理。
當內存隊列已滿時,後續的事件將被存儲在文件通道中。
這個通道適用於正常操做期間適用內存通道已期實現高效吞吐,而在高峯期間適用文件通道實現高耐受性。經過下降吞吐效率提升系統可耐受性。
若是Agent崩潰,則只有存儲在文件系統中的事件能夠被恢復。
此通道處於試驗階段,不建議在生產環境中使用。
1.屬性說明
type:類型,必須是"SPILLABLEMEMORY"
memoryCapacity 10000 內存中存儲事件的最大值,若是想要禁用內存緩衝區將此值設置爲0。
overflowCapacity 100000000 能夠存儲在磁盤中的事件數量最大值。設置爲0能夠禁用磁盤存儲。
overflowTimeout 3 The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity see description Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
avgEventSize 500 Estimated average size of events, in bytes, going into the channel
<file channel properties> see file channel Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity.
自定義渠道須要本身實現Channel接口。
自定義Channle類及其依賴類必須在Flume啓動前放置到類加載的目錄下。
type:本身實現的Channle類的全路徑名稱。