Flume的各類類型的組件介紹

1.   Source

NetCat Source:綁定的端口(tcp、udp),將流經端口的每個文本行數據做爲Event輸入;

type:source的類型,必須是netcat。app

bind:要監聽的(本機的)主機名或者ip。此監聽不是過濾發送方。一臺電腦不是說只有一個IP。有多網卡的電腦,對應多個IP。負載均衡

port:綁定的本地的端口。tcp

 

Avro Source:監聽一個avro服務端口,採集Avro數據序列化後的數據;

type:avrosource的類型,必須是avro。spa

bind:要監聽的(本機的)主機名或者ip。此監聽不是過濾發送方。一臺電腦不是說只有一個IP。有多網卡的電腦,對應多個IP。日誌

port:綁定的本地的端口。code

 

Exec Source:於Unix的command在標準輸出上採集數據;

type:source的類型:必須是exec。blog

command:要執行命令。事務

 

Spooling Directory Source:監聽一個文件夾裏的文件的新增,若是有則採集做爲source。

type:source 的類型:必須是spooldirip

spoolDir:監聽的文件夾 【提早建立目錄】內存

fileSuffix:上傳完畢後文件的重命名後綴,默認爲.COMPLETED

deletePolicy:上傳後的文件的刪除策略never和immediate,默認爲never。

fileHeader:是否要加上該文件的絕對路徑在header裏,默認是false。

basenameHeader:是否要加上該文件的名稱在header裏,默認是false。

2. Sink

HDFS Sink:將數據傳輸到hdfs集羣中。

type:sink的類型 必須是hdfs。

hdfs.path:hdfs的上傳路徑。

hdfs.filePrefix:hdfs文件的前綴。默認是:FlumeData

hdfs.rollInterval:間隔多久產生新文件,默認是:30(秒) 0表示不以時間間隔爲準。

hdfs.rollSize:文件到達多大再產生一個新文件,默認是:1024(bytes)0表示不以文件大小爲準。

hdfs.rollCount:event達到多大再產生一個新文件,默認是:10(個)0表示不以event數目爲準。

hdfs.batchSize:每次往hdfs裏提交多少個event,默認爲100

hdfs.fileType:hdfs文件的格式主要包括:SequenceFile, DataStream ,CompressedStream,若是使用了CompressedStream就要設置壓縮方式。

hdfs.codeC:壓縮方式:gzip, bzip2, lzo, lzop, snappy

注:%{host}能夠使用header的key。以及%Y%m%d來表示時間,但關於時間的表示須要在header裏有timestamp這個key。

 

Logger Sink將數據做爲日誌處理(根據flume中的設置的日誌方式來顯示)

要在控制檯顯示在運行agent的時候加入:-Dflume.root.logger=INFO,console 。

type:sink的類型:必須是 logger。

maxBytesToLog:打印body的最長的字節數 默認爲16

 

Avro Sink:數據被轉換成Avro Event,而後發送到指定的服務端口上。

type:sink的類型:必須是 avro。

hostname:指定發送數據的主機名或者ip

port:指定發送數據的端口

 

File Roll Sink:數據發送到本地文件。

type:sink的類型:必須是 file_roll。

sink.directory:存儲文件的目錄【提早建立目錄】

batchSize:一次發送多少個event。默認爲100

sink.rollInterval:多久產生一個新文件,默認爲30s。單位是s。0爲不產生新文件。【即便沒有數據也會產生文件】

 

3.Channel

Memory Channel使用內存做爲數據的存儲。

Type channel的類型:必須爲memory

capacity:channel中的最大event數目

transactionCapacity:channel中容許事務的最大event數目

 

File Channel 使用文件做爲數據的存儲

Type channel的類型:必須爲 file

checkpointDir :檢查點的數據存儲目錄【提早建立目錄】

dataDirs :數據的存儲目錄【提早建立目錄】

transactionCapacity:channel中容許事務的最大event數目

 

Spillable Memory Channel 使用內存做爲channel超過了閥值就存在文件中

Type channel的類型:必須爲SPILLABLEMEMORY

memoryCapacity:內存的容量event數

overflowCapacity:數據存到文件的event閥值數

checkpointDir:檢查點的數據存儲目錄

dataDirs:數據的存儲目錄

 

4. Interceptor

Timestamp Interceptor 時間戳攔截器 在header里加入key爲timestamp,value爲當前時間。

type:攔截器的類型,必須爲timestamp

preserveExisting:若是此攔截器增長的key已經存在,若是這個值設置爲true則保持原來的值,不然覆蓋原來的值。默認爲false

 

Host Interceptor 主機名或者ip攔截器,在header里加入ip或者主機名

type:攔截器的類型,必須爲host

preserveExisting:若是此攔截器增長的key已經存在,若是這個值設置爲true則保持原來的值,不然覆蓋原來的值。默認爲false

useIP:若是設置爲true則使用ip地址,不然使用主機名,默認爲true

hostHeader:使用的header的key名字,默認爲host

 

Static Interceptor 靜態攔截器,是在header里加入固定的key和value。

type:avrosource的類型,必須是static。

preserveExisting:若是此攔截器增長的key已經存在,若是這個值設置爲true則保持原來的值,不然覆蓋原來的值。默認爲false

key:靜態攔截器添加的key的名字

value:靜態攔截器添加的key對應的value值

 

5.  Channel Selector

Multiplexing Channel Selector 根據header的key的值分配channel

selector.type 默認爲replicating

selector.header:選擇做爲判斷的key

selector.default:默認的channel配置

selector.mapping.*:匹配到的channel的配置

 

6. Sink Processor

負載均衡

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1 k2

a1.sinkgroups.g1.processor.type=load_balance

a1.sinkgroups.g1.processor.backoff=true

a1.sinkgroups.g1.processor.selector=round_robin

a1.sinkgroups.g1.processor.selector.maxTimeOut=30000

 

backoff:開啓後,故障的節點會列入黑名單,過必定時間再次發送,若是還失敗,則等待是指數增加;直到達到最大的時間。

若是不開啓,故障的節點每次都會被重試。

selector.maxTimeOut:最大的黑名單時間(單位爲毫秒)。

 

故障轉移

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1 k2

a1.sinkgroups.g1.processor.type=failover

a1.sinkgroups.g1.processor.priority.k1=10

a1.sinkgroups.g1.processor.priority.k2=5

a1.sinkgroups.g1.processor.maxpenalty=10000

#maxpenalty 對於故障的節點最大的黑名單時間 (in millis 毫秒)

相關文章
相關標籤/搜索