概覽
Apache的Flume是一個分佈式的,質量可靠,可有效地收集,彙總和來自許多不一樣來源的大量日誌數據到集中的數據存儲系統。java
Apache的Flume是在Apache軟件基金會的頂級項目。目前有兩個版本的代碼行,0.9.x版本和1.x的版本本文件適用於1.x的代碼行。請點擊這裏Flume 0.9.x版本的用戶指南。node
系統環境要求web
JDK1.6以上版本shell
數據流數據庫
Flume事件被定義爲一個單位的數據流量有一個字節的有效載荷和一個可選字符串屬性。Flume代理(JVM)的過程當中,承載組件,經過這些事件流從外部源的下一個目的地(跳)。緩存
一個web服務器的產生的事件由 Flume源消耗。外部源發送事件發送到Flume中,會帶着一個識別的格式。例如: 例如:一個Avro Flume源能夠用來接收從Avro clients 或其餘flume代理從Avro link發送事件。當一個Flume 源接收一個事件,他會存儲到一個活多個channels中,這些channel會一直保存着event,直到被Flume sink消費處理掉,例如JDBC Channel做爲一個例子-它使用一個文件系統支持嵌入式數據庫,sink從channel中移除事件,同時放入到一個外部的倉庫,好比HDFS,或者流轉到下一個Flume source 源,source和sink在agent中是以異步運行方式運行事件。服務器
複雜數據流app
Flume到達最終目的地以前,容許用戶創建多跳流活動,經過多個代理。它還容許fan-in和fan-out flows,內容路由和備份路由失敗(故障轉移)。dom
配置一個代理(agent)異步
Flume代理配置存儲在本地配置文件。這是一個文本文件格式以下Java屬性文件格式。在相同的配置文件,能夠指定一個或多個代理的配置。配置文件包括每一個源,接收器和代理渠道的性質和它們鏈接在一塊兒,造成數據流。
配置單個組件
流中每一個組件(源,接收器或通道)的名稱,類型,和一組特定的類型和實例的屬性。例如 Avro源須要一個主機名(或IP地址)和接收數據的端口號。一個內存通道能夠有最大隊列大小(「能力」),HDFS的散熱器須要知道文件系統的URI,路徑建立文件,文件的旋轉頻率(「hdfs.rollInterval」)等,全部這些組件的屬性須要設置在託管 Flume 代理的屬性文件。
組合組件(Wiring the pieces together)
代理須要知道什麼加載各個組件以及它們是如何鏈接,以構成的流動。這是經過列出的源,匯和代理渠道的名稱,而後指定每一個接收器和源的鏈接通道。例如,代理到HDFS flume HDFS cluster1中經過JDBC JDBC通道通道流動稱爲avroWeb Avro 源的事件。該配置文件將包含這些組件和JDBC通道爲avroWeb源和HDFS cluster1中匯做爲共享信道的名稱。
啓動代理(starting an agent)
代理人是開始使用shell腳本稱爲flume-NG是位於flume分佈在bin目錄。你須要在命令行上指定的代理的名稱,config目錄,配置文件:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
如今,代理將開始運行的源和匯的配置在給定的屬性文件。
A simple example
在這裏,咱們舉一個例子,配置文件,描述一個單節點的Flume部署。這種配置可讓用戶生成的事件和隨後輸出到控制檯。
# example.conf: A single-node Flume configuration # Name the components on this agent agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # Describe/configure source1 agent1.sources.source1.type = netcat agent1.sources.source1.bind = localhost agent1.sources.source1.port = 44444 # Describe sink1 agent1.sinks.sink1.type = logger # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.capacity = 1000 agent1.channels.channel1.transactionCapactiy = 100 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
這個配置定義了一個單一的代理,稱爲agent1。 agent1監聽44444端口,通道緩存在內存中事件數據,事件數據記錄到控制檯和一個接收器上的數據源。配置文件名的各個組成部分,而後介紹了他們的類型和配置參數。一個給定的配置文件可能會定義多個命名的代理人;一個給定的Flume進程啓動時傳遞一個標誌,告訴它的具名代理體現。
結合此配置文件,咱們啓動Flume按以下參數:
$ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.root.logger=INFO,console
請注意,在完整部署,咱們一般會包括一個選項: - CONF=<conf-dir>。 <conf-dir>目錄將包括一個shell腳本flume-env.sh和內置的Log4j屬性文件。在這個例子中,咱們使用一個Java選項強制flume登陸到控制檯
咱們能夠從一個單獨的終端,而後telnet端口44444和發送flume事件:
$ telnet localhost 44444 Trying 127.0.0.1... Connected to localhost.localdomain (127.0.0.1). Escape character is '^]'. Hello world! <ENTER> OK
他原來的flume終端輸出日誌信息的事件。
12/06/19 15:32:19 INFO source.NetcatSource: Source starting 12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] 12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
恭喜你 - 你已經成功地配置和部署了一個flume代理!隨後的章節涵蓋更詳細的代理配置。
數據獲取
flume支持從從外部數據源獲取數據的機制。
RPC
在flume中 ,Avro客戶端使用AVRO RPC機制能夠發送一個給定的文件 Avro 源:
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
上面的命令將發送的/ usr/logs/log.10的內容到 flume源監聽端
Executing commands
還有一個exec執行一個給定的命令得到輸出的源。一個單一的輸出,即「line」。回車('\ R')或換行符('\ N'),或二者一塊兒的文本。
注:Flume不支持tail作爲一個源,不過能夠經過exec tail。
Network streams
Flume支持如下的機制,從流行的日誌流類型讀取數據
Flume部署種類
設置多代理流程
合併
在日誌收集的一個很是廣泛的狀況是大量生產客戶日誌的數據發送到一些消費者代理鏈接到存儲子系統。舉例來講,從數以百計的Web服務器收集的日誌發送到十幾代理寫入HDFS集羣
This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of single agent. This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination.
多路複用流
定義流
在一個單一的代理定義的流,你須要經過一個通道的來源和接收器連接。你須要列出源,接收器和通道,爲給定的代理,而後指向源和接收器及通道。一個源的實例能夠指定多個通道,但只能指定一個接收器實例通道。格式以下:
# list the sources, sinks and channels for the agent <Agent>.sources = <Source> <Agent>.sinks = <Sink> <Agent>.channels = <Channel1> <Channel2> # set channel for source <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ... # set channel for sink <Agent>.sinks.<Sink>.channel = <Channel1>
例如一個代理名爲weblog-agent,外部經過avro客戶端,而且發送數據經過內存通道給hdfs。在配置文件weblog.config的可能看起來像這樣:
# list the sources, sinks and channels for the agent agent_foo.sources = avro-appserver-src-1 agent_foo.sinks = hdfs-sink-1 agent_foo.channels = mem-channel-1 # set channel for source agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1 # set channel for sink agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
這將使事件流從avro-AppSrv-source到hdfs-Cluster1-sink經過內存通道mem-channel-1。當代理開始weblog.config做爲其配置文件,它會實例化流。
配置單個組件
定義流以後,你須要設置每一個源,接收器和通道的屬性。能夠分別設定組件的屬性值。
# properties for sources <Agent>.sources.<Source>.<someProperty> = <someValue> # properties for channels <Agent>.channel.<Channel>.<someProperty> = <someValue> # properties for sinks <Agent>.sources.<Sink>.<someProperty> = <someValue>
「type」屬性必須爲每一個組件設置,以瞭解它須要什麼樣的對象。每一個源,接收器和通道類型有其本身的一套,它所需的性能,以實現預期的功能。全部這些,必須根據須要設置。在前面的例子中,咱們拿到從hdfs-Cluster1-sink中的流到HDFS,經過內存通道mem-channel-1的avro-AppSrv-source源。下面是一個例子,顯示了這些組件的配置。
agent_foo.sources = avro-AppSrv-source agent_foo.sinks = hdfs-Cluster1-sink agent_foo.channels = mem-channel-1 # set channel for sources, sinks # properties of avro-AppSrv-source agent_foo.sources.avro-AppSrv-source.type = avro agent_foo.sources.avro-AppSrv-source.bind = localhost agent_foo.sources.avro-AppSrv-source.port = 10000 # properties of mem-channel-1 agent_foo.channels.mem-channel-1.type = memory agent_foo.channels.mem-channel-1.capacity = 1000 agent_foo.channels.mem-channel-1.transactionCapacity = 100 # properties of hdfs-Cluster1-sink agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
在一個代理中添加多個流
單個Flume代理能夠包含幾個獨立的流。你能夠在一個配置文件中列出多個源,接收器和通道。這些組件能夠鏈接造成多個流。
# list the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Source2> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2>
那麼你就能夠鏈接源和接收器到其相應的通道,設置兩個不一樣的流。例如,若是您須要設置一個weblog代理兩個流,一個從外部Avro客戶端到HDFS,另一個是tail的輸出到Avro接收器,而後在這裏是作一個配置:
# list the sources, sinks and channels in the agent agent_foo.sources = avro-AppSrv-source1 exec-tail-source2 agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2 agent_foo.channels = mem-channel-1 jdbc-channel-2 # flow #1 configuration agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1 # flow #2 configuration agent_foo.sources.exec-tail-source2.channels = jdbc-channel-2 agent_foo.sinks.avro-forward-sink2.channel = jdbc-channel-2
配置多代理流程
設置一個多層的流,你須要有一個指向下一跳avro源的第一跳的avro 接收器。這將致使第一Flume代理轉發事件到下一個Flume代理。例如,若是您按期發送的文件,每一個事件(1文件)AVRO客戶端使用本地Flume代理,那麼這個當地的代理能夠轉發到另外一個有存儲的代理。
# list sources, sinks and channels in the agent agent_foo.sources = avro-AppSrv-source agent_foo.sinks = avro-forward-sink agent_foo.channels = jdbc-channel # define the flow agent_foo.sources.avro-AppSrv-source.channels = jdbc-channel agent_foo.sinks.avro-forward-sink.channel = jdbc-channel # avro sink properties agent_foo.sources.avro-forward-sink.type = avro agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100 agent_foo.sources.avro-forward-sink.port = 10000 # configure other pieces
HDFS agent config:
list sources, sinks and channels in the agent agent_foo.sources = avro-collection-source agent_foo.sinks = hdfs-sink agent_foo.channels = mem-channel # define the flow agent_foo.sources.avro-collection-source.channels = mem-channel agent_foo.sinks.hdfs-sink.channel = mem-channel # avro sink properties agent_foo.sources.avro-collection-source.type = avro agent_foo.sources.avro-collection-source.bind = 10.1.1.100 agent_foo.sources.avro-collection-source.port = 10000 # configure other pieces #...
這裏咱們鏈接從weblog-agent的avro-forward-sink 到hdfs-agent的avro-collection-source收集源。最終結果從外部源的appserver最終存儲在HDFS的事件。
Fan out flow
Flume支持Fan out流從一個源到多個通道。有兩種模式的Fan out,分別是複製和複用。在複製的狀況下,流的事件被髮送到全部的配置通道。在複用的狀況下,事件被髮送到可用的渠道中的一個子集。Fan out流須要指定源和Fan out通道的規則。這是經過添加一個通道「選擇」,能夠複製或復。再進一步指定選擇的規則,若是它是一個多路。若是你不指定一個選擇,則默認狀況下它複製
# List the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2> # set list of channels for source (separated by space) <Agent>.sources.<Source1>.channels = <Channel1> <Channel2> # set channel for sinks <Agent>.sinks.<Sink1>.channel = <Channel1> <Agent>.sinks.<Sink2>.channel = <Channel2> <Agent>.sources.<Source1>.selector.type = replicating
複用的選擇集的屬性進一步分叉。這須要指定一個事件屬性映射到一組通道。選擇配置屬性中的每一個事件頭檢查。若是指定的值相匹配,那麼該事件被髮送到全部的通道映射到該值。若是沒有匹配,那麼該事件被髮送到設置爲默認配置的通道。
# Mapping for multiplexing selector <Agent>.sources.<Source1>.selector.type = multiplexing <Agent>.sources.<Source1>.selector.header = <someHeader> <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1> <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2> <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> #... <Agent>.sources.<Source1>.selector.default = <Channel2>
映射容許每一個值通道能夠重疊。默認值能夠包含任意數量的通道。下面的示例中有一個單一的流複用兩條路徑。代理有一個單一的avro源和鏈接道兩個接收器的兩個通道。
# list the sources, sinks and channels in the agent agent_foo.sources = avro-AppSrv-source1 agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2 agent_foo.channels = mem-channel-1 jdbc-channel-2 # set channels for source agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 jdbc-channel-2 # set channel for sinks agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1 agent_foo.sinks.avro-forward-sink2.channel = jdbc-channel-2 # channel selector configuration agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing agent_foo.sources.avro-AppSrv-source1.selector.header = State agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1 agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 jdbc-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
「State」做爲Header的選擇檢查。若是值是「CA」,而後將其發送到mem-channel-1,若是它的「AZ」的,那麼jdbc-channel-2,若是它的「NY」那麼發到這兩個。若是「State」頭未設置或不匹配的任何三個,而後去默認的mem-channel-1通道。
Flume Sources
Avro Source
Avro端口監聽並接收來自外部的Avro客戶流的事件。當內置AvroSink另外一個(前跳)Flume代理,它能夠建立分層集合配對拓撲。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be avro |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
interceptors | – | Space separated list of interceptors |
interceptors.* |
Example for agent named agent_foo
agent_foo.sources = avrosource-1 agent_foo.channels = memoryChannel-1 agent_foo.sources.avrosource-1.type = avro agent_foo.sources.avrosource-1.channels = memoryChannel-1 agent_foo.sources.avrosource-1.bind = 0.0.0.0 agent_foo.sources.avrosource-1.port = 4141
Exec Source
此源啓動運行一個給定的Unix命令,預計這一過程當中不斷產生標準輸出(stderr被簡單地丟棄,除非logStdErr= TRUE)上的數據。若是因任何緣由的進程退出時,源也退出,並不會產生任何進一步的數據。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be exec |
command | – | The command to execute |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
logStdErr | false | Whether the command’s stderr should be logged |
batchSize | 20 | The max number of lines to read and send to the channel at a time |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space separated list of interceptors |
interceptors.* |
Warning
The problem with ExecSource and other asynchronous sources is that the source can not guarantee that if there is a failure to put the event into the Channel the client knows about it. In such cases, the data will be lost. As a for instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem; what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn’t been sent, for some reason. If this doesn’t make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. You have been warned.
備註: 在ExecSource不能保證,若是有一個失敗的放入到通道的事件,客戶也知道。在這種狀況下,數據將丟失。
Example for agent named agent_foo:
agent_foo.sources = tailsource-1 agent_foo.channels = memoryChannel-1 agent_foo.sources.tailsource-1.type = exec agent_foo.sources.tailsource-1.command = tail -F /var/log/secure agent_foo.sources.tailsource-1.channels = memoryChannel-1