大數據3-Flume收集數據+落地HDFS

 

flume

  日誌收集系統

    Flume是Cloudera提供的一個高可用的高可靠的分佈式海量日誌採集、聚合傳輸的系統,Flume支持在日誌系統中定製各種數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(可定製)的能力。html

    當前Flume有兩個版本Flume 0.9X版本的統稱Flume-og,Flume1.X版本的統稱Flume-ng。因爲Flume-ng通過重大重構,與Flume-og有很大不一樣,使用時請注意區分。

  

 

 

 

  基本概念

     Event 事件

      把讀取的一條日誌信息包裝成一個對象,這個對象就叫Flume Event。java

      本質就是一個json字符串,如:{head:info,body:info}node

    Agent 代理

      代理,是一個java進程(JVM),它承載event,從外部源傳遞到下一個目標的組件。web

      主要由3部分組成:Source、Channel、Sink。正則表達式

    Source 數據源

      Source組件是專門用來收集數據的,能夠處理各類類型、各類格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence       generator、syslog、http、legacy、自定義。數據庫

    Channel 數據通道

      Source組件把數據收集來之後,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的。對採集到的數據進行簡單的緩存,能夠存放在memory、jdbc、file等等。apache

    Sink 數據匯聚點

      Sink組件是用於把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。json

    組合過程

      爲了安全性,數據的傳輸是將數據封裝成一個Event事件。Source會將從服務器收集的數據封裝成Event,而後存儲在緩衝區Channel,Channel的結構與隊列比較類似(先進先出)。Sink就會從緩衝區Channel中抓取數據,抓取到數據時,就會把Channel中的對應數據刪除,而且把抓取的數據寫入HDFS等目標地址或者也能夠是下一個Source。必定是當數據傳輸成功後,纔會刪除緩衝區Channel中的數據,這是爲了可靠性。當接收方Crash(崩潰)時,以即可以從新發送數據。緩存

 

 

 

 

 

  二、可靠性安全

    當節點出現故障時,日誌可以被傳送到其餘節點上而不會丟失。

    Flume提供了三種級別的可靠性保障,從強到弱依次分別爲:

      end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功後,再刪除;若是數據發送失敗,能夠從新發送。) 

      Store on failure(這也是Scribe-Facebook開源的日誌收集系統-採用的策略,當數據接收方crash(崩潰)時,將數據寫到本地,待恢復後,繼續發送) 

      Besteffort(數據發送到接收方後,不會進行確認) 

  三、須要安裝jdk 

jdk安裝

  四、安裝flume

安裝flume

   五、目錄結構

目錄結構

    

 

    

 

 

  Source組件

      重點掌握Avro SourceSpooling Directory Source

#單節點Flume配置 #命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = netcat #內置類型,接收來自網絡的數據 a1.sources.r1.bind =  0.0.0.0            #等同於網絡的127.0.0.1 a1.sources.r1.port =  22222 #服務的端口號 #描述Sink a1.sinks.k1.type = logger #內置類型 #描述內存Channel a1.channels.c1.type = memory #保存數據到內存 a1.channels.c1.capacity =  1000 #容量最大存放1000條日誌 a1.channels.c1.transactionCapacity =  100 #事務中的一批數據100條 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 #一個source能夠綁定到多個channel a1.sinks.k1.channel =  c1        #一個sink只能綁定到一個channel
flume.properties
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/src/flume/data a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
flume-dir.properties
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 22222 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
flume-avro.properties
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = http a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 22222 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.220.137 a1.sinks.k1.port = 22222 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
flume-http.properties
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 22222 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.220.137 a1.sinks.k1.port = 22222 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
flume-jt.properties
channels – type – 類型名稱,"AVRO" bind – 須要監聽的主機名或IP port – 要監聽的端口 threads – 工做線程最大線程數 selector.type selector.* interceptors – 空格分隔的攔截器列表 interceptors.* compression-type none 壓縮類型,能夠是「none」或「default」,這個值必須和AvroSource的壓縮格式匹配 ssl false 是否啓用ssl加密,若是啓用還須要配置一個「keystore」和一個「keystore-password」. keystore – 爲SSL提供的 java密鑰文件 所在路徑 keystore-password – 爲SSL提供的 java密鑰文件 密碼 keystore-type JKS 密鑰庫類型能夠是 「JKS」 或 「PKCS12」. exclude-protocols SSLv3 空格分隔開的列表,用來指定在SSL / TLS協議中排除。SSLv3將老是被排除除了所指定的協議。 ipFilter false 若是須要爲netty開啓ip過濾,將此項設置爲true ipFilterRules – 陪netty的ip過濾設置表達式規則
參數說明  flume-avro.properties
channels – type – 類型,須要指定爲"spooldir" spoolDir – 讀取文件的路徑,即"蒐集目錄" fileSuffix .COMPLETED 對處理完成的文件追加的後綴 deletePolicy never 處理完成後是否刪除文件,需是"never"或"immediate" fileHeader false Whether to add a header storing the absolute path filename. fileHeaderKey file Header key to use when appending absolute path filename to event header. basenameHeader false Whether to add a header storing the basename of the file. basenameHeaderKey basename Header Key to use when appending basename of file to event header. ignorePattern ^$ 正則表達式指定哪些文件須要忽略 trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir. consumeOrder 處理文件的策略,oldest, youngest 或 random。 maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter. batchSize 100 Granularity at which to batch transfer to the channel inputCharset UTF-8 讀取文件時使用的編碼。 decodeErrorPolicy FAIL 當在輸入文件中發現沒法處理的字符編碼時如何處理。FAIL:拋出一個異常而沒法 ​​解析該文件。REPLACE:用「替換字符」字符,一般是Unicode的U + FFFD更換不可解析角色。 忽略:掉落的不可解析的字符序列。 deserializer LINE 聲明用來將文件解析爲事件的解析器。默認一行爲一個事件。處理類必須實現EventDeserializer.Builder接口。 deserializer.* Varies per event deserializer. bufferMaxLines – (Obselete) This option is now ignored. bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. selector.type replicating replicating or multiplexing selector.* Depends on the selector.type value interceptors – Space-separated list of interceptors interceptors.*
參數說明  flume-dir.properties
type 類型,必須爲"HTTP" port – 監聽的端口 bind 0.0.0.0 監聽的主機名或ip handler org.apache.flume.source.http.JSONHandler 處理器類,須要實現HTTPSourceHandler接口 handler.* – 處理器的配置參數 selector.type selector.* interceptors – interceptors.* enableSSL false 是否開啓SSL,若是須要設置爲true。注意,HTTP不支持SSLv3。 excludeProtocols SSLv3 空格分隔的要排除的SSL/TLS協議。SSLv3老是被排除的。 keystore 密鑰庫文件所在位置。 keystorePassword Keystore 密鑰庫密碼
參數說明  flume-http.properties
3.3.6.1啓動時報錯不繼續 [root@localhost conf]# ../bin/flume-ng agent --conf conf --conf-file flume.properties --name a1 -Dflume.root.logger=INFO,console Info: Including Hive libraries found via () for Hive access + exec /usr/local/src/java/jdk1.7.0_51/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp 'conf:/usr/local/src/flume/apache-flume-1.6.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file flume.properties --name a1 log4j:WARN No appenders could be found for logger (org.apache.flume.lifecycle.LifecycleSupervisor). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 錯誤緣由: log4j屬性文件,路徑不正確 解決辦法: [root@localhost bin]# ./flume-ng agent -c /usr/local/src/flume/apache-flume-1.6.0-bin/conf -f /usr/local/src/flume/apache-flume-1.6.0-bin/conf/flume.properties -n a1 -Dflume.root.logger=INFO,console 或者 [root@localhost bin]# ./flume-ng agent -c ../conf -f ../conf/flume.properties -n a1 -Dflume.root.logger=INFO,console 3.3.6.2監控目錄重名異常 若是文件已經處理過,哪怕完成的文件被刪除,也無濟於事 java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /usr/local/src/flume/data/g.txt.COMPLETED 解決辦法: 不能有重名文件放入到監控的目錄中。 都把文件刪除了,它怎麼還知道這個文件處理過呢? Flume在監控目錄下建立了一個隱藏目錄.flumespool下面有一個隱藏文件.flumespool-main.meta,裏面記錄了處理過的信息。把此隱藏目錄刪除,就能夠處理重名文件。
常見報錯  

 

[root@localhost conf]# ../bin/flume-ng agent -c ./ -f ./flume-avro.properties -n a1 -Dflume.root.logger=INFO,console 啓動結果: 2017-11-07 19:58:03,708 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source r1 started.
cd /usr/local/src/flume #進入目錄 vi log.txt #建立數據文件,內容以下 hi flume. you are good tools.
準備數據
經過flume提供的avro客戶端向指定機器指定端口發送日誌信息: ./flume-ng –h #幫助能夠看命令格式及參數用法 ./flume-ng avro-client -c ../conf -H 0.0.0.0 -p 22222 -F ../../log.txt 控制檯收到消息: 注意:紅色框中的打印的內容會被截斷,在控制檯不能顯示不少,只顯示很短的一部份內容。
發送avro消息

 

  channel組件

事件將被存儲在內存中的具備指定大小的隊列中。很是適合那些須要高吞吐量可是失敗是會丟失數據的場景下。 參數說明: type – 類型,必須是「memory」 capacity 100 事件存儲在信道中的最大數量 transactionCapacity 100 每一個事務中的最大事件數 keep-alive 3 添加或刪除操做的超時時間 byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. byteCapacity see description Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
Memory Channel
事件被持久存儲在可靠的數據庫中。目前支持嵌入式的Derby數據庫。若是可恢復性很是的重要可使用這種方式
JDBC Channel
性能會比較低下,可是即便程序出錯數據不會丟失。 參數說明: type – 類型,必須是「file」 checkpointDir ~/.flume/file-channel/checkpoint 檢查點文件存放的位置 useDualCheckpoints false Backup the checkpoint. If this is set to true, backupCheckpointDir must be set backupCheckpointDir – The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory dataDirs ~/.flume/file-channel/data 逗號分隔的目錄列表,用以存放日誌文件。使用單獨的磁盤上的多個目錄能夠提升文件通道效率。 transactionCapacity 10000 The maximum size of transaction supported by the channel checkpointInterval 30000 Amount of time (in millis) between checkpoints maxFileSize 2146435071 一個日誌文件的最大尺寸 minimumRequiredSpace 524288000 Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value capacity 1000000 Maximum capacity of the channel keep-alive 3 Amount of time (in sec) to wait for a put operation use-log-replay-v1 false Expert: Use old replay logic use-fast-replay false Expert: Replay without using queue checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay. encryption.activeKey – Key name used to encrypt new data encryption.cipherProvider – Cipher provider type, supported types: AESCTRNOPADDING encryption.keyProvider – Key provider type, supported types: JCEKSFILE encryption.keyProvider.keyStoreFile – Path to the keystore file encrpytion.keyProvider.keyStorePasswordFile – Path to the keystore password file encryption.keyProvider.keys – List of all keys (e.g. history of the activeKey setting) encyption.keyProvider.keys.*.passwordFile – Path to the optional key password file
file channel
內存溢出通道。事件被存儲在內存隊列和磁盤中。 內存隊列做爲主存儲,而磁盤做爲溢出內容的存儲。當內存隊列已滿時,後續的事件將被存儲在文件通道中。這個通道適用於正常操做期間適用內存通道已期實現高效吞吐,而在高峯期間適用文件通道實現高耐受性。經過下降吞吐效率提升系統可耐受性。若是Agent崩潰,則只有存儲在文件系統中的事件能夠被恢復,內存中數據會丟失。此通道處於試驗階段,不建議在生產環境中使用。 參數說明: type – 類型,必須是"SPILLABLEMEMORY" memoryCapacity 10000 內存中存儲事件的最大值,若是想要禁用內存緩衝區將此值設置爲0。 overflowCapacity 100000000 能夠存儲在磁盤中的事件數量最大值。設置爲0能夠禁用磁盤存儲。 overflowTimeout 3 The number of seconds to wait before enabling disk overflow when memory fills up. byteCapacityBufferPercentage 20 Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. byteCapacity see description Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB. avgEventSize 500 Estimated average size of events, in bytes, going into the channel <file channel properties>    see file channel    Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity.
Spillable Memory Channel

 

  sink組件

  

記錄INFO級別的日誌,一般用於調試。 參數說明: channel – type – The component type name, needs to be logger maxBytesToLog 16 Maximum number of bytes of the Event body to log 要求必須在 --conf 參數指定的目錄下有 log4j的配置文件log4j.properties 能夠經過-Dflume.root.logger=INFO,console在命令啓動時手動指定log4j參數
Logger Sink
在本地文件系統中存儲事件。 每隔指定時長生成文件保存這段時間內收集到的日誌信息。 參數說明: channel – type – 類型,必須是"file_roll" sink.directory – 文件被存儲的目錄 sink.rollInterval 30 滾動文件每隔30秒(應該是每隔30秒鐘單獨切割數據到一個文件的意思)。若是設置爲0,則禁止滾動,從而致使全部數據被寫入到一個文件中。 sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface. batchSize 100
File Roll Sink
修改內容: a1.sources.r1.type = http #內置類型 a1.sources.r1.port = 22222 #設置監測目錄 a1.sinks.k1.type = file_roll #文件落地 a1.sinks.k1.sink.directory = /usr/local/src/flume/data #存放目錄
配置文件flume-roll-sink.properties
配置第一行,註釋第二行,啓用console。默認是註釋第一行,開啓第二行。 curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://0.0.0.0:22222 執行結果 [root@localhost data]# pwd /usr/local/src/flume/data #數據所在目錄 [root@localhost data]# ll total 4 -rw-r--r--. 1 root root 0 Nov 9 16:21 1510273266537-1 -rw-r--r--. 1 root root 22 Nov 9 16:21 1510273266537-2 -rw-r--r--. 1 root root 0 Nov 9 16:22 1510273266537-3 -rw-r--r--. 1 root root 0 Nov 9 16:22 1510273266537-4 -rw-r--r--. 1 root root 0 Nov 9 16:23 1510273266537-5 -rw-r--r--. 1 root root 0 Nov 9 16:23 1510273266537-6 [root@localhost data]# tail 1510273266537-2 #數據已經寫入 hello file-roll flume [root@localhost data]# tail 1510273266537-6 #即便沒有數據也會產生文件 注意:默認每隔30秒產生一個日誌文件,但時間不夠精準
模擬http請求

 

  

  Avro Sink

  是實現多級流動 扇出流(1到多) 扇入流(多到1) 的基礎。

 

channel – type – avro. hostname – The hostname or IP address to bind to. port – The port # to listen on. batch-size 100 number of event to batch together for send. connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request. request-timeout 20000 Amount of time (ms) to allow for requests after the first. 咱們要演示多級流動,就須要多個源,咱們在安裝兩臺服務器。 克隆兩臺新的虛擬機 flume0二、flume03
參數說明

  多級部署結構

 

修改內容: a1.sources.r1.type = http a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 22222 #描述Sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.163.130 a1.sinks.k1.port = 22222
flume01配置文件flume-avro-sink.properties
複製文件到其它主機 [root@localhost conf]# pwd /usr/local/src/flume/apache-flume-1.6.0-bin/conf [root@localhost conf]# scp flume-avro-sink.properties root@192.168.163.130:/usr/local/src/flume/apache-flume-1.6.0-bin/conf/flume-avro-sink.properties The authenticity of host '192.168.163.130 (192.168.163.130)' can't be established. RSA key fingerprint is 40:d6:4e:bd:3e:d0:90:3b:86:41:72:90:ec:dd:95:f9. Are you sure you want to continue connecting (yes/no)? yes Warning: Permanently added '192.168.163.130' (RSA) to the list of known hosts. flume-avro-sink.properties 100% 477 0.5KB/s 00:00 [root@localhost conf]# scp flume-avro-sink.properties root@192.168.163.131:/usr/local/src/flume/apache-flume-1.6.0-bin/conf/flume-avro-sink.properties The authenticity of host '192.168.163.131 (192.168.163.131)' can't be established. RSA key fingerprint is 40:d6:4e:bd:3e:d0:90:3b:86:41:72:90:ec:dd:95:f9. Are you sure you want to continue connecting (yes/no)? yes Warning: Permanently added '192.168.163.131' (RSA) to the list of known hosts. flume-avro-sink.properties 100% 477 0.5KB/s 00:00 [root@localhost conf]#
遠程拷貝
修改內容: a1.sources.r1.type = avro #內置類型 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 22222 #描述Sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.163.131 a1.sinks.k1.port = 22222
flume-avro-sink.properties
修改內容: a1.sources.r1.type = avro #內置類型 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 22222 #描述Sink a1.sinks.k1.type = logger
flume-avro-sink.properties
[root@localhost conf]# ../bin/flume-ng agent -c ./ -f ./flume-avro-sink.properties -n a1 -Dflume.root.logger=INFO,console
啓動各個flume服務器的Agent
在flume01節點上發送消息 curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://0.0.0.0:22222 執行結果: 2017-11-09 18:58:33,863 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xdd9a2bfc, /192.168.163.130:34945 => /192.168.163.131:22222] BOUND: /192.168.163.131:22222 2017-11-09 18:58:33,863 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xdd9a2bfc, /192.168.163.130:34945 => /192.168.163.131:22222] CONNECTED: /192.168.163.130:34945 2017-11-09 19:00:28,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{tester=tony} body: 68 65 6C 6C 6F 20 6D 6F 72 65 20 61 76 72 6F 20 hello more avro }
模擬發HTTP請求
啓動節點是有前後順序,flume01要訪問192.168.163.130:22222,但flume02尚未啓動,因此報下列錯誤。 解決辦法:依次倒着啓動各個節點,先flume03,再flume02,再flume01。下面提示綁定成功。
常見錯誤

    

    flume01服務器接收http格式數據爲來源,輸出avro格式數據;flume02服務器接收avro格式數據爲來源,輸出avro格式數據;flume03服務器接收avro格式數據爲來源,輸出到log4j,打印結果到控制檯。

 

HDFS Sink

 

    HDFS分佈式海量數據的存儲和備份,

    HDFS Sink將事件寫入到HDFS中,支持建立文本文件和序列化文件,支持壓縮。

    這些文件能夠分區,按照指定的時間或數據量或事件的數量爲基礎。(如:多少條記錄放一個文件,若是每條日誌都放一個文件,那HDFS就會產生小文件的問題,未來處理的效率過低。能夠設置規則,何時文件發生滾動,造成新文件)。它還能夠經過時間戳或者機器屬性對數據進行buckets(分桶)/partitions(分區)操做。HDFS的目錄流程能夠包含將要由替換格式的轉移序列用於生成存儲事件的目錄/文件名。使用這個Sink要求hadoop必須依據安裝好,以便flume能夠經過hadoop提供的jar包與HDFS進行通訊。注意,此版本的hadoop必須支持sync()調用,這樣數據能夠追加到尾部。

 

 

#命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = http a1.sources.r1.port = 22222 #描述Sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume/data #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #爲Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置文件flume-hdfs.properties
channel – type – 類型名稱,必須是「HDFS」 hdfs.path – HDFS 目錄路徑 (eg hdfs://namenode/flume/webdata/) hdfs.fileType SequenceFile File format: currently SequenceFile, DataStream or CompressedStream (1)DataStream will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC 默認是序列化文件,可選項:SequenceFile序列化文件/DataStream文本文件/CompressedStream 壓縮文件 hdfs.filePrefix FlumeData Flume在目錄下建立文件的名稱前綴 hdfs.fileSuffix – 追加到文件的名稱後綴 (eg .avro - 注: 日期時間不會自動添加) hdfs.inUsePrefix – Flume正在處理的文件所加的前綴 hdfs.inUseSuffix .tmp Flume正在處理的文件所加的後綴 hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval) hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size) hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events) hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files) hdfs.batchSize 100 number of events written to file before it is flushed to HDFS hdfs.codeC – Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed. hdfs.minBlockReplicas – Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath. hdfs.writeFormat – Format for sequence file records. One of 「Text」 or 「Writable」 (the default). hdfs.callTimeout 10000 Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring. hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.) hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling hdfs.kerberosPrincipal – Kerberos user principal for accessing secure HDFS hdfs.kerberosKeytab – Kerberos keytab for accessing secure HDFS hdfs.proxyUser hdfs.round false 時間戳是否向下取整(若是是true,會影響全部基於時間的轉移序列,除了%T) hdfs.roundValue 1 舍值的邊界值 hdfs.roundUnit 向下舍值的單位 - second, minute , hour hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. hdfs.closeTries 0 Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart. hdfs.retryInterval 180 Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a 」.tmp」 extension. serializer TEXT Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
參數說明

 

   複製依賴jar文件

     /usr/local/src/hadoop/hadoop-2.7.1/share/hadoop/common/lib      全部的jar複製過去

    /usr/local/src/hadoop/hadoop-2.7.1/share/hadoop/common       3jar

    /usr/local/src/hadoop/hadoop-2.7.1/share/hadoop/hdfs        目錄 hadoop-hdfs-2.7.1.jar

 

 

[root@localhost conf]# ../bin/flume-ng agent -c ./ -f ./flume-hdfs.properties -n a1 -Dflume.root.logger=INFO,console 執行結果,飛速打印結果 模擬發HTTP請求 在flume01節點上發送消息 curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://0.0.0.0:22222 執行結果: org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://hadoop01:9000/flume/data/FlumeData.1510560200492.tmp
啓動和模擬http請求

 

 

 

hadoop fs -put '/usr/local/src/hive/data/english.txt' /user/hive/warehouse/test.db/tb_book

相關文章
相關標籤/搜索