flume參數:node
#example.conf:單節點Flume配置 #命名此代理上的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置源 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #描述接收器 a1.sinks.k1.type = logger #使用緩衝內存中事件的通道 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #將源和接收器綁定到通道 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
此配置定義名爲a1的單個代理。a1有一個偵聽端口44444上的數據的源,一個緩衝內存中事件數據的通道,以及一個將事件數據記錄到控制檯的接收器。web
根據scource、channel、sink劃分shell
一、Sources
Flume中經常使用的Source有NetCat,Avro,Exec,Spooling Directory,Taildir,也能夠根據業務場景的須要自定義Source,具體介紹以下。apache
1)NetCat Source
NetCat Source可使用TCP和UDP兩種協議方式,使用方法基本相同,經過監聽指定的IP和端口來傳輸數據,它會將監聽到的每一行數據轉化成一個Event寫入到Channel中。(必須參數以@標示,下類同)
json
Property Name Default Description channels@ – type@ – 類型指定爲:netcat bind@ – 綁定機器名或IP地址 port@ – 端口號 max-line-length 512 一行的最大字節數 ack-every-event true 對成功接受的Event返回OK selector.type replicating 選擇器類型replicating or multiplexing selector.* 選擇器相關參數 interceptors – 攔截器列表,多個以空格分隔 interceptors.* 攔截器相關參數
2)Avro Sourcebootstrap
不一樣主機上的Agent經過網絡傳輸數據可以使用的Source,通常是接受Avro client的數據,或和是上一級Agent的Avro Sink成對存在。緩存
Property Name Default Description channels@ – type@ – 類型指定爲:avro bind@ – 監聽的主機名或IP地址 port@ – 端口號 threads – 傳輸可以使用的最大線程數 selector.type selector.* interceptors – 攔截器列表 interceptors.* compression-type none 可設置爲「none」 或 「deflate」. 壓縮類型須要和AvroSource匹配
3)Exec Source安全
Exec source經過執行給定的Unix命令的傳輸結果數據,如,cat,tail -F等,實時性比較高,可是一旦Agent進程出現問題,可能會致使數據的丟失。網絡
Property Name Default Description channels@ – type@ – 類型指定爲:exec command@ – 須要去執行的命令 shell – 運行命令的shell腳本文件 restartThrottle 10000 嘗試重啓的超時時間 restart false 若是命令執行失敗,是否重啓 logStdErr false 是否記錄錯誤日誌 batchSize 20 批次寫入channel的最大日誌數量 batchTimeout 3000 批次寫入數據的最大等待時間(毫秒) selector.type replicating 選擇器類型replicating or multiplexing selector.* 選擇器其餘參數 interceptors – 攔截器列表,多個空格分隔 interceptors.*
4)Spooling Directory Sourceapp
經過監控一個文件夾將新增文件內容轉換成Event傳輸數據,特色是不會丟失數據,使用Spooling Directory Source須要注意的兩點是,
1)不能對被監控的文件夾下的新增的文件作出任何更改,
2)新增到監控文件夾的文件名稱必須是惟一的。因爲是對整個新增文件的監控,Spooling Directory Source的實時性相對較低,不過能夠採用對文件高粒度分割達到近似實時。
Property Name Default Description channels@ – type@ – 類型指定:spooldir. spoolDir@ – 被監控的文件夾目錄 fileSuffix .COMPLETED 完成數據傳輸的文件後綴標誌 deletePolicy never 刪除已經完成數據傳輸的文件時間:never or immediate fileHeader false 是否在header中添加文件的完整路徑信息 fileHeaderKey file 若是header中添加文件的完整路徑信息時key的名稱 basenameHeader false 是否在header中添加文件的基本名稱信息 basenameHeaderKey basename 若是header中添加文件的基本名稱信息時key的名稱 includePattern ^.*$ 使用正則來匹配新增文件須要被傳輸數據的文件 ignorePattern ^$ 使用正則來忽略新增的文件 trackerDir .flumespool 存儲元數據信息目錄 consumeOrder oldest 文件消費順序:oldest, youngest and random. maxBackoff 4000 若是channel容量不足,嘗試寫入的超時時間,若是仍然不能寫入,則會拋出ChannelException batchSize 100 批次處理粒度 inputCharset UTF-8 輸入碼錶格式 decodeErrorPolicy FAIL 遇到不可解碼字符後的處理方式:FAIL,REPLACE,IGNORE selector.type replicating 選擇器類型:replicating or multiplexing selector.* 選擇器其餘參數 interceptors – 攔截器列表,空格分隔 interceptors.*
5)Taildir Source
能夠實時的監控指定一個或多個文件中的新增內容,因爲該方式將數據的偏移量保存在一個指定的json文件中,即便在Agent掛掉或被kill也不會有數據的丟失,須要注意的是,該Source不能在Windows上使用。
Property Name Default Description channels@ – type@ – 指定類型:TAILDIR. filegroups@ – 文件組的名稱,多個空格分隔 filegroups.<filegroupName>@ – 被監控文件的絕對路徑 positionFile ~/.flume/taildir_position.json 存儲數據偏移量路徑 headers.<filegroupName>.<headerKey> – Header key的名稱 byteOffsetHeader false 是否添加字節偏移量到key爲‘byteoffset’值中 skipToEnd false 當偏移量不能寫入到文件時是否跳到文件結尾 idleTimeout 120000 關閉沒有新增內容的文件超時時間(毫秒) writePosInterval 3000 在positionfile 寫入每個文件lastposition的時間間隔 batchSize 100 批次處理行數 fileHeader false 是否添加header存儲文件絕對路徑 fileHeaderKey file fileHeader啓用時,使用的key
二、Channels
官網提供的Channel有多種類型可供選擇,這裏介紹Memory Channel和File Channel。
1)Memory Channel
Memory Channel是使用內存來存儲Event,使用內存的意味着數據傳輸速率會很快,可是當Agent掛掉後,存儲在Channel中的數據將會丟失。
Property Name Default Description type@ – 類型指定爲:memory capacity 100 存儲在channel中的最大容量 transactionCapacity 100 從一個source中去或者給一個sink,每一個事務中最大的事件數 keep-alive 3 對於添加或者刪除一個事件的超時的秒鐘 byteCapacityBufferPercentage 20 定義緩存百分比 byteCapacity see description Channel中容許存儲的最大字節總數
2)File Channel
File Channel使用磁盤來存儲Event,速率相對於Memory Channel較慢,但數據不會丟失。
Property Name Default Description type@ – 類型指定:file. checkpointDir ~/.flume/file-channel/checkpoint checkpoint目錄 useDualCheckpoints false 備份checkpoint,爲True,backupCheckpointDir必須設置 backupCheckpointDir – 備份checkpoint目錄 dataDirs ~/.flume/file-channel/data 數據存儲所在的目錄設置 transactionCapacity 10000 Event存儲最大值 checkpointInterval 30000 checkpoint間隔時間 maxFileSize 2146435071 單一日誌最大設置字節數 minimumRequiredSpace 524288000 最小的請求閒置空間(以字節爲單位) capacity 1000000 Channel最大容量 keep-alive 3 一個存放操做的等待時間值(秒) use-log-replay-v1 false Expert: 使用老的回覆邏輯 use-fast-replay false Expert: 回覆不須要隊列 checkpointOnClose true
Flume經常使用Sinks有Log Sink,HDFS Sink,Avro Sink,Kafka Sink,固然也能夠自定義Sink。
1)Logger Sink
Logger Sink以INFO 級別的日誌記錄到log日誌中,這種方式一般用於測試。
Property Name Default Description
channel@ –
type@ – 類型指定:logger
maxBytesToLog 16 可以記錄的最大Event Body字節數
2)HDFS Sink
Sink數據到HDFS,目前支持text 和 sequence files兩種文件格式,支持壓縮,並能夠對數據進行分區,分桶存儲。
Name Default Description channel@ – type@ – 指定類型:hdfs hdfs.path@ – HDFS的路徑,eg hdfs://namenode/flume/webdata/ hdfs.filePrefix FlumeData 保存數據文件的前綴名 hdfs.fileSuffix – 保存數據文件的後綴名 hdfs.inUsePrefix – 臨時寫入的文件前綴名 hdfs.inUseSuffix .tmp 臨時寫入的文件後綴名 hdfs.rollInterval 30 間隔多長將臨時文件滾動成最終目標文件,單位:秒, 若是設置成0,則表示不根據時間來滾動文件 hdfs.rollSize 1024 當臨時文件達到多少(單位:bytes)時,滾動成目標文件, 若是設置成0,則表示不根據臨時文件大小來滾動文件 hdfs.rollCount 10 當 events 數據達到該數量時候,將臨時文件滾動成目標文件, 若是設置成0,則表示不根據events數據來滾動文件 hdfs.idleTimeout 0 當目前被打開的臨時文件在該參數指定的時間(秒)內, 沒有任何數據寫入,則將該臨時文件關閉並重命名成目標文件 hdfs.batchSize 100 每一個批次刷新到 HDFS 上的 events 數量 hdfs.codeC – 文件壓縮格式,包括:gzip, bzip2, lzo, lzop, snappy hdfs.fileType SequenceFile 文件格式,包括:SequenceFile, DataStream,CompressedStre, 當使用DataStream時候,文件不會被壓縮,不須要設置hdfs.codeC; 當使用CompressedStream時候,必須設置一個正確的hdfs.codeC值; hdfs.maxOpenFiles 5000 最大容許打開的HDFS文件數,當打開的文件數達到該值, 最先打開的文件將會被關閉 hdfs.minBlockReplicas – HDFS副本數,寫入 HDFS 文件塊的最小副本數。 該參數會影響文件的滾動配置,通常將該參數配置成1,才能夠按照配置正確滾動文件 hdfs.writeFormat Writable 寫 sequence 文件的格式。包含:Text, Writable(默認) hdfs.callTimeout 10000 執行HDFS操做的超時時間(單位:毫秒) hdfs.threadsPoolSize 10 hdfs sink 啓動的操做HDFS的線程數 hdfs.rollTimerPoolSize 1 hdfs sink 啓動的根據時間滾動文件的線程數 hdfs.kerberosPrincipal – HDFS安全認證kerberos配置 hdfs.kerberosKeytab – HDFS安全認證kerberos配置 hdfs.proxyUser 代理用戶 hdfs.round false 是否啓用時間上的」捨棄」 hdfs.roundValue 1 時間上進行「捨棄」的值 hdfs.roundUnit second 時間上進行」捨棄」的單位,包含:second,minute,hour hdfs.timeZone Local Time 時區。 hdfs.useLocalTimeStamp false 是否使用當地時間 hdfs.closeTries 0 Number hdfs sink 關閉文件的嘗試次數; 若是設置爲1,當一次關閉文件失敗後,hdfs sink將不會再次嘗試關閉文件, 這個未關閉的文件將會一直留在那,而且是打開狀態; 設置爲0,當一次關閉失敗後,hdfs sink會繼續嘗試下一次關閉,直到成功 hdfs.retryInterval 180 hdfs sink 嘗試關閉文件的時間間隔, 若是設置爲0,表示不嘗試,至關於於將hdfs.closeTries設置成1 serializer TEXT 序列化類型 serializer.*
3)Avro Sink
Property Name Default Description channel@ – type@ – 指定類型:avro. hostname@ – 主機名或IP port@ – 端口號 batch-size 100 批次處理Event數 connect-timeout 20000 鏈接超時時間 request-timeout 20000 請求超時時間 compression-type none 壓縮類型,「none」 or 「deflate」. compression-level 6 壓縮級別,0表示不壓縮,1-9數字越大,壓縮比越高 ssl false 使用ssl加密
4)Kafka Sink
傳輸數據到Kafka中,須要注意的是Flume版本和Kafka版本的兼容性
Property Name Default Description type – 指定類型:org.apache.flume.sink.kafka.KafkaSink kafka.bootstrap.servers – kafka服務地址 kafka.topic default-flume-topic kafka Topic flumeBatchSize 100 批次寫入kafka Event數 kafka.producer.acks 1 多少個副本確認後才能肯定消息傳遞成功,0表示不須要確認 1表示只須要首要的副本獲得確認,-1表示等待全部確認。
啓動參數:
命令
bin / flume-ng agent -conf conf -z zkhost:2181,zkhost1:2181 -p / flume -name a1 -Dflume.root.logger = INFO,console
一、flume-ng agent 運行一個Flume Agent
二、-conf 指定配置文件,這個配置文件必須在全局選項的–conf參數定義的目錄下。
三、-z Zookeeper鏈接字符串。以逗號分隔的主機名列表:port
四、-p Zookeeper中的基本路徑,用於存儲代理配置
五、-name a1 Agent的名稱a1
六、-Dflume.root.logger=INFO,console 該參數將會把flume的日誌輸出到console,爲了將其輸出到日誌文件(默認在$FLUME_HOME/logs),能夠將console改成LOGFILE形式
具體的配置能夠修改$FLUME_HOME/conf/log4j.properties
-Dflume.log.file=./wchatAgent.logs 該參數直接輸出日誌到目標文件
具體: