Flume以agent爲最小的獨立運行單位。一個agent就是一個JVM。單agent由Source、Sink和Channel三大組件構成。html
Flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)而且攜帶有頭信息,這些Event由Agent外部的Source,好比上圖中的Web Server生成。當Source捕獲事件後會進行特定的格式化,而後Source會把事件推入(單個或多個)Channel中。你能夠把Channel看做是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另外一個Source。java
很直白的設計,其中值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不一樣類型的Source,Channel和Sink能夠自由組合。組合方式基於用戶設置的配置文件,很是靈活。好比:Channel能夠把事件暫存在內存裏,也能夠持久化到本地硬盤上。Sink能夠把日誌寫入HDFS, HBase,甚至是另一個Source等等。web
若是你覺得Flume就這些能耐那就大錯特錯了。Flume支持用戶創建多級流,也就是說,多個agent能夠協同工做,而且支持Fan-in扇入、Fan-out、Contextual Routing上下文路由、Backup Routes。以下圖所示。apache
flume可支持avro、log4j、syslog、HTTP Post(帶有JSON參數)的source數據來源。ubuntu
在當前的數據來源選項不方便使用時,也能夠構建第三方客戶端client機制向FLUME發送日誌。構建第三方客戶端client的方法有兩個:windows
(1)構建客戶端client與flume已有的某個source,好比avroSource或syslogTcpSource,進行通訊,客戶端client須要將它的數據轉換成這些flume source可理解的message對象。api
(2)另外一種方法就是寫一個第三方的Flumen Source,直接與使用了IPC或RPC協議的客戶端應用程序進行通話,再將數據客戶端數據轉換成flume的Event。數組
lume Client SDK 是一個類庫,可以使得應用程序鏈接到Flumen,並經過RPC協議直接向Flume發送數據到flume的數據流中。服務器
對Flume RPCclient接口的實現,封裝在了RPC機制裏面,用戶應用程序能夠簡單調用flume client SDK中的方法append(Event)或appendBatch(List<Event>),發送數據,而不須要考慮底層信息交換細節。如何提供Event對象參數呢?(1)能夠直接使用Event接口,使用一個簡單的實現方法simpleEvent類;(2)可使用EventBuilder的重載方法withBody()靜態幫助方法;網絡
The client needs tocreate this object with the host and port of the target Flume agent, and canthen use the RpcClient to send data into the agent. The following example showshow to use the Flume Client SDK API within a user’s data-generatingapplication:
對於1.4.0版本中,Avro是默認的RPC協議。NettyAvroRpcClient 和 ThriftRpcClient實現了RpcClient接口。client客戶端須要建立包含Flume agent日誌服務器hostname(即IP)和端口(即Port)的對象,才能使用RPCclient發送數據到agent中。下面的例子展現瞭如何在用戶的應用程序中使用flumeclient SDK:
importorg.apache.flume.Event;
importorg.apache.flume.EventDeliveryException;
importorg.apache.flume.api.RpcClient;
importorg.apache.flume.api.RpcClientFactory;
importorg.apache.flume.event.EventBuilder;
importjava.nio.charset.Charset;
public class MyApp {
public static voidmain(String[]args) {
MyRpcClientFacade client = newMyRpcClientFacade();
// Initializeclient with the remote Flume agent's host and port
client.init("host.example.org",41414);
// Send 10 eventsto the remote Flume agent. That agent should be
// configured tolisten with an AvroSource.
String sampleData = "HelloFlume!";
for (int i = 0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
private RpcClient client;
private String hostname;
private int port;
public voidinit(String hostname,int port) {
// Setup the RPCconnection
this.hostname = hostname;
this.port = port;
this.client =RpcClientFactory.getDefaultInstance(hostname,port);
// Use thefollowing method to create a thrift client (instead of the above line):
// this.client =RpcClientFactory.getThriftInstance(hostname, port);
}
public voidsendDataToFlume(String data){
// Create a FlumeEvent object that encapsulates the sample data
Event event = EventBuilder.withBody(data,Charset.forName("UTF-8"));
// Send the event
try {
client.append(event);
} catch (EventDeliveryExceptione) {
// clean up andrecreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname,port);
// Use thefollowing method to create a thrift client (instead of the above line):
// this.client =RpcClientFactory.getThriftInstance(hostname, port);
}
}
public voidcleanUp() {
// Close the RPCconnection
client.close();
}
}
此時遠程flume agent須要有一個AvroSource(或ThriftSource,若是使用Thrift client的話)在監聽數據傳輸接口。下面就是一個flume agent的配置文件,該agent在等待用戶應用程序的鏈接:
a1.channels=c1
a1.sources=r1
a1.sinks=k1
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sources.r1.type=avro
# For using a thrift source set the following instead of the above line.
# a1.source.r1.type = thrift
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=41414
a1.sinks.k1.channel=c1
a1.sinks.k1.type=logger
爲了增長擴展性,上述配置文件中關於默認的Flume client的實現能夠進行以下配置:
client.type=default (for avro) or thrift (for thrift)
hosts=h1 # default client accepts only 1 host
# (additional hosts will be ignored)
hosts.h1=host1.example.org:41414 # host and port must both be specified
# (neither has a default)
batch-size=100 # Must be >=1 (default: 100)
connect-timeout=20000 # Must be >=1000 (default: 20000)
request-timeout=20000 # Must be >=1000 (default: 20000)
這個類中包括了默認的Avro RpcClient 來提供client客戶端的容錯處理能力。經過使用間隔的<host>:<port>列表來提供多個flume agents,組成一個容錯組(即用一組agent來提供備用的agent,如:<host1>:<port1><host2>:<port2> <host3>:<port3> ... 若是第一個agent服務器掛了,那麼自動啓用第二個agent服務器)。目前容錯RPCclient還不支持thrift。client類中可定義以下:
// Setup properties for the failover
Propertiesprops=newProperties();
props.put("client.type","default_failover");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts","h1 h2 h3");
// host/port pair for each host alias
Stringhost1="host1.example.org:41414";
Stringhost2="host2.example.org:41414";
Stringhost3="host3.example.org:41414";
props.put("hosts.h1",host1);
props.put("hosts.h2",host2);
props.put("hosts.h3",host3);
// create the client with failover properties
RpcClientclient=RpcClientFactory.getInstance(props);
爲了提升擴展性,failoverclient的實現中能夠進行以下的配置:
client.type=default_failover
hosts=h1 h2 h3 # at least one is required, but 2 or
# more makes better sense
hosts.h1=host1.example.org:41414
hosts.h2=host2.example.org:41414
hosts.h3=host3.example.org:41414
max-attempts=3 # Must be >=0 (default: number of hosts
# specified, 3 in this case). A '0'
# value doesn't make much sense because
# it will just cause an append call to
# immmediately fail. A '1' value means
# that the failover client will try only
# once to send the Event, and if it
# fails then there will be no failover
# to a second client, so this value
# causes the failover client to
# degenerate into just a default client.
# It makes sense to set this value to at
# least the number of hosts that you
# specified.
batch-size=100 # Must be >=1 (default: 100)
connect-timeout=20000 # Must be >=1000 (default: 20000)
request-timeout=20000 # Must be >=1000 (default: 20000)
flume client SDK還提供了一個在多個日誌接收主機之間進行負載均衡的 RPCclient,經過使用間隔的<host>:<port>列表來提供多個flume agents,組成一個負載均衡組(即用一組agent來提供分擔壓力的agent,如:<host1>:<port1> <host2>:<port2> <host3>:<port3>...)。這個client可經過配置,來指定負載均衡策略,好比是隨機選擇一個日誌服務器,仍是以循環的模式選擇一個日誌服務器。用戶也能夠自定義一個class類,經過實現LoadBalancingRpcClient和HostSelector接口,來指定自定義的服務器選擇策略,這種狀況下這個自定義class的全類名須要指定爲host-selector屬性的一個值。當前LoadBalancingRPC Client還不支持thrift。
其中的backoff選項若是爲true時,會將接收失敗的服務器列入臨時黑名單,以防止該出錯服務器被選爲容錯主機,在通過了指定時間後纔會解除黑名單。當通過了指定時間後,若是這個服務器仍是沒反應,那麼認定它是一個持續性錯誤,指定的時間段將會增加,避免進入長時間的等待。
The maximum backofftime can be configured by setting maxBackoff (in milliseconds). The maxBackoffdefault is 30 seconds (specified in the OrderSelector class that’s thesuperclass of both load balancing strategies). The backoff timeout willincrease exponentially with each sequential failure up to the maximum possible backofftimeout. The maximum possible backoff is limited to 65536 seconds (about 18.2hours). For example:
最大的backoff時間可經過maxBackoff配置項指定(單位是毫秒,默認是30S,在load balance 策略的父類OrderSelector中指定了),最大的backoff時間是65536S,大概18.2個小時。可參考的client類中可指定以下:
// Setup properties for the load balancing
Propertiesprops=newProperties();
props.put("client.type","default_loadbalance");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts","h1 h2 h3");
// host/port pair for each host alias
Stringhost1="host1.example.org:41414";
Stringhost2="host2.example.org:41414";
Stringhost3="host3.example.org:41414";
props.put("hosts.h1",host1);
props.put("hosts.h2",host2);
props.put("hosts.h3",host3);
props.put("host-selector","random");// For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
// // selection
props.put("backoff","true");// Disabled by default.
props.put("maxBackoff","10000");// Defaults 0, which effectively
// becomes 30000 ms
// Create the client with load balancing properties
RpcClientclient=RpcClientFactory.getInstance(props);
爲了提升擴展性,load balanceclient的實現(LoadBalancingRpcClient)中能夠進行以下的配置:
client.type=default_loadbalance
hosts=h1 h2 h3 # At least 2 hosts are required
hosts.h1=host1.example.org:41414
hosts.h2=host2.example.org:41414
hosts.h3=host3.example.org:41414
backoff=false # Specifies whether the client should
# back-off from (i.e. temporarily
# blacklist) a failed host
# (default: false).
maxBackoff=0 # Max timeout in millis that a will
# remain inactive due to a previous
# failure with that host (default: 0,
# which effectively becomes 30000)
host-selector=round_robin # The host selection strategy used
# when load-balancing among hosts
# (default: round_robin).
# Other values are include "random"
# or the FQCN of a custom class
# that implements
# LoadBalancingRpcClient$HostSelector
batch-size=100 # Must be >=1 (default: 100)
connect-timeout=20000 # Must be >=1000 (default: 20000)
request-timeout=20000 # Must be >=1000 (default: 20000)
從單agent來看,Flume使用基於事務的數據傳遞方式來保證事件傳遞的可靠性。Source和Sink被封裝進一個事務。事件被存放在Channel中直到該事件被處理,Channel中的事件纔會被移除。這是Flume提供的點到點的可靠機制。
從多級流來看,前一個agent的sink和後一個agent的source一樣由它們的事務來保障數據的可靠性。
推薦使用FileChannel,事件持久化在本地文件系統裏(性能較差)。
對現有程序改動最小的使用方式是使用直接讀取程序原來記錄的日誌文件,基本能夠實現無縫接入,不須要對現有程序進行任何改動。
對於直接讀取文件Source, 主要有兩種方式:
(1)Exec source方式,最經常使用的就是tail -F [file] 方式組織數據,每次由source獲取文件的增量數據,發送到channel。問題是,若是flume的source掛了,那麼等flume的source再次開啓的這段時間內,增長的日誌內容,就沒辦法被source讀取到了。解決方案:爲source添加一個狀態監控,若是source掛了,監控會把增量數據保存到其餘臨時文件中,下次讀取。
(2) spooling directory source 方式,將原始日誌文件按時間進行分割,放到指定目錄下,flume的source讀取新增文件。但要注意,這個目錄不容許作其餘操做,裏面的文件也不容許編輯。
apache-flume-1.5.2-bin.tar下載地址:http://flume.apache.org/download.html
需事先安裝:jdk
配置flume環境變量:/conf/flume-env.sh
JAVA_HOME=/usr/jdk7/jdk1.7.0_67
FLUME_CLASSPATH=/usr/apache-flume-1.5.2-bin
agent配置:/conf/flume-conf.properties
啓動agent:
$ bin/flume-ngagent -n agent_name -c conf -f conf/flume-conf.properties
問題:若是提示log4j ERROR,並提示log文件不存在,多是目錄權限不夠,不容許生成log日誌文件。能夠經過修改apache-flume-1.5.2-bin目錄的權限:sudo chmod 777 –R 來解決。
-n指定agent名稱
-c指定配置文件目錄
-f指定配置文件
-Dflume.root.logger=DEBUG,console設置日誌等級和輸出地方(可選)
由遠程服務器接收日誌,並寫入服務器目錄路徑
個人實驗環境是windows 7,在windows7中安裝了ubuntu 虛擬機。日誌由windows中的web項目生成,經過配置的log4j,發送到ubuntu虛擬機中的flume agent。
a) log4j.properties文件的配置:
# ---------Debugging log settings-------------------
log4j.rootLogger=info, stdout,logfile,dailylogfile
#----------------flume ng ,爲某個類設置日誌發送--------
log4j.logger.com.trace.web.action.user.login = info,flume
#-----或者爲 log4j.logger.com.trace. = info,flume ,範圍擴大到包
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.layout = org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-ddHH:mm:ss}%p[%c{2}]-%m%n
#--接收數據的遠程主機IP,此處爲虛擬機的ip地址
log4j.appender.flume.Hostname=192.168.17.128
#--監聽數據發送的遠程主機端口號,需開通(web服務器端和數據接收服務器端要開通)
log4j.appender.flume.Port=41414
發送日誌的.java類文件中的代碼:
private static Logger flumeLog =Logger.getLogger("flume");
#-----或者privatestatic Logger flumeLog = Logger.getLogger(XXX.class);
flumeLog.info("432343234");
b) log4j.xml文件的配置:
<appender name="flume" class="org.apache.flume.clients.log4jappender.Log4jAppender">
<paramname="Port" value="41414" />
<paramname="Hostname" value="21.20.8.6" />
<layoutclass="org.apache.log4j.PatternLayout">
<paramname="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %m%n" />
</layout>
</appender>
<logger name="com.test">
<paramname="level" value="info" />
<appender-refref="flume" />
</logger>
發送日誌的類文件中的代碼:
#----這種方法會失效,沒法發送日誌
#----private static Logger flumeLog =Logger.getLogger("flume");
private static Logger flumeLog = Logger.getLogger(XXX.class);
flumeLog.info("432343234");
支持log4j------>flume數據傳輸的jar包:
3 遠程服務器端的配置:注意遠程日誌接收服務器和web server都須要開通日誌數據端口。
1安裝flume ng:
第6章節說明了flume ng安裝步驟。
2用於「接收web端數據」的flume.conf文件的配置:
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
#--配置agent
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = sink1
agent1.sources.avro-source1.channels= ch1
#--source的類型avro表示以avro協議接收數據,且數據由avro客戶端事件驅動
agent1.sources.avro-source1.type =avro
#--source綁定的數據源主機地址。web端的日誌已由log4j以RPC協議發送到數據接收服務器的41414端口,那麼數據接收服務器中flume agent的source只需綁定本機的ip,並接收41414端口過來的數據(這裏很重要,不然會接收不到數據,web端也可能沒法發送)。
agent1.sources.avro-source1.bind =192.168.17.128
agent1.sources.avro-source1.port =41414
#agent1.sources.avro-source1.interceptors= i1
#agent1.sources.avro-source1.interceptors.i1.type= timestamp
agent1.channels.ch1.type = memory
#--配置sink,將接收到的數據保存到指定地點。
agent1.sinks.sink1.channel = ch1
#agent1.sinks.sink1.type = logger
#--sink的type=file_roll表示以滾動文件的形式將數據保存到文件中。這裏有個問題,若是不作其餘設置,flume默認30秒生成一個新文件(即便沒有數據)
agent1.sinks.sink1.type = file_roll
#--設置shik組件將文件sink的路徑。
agent1.sinks.sink1.sink.directory =/home/hadoop1/apache-flume-1.5.2-bin/logs
#此處配置的,應該是每隔24秒,生成一個flume日誌數據文件。此處若是不配置,默認是30秒(數字的單位是秒)
agent1.sinks.sink1.sink.rollInterval=300
1) 可能出現的問題:
剛配置時,在上述的配置環境下,有一次數據收集端的agent意外掛掉了(服務器宕機),致使web server端的log4j沒法建立Appender,從而影響須要輸出日誌的對象功能,使web沒法正常運行。
這個問題是由flume的設計思路致使的。若是agent掛掉了,flume爲了節約系統和網絡資源,會在請求若干次後仍無無響應的狀況下,主動放棄請求,並置發送端RPCclient爲null。因此在agent掛掉的時候,只要及時重啓agent,就能夠了;但若是超過了flume設定的閾值,那麼即便重啓agent,仍是不能再發送數據了(RPCclient已經爲null了),此時會報異常:RPCclient 爲null。
建議的解決方案:能夠設置一個針對flume網絡鏈接的心跳線程,每隔一段時間檢測一下網絡鏈接,若是鏈接中斷,那麼從新初始化RPCclient。