The HDFS Sink writes events into the Hadoop Distributed File System (HDFS). This is the main use of Flume in a Hadoop environment: Flume collects data and delivers it to HDFS, which suits big-data log scenarios.
Currently it supports text and sequence file formats on HDFS, as well as compression for both file types. It can roll the file (close the current file and create a new one) based on elapsed time, data size, or number of events. It can also partition the data by attributes such as the originating machine's hostname and by time, i.e. distribute events into the corresponding file paths according to their timestamps. The HDFS directory path may contain escape sequences that the HDFS Sink substitutes to generate the directory/file name used to store the events.
Note: the Hadoop version must support the sync() call, and of course Hadoop has to be installed first.
Below are the escape sequences supported by the HDFS Sink (a short path-expansion example follows the table):
| Alias | Description |
| --- | --- |
| %{host} | Substitute value of event header named "host". Arbitrary header names are supported. |
| %t | Unix time in milliseconds |
| %a | locale's short weekday name (Mon, Tue, ...) |
| %A | locale's full weekday name (Monday, Tuesday, ...) |
| %b | locale's short month name (Jan, Feb, ...) |
| %B | locale's long month name (January, February, ...) |
| %c | locale's date and time (Thu Mar 3 23:05:25 2005) |
| %d | day of month (01) |
| %D | date; same as %m/%d/%y |
| %H | hour (00..23) |
| %I | hour (01..12) |
| %j | day of year (001..366) |
| %k | hour (0..23) |
| %m | month (01..12) |
| %M | minute (00..59) |
| %p | locale's equivalent of am or pm |
| %s | seconds since 1970-01-01 00:00:00 UTC |
| %S | second (00..59) |
| %y | last two digits of year (00..99) |
| %Y | year (2010) |
| %z | +hhmm numeric timezone (for example, -0400) |
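To make the substitution concrete, here is a hypothetical path (the namenode address and the host header value web01 are assumptions for illustration): an event carrying the header host=web01 with a timestamp of 2014-10-24 10:56:20 would be written under the directory shown in the comment.
a1.sinks.k1.hdfs.path = hdfs://namenode/flume/%{host}/%Y-%m-%d/%H
# With host=web01 and the timestamp above, this resolves to
# hdfs://namenode/flume/web01/2014-10-24/10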
Below is the HDFS Sink configuration from the official site; the parameters in bold are required. The optional parameters are very numerous, so they are not all listed here.
| Name | Default | Description |
| --- | --- | --- |
| **channel** | – | |
| **type** | – | The component type name, needs to be hdfs |
| **hdfs.path** | – | HDFS directory path (eg hdfs://namenode/flume/webdata/) |
| hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in hdfs directory |
| hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream |
| hdfs.useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
| hdfs.codeC | – | Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy |
| hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) |
| hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. (requires hdfs.round to be true) |
| hdfs.roundUnit | second | The unit of the round down value - second, minute or hour. (used together with the above) |
Below is the example from the official site; its three round* settings round the event timestamp down to the last 10 minutes.
Assuming the current time is 10:56:20 AM on October 24, 2014, the sink writes its data to the HDFS path /flume/events/2014-10-24/1050/00.
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix=events-
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute
Below is a hands-on example:
# Configuration file: hdfs_case9.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= syslogtcp
a1.sources.r1.bind= 192.168.233.128
a1.sources.r1.port= 50000
a1.sources.r1.channels= c1
#Describe the sink
a1.sinks.k1.type= hdfs
a1.sinks.k1.channel= c1
a1.sinks.k1.hdfs.path= hdfs://carl:9000/flume/
a1.sinks.k1.hdfs.filePrefix= carl
a1.sinks.k1.hdfs.round= true
a1.sinks.k1.hdfs.roundValue= 1
a1.sinks.k1.hdfs.roundUnit= minute
a1.sinks.k1.hdfs.fileType=DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
Here we simply reused the TCP example from the previous section and added an HDFS sink. The HDFS target directory is hdfs://carl:9000/flume/ and the file prefix is carl. One setting deserves a note: a1.sinks.k1.hdfs.fileType=DataStream. The default file format is SequenceFile, which looks like garbage if opened directly and is awkward for a demo, so we switch to the plain data format.
# Run the command
flume-ng agent -c conf -f conf/hdfs_case9.conf -n a1 -Dflume.root.logger=INFO,console
After it starts successfully, open another terminal and send data to the listening port:
echo "hello looklook7hello hdfs" | nc 192.168.233.128 50000
# Check the console output in the terminal where the agent was started
Here it reports an error saying isFileClosed is unavailable. The reason is that the Hadoop used here is a CDH3 release while this Flume NG build targets CDH4, so the versions do not match. It does no harm, though; the data still goes in. First a temporary file hdfs://carl:9000/flume/carl.1414122459804.tmp is created,
and once the data is in, the final file hdfs://carl:9000/flume/carl.1414122459804 is generated.
Let's take a look at that data file, hdfs://carl:9000/flume/carl.1414122459804.
We can follow the whole process of the log file being generated, and in the end the data is there.
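A quick way to inspect the rolled file from the shell (the file name differs on every run; this one is taken from the output above). Because fileType=DataStream, the contents are plain text:
# View the contents of the rolled file
hadoop fs -cat hdfs://carl:9000/flume/carl.1414122459804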
Next, following the official example, I change this parameter in the configuration file:
a1.sinks.k1.hdfs.path= hdfs://carl:9000/flume/%y-%m-%d/%H%M/%S
and then add this parameter:
a1.sinks.k1.hdfs.useLocalTimeStamp=true
Start the agent, open another terminal, and send data to the listening port:
echo "hello looklook7hello hdfs" | nc 192.168.233.128 50000
If the parameter a1.sinks.k1.hdfs.useLocalTimeStamp=true above is not added, the events must carry an explicit timestamp header, otherwise an error is reported, as shown below (an alternative is sketched right after this paragraph).
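If you prefer to keep useLocalTimeStamp at its default of false, a common alternative (a sketch, not part of the original test) is to attach Flume's timestamp interceptor to the source so that every event carries a timestamp header:
# Add a timestamp header to every event arriving at source r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp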
After the data is sent successfully, a data file is generated.
The data directory is /flume/14-10-24/1354/00,
because we set a1.sinks.k1.hdfs.roundValue= 1, i.e. one minute, which is consistent with what the official documentation describes.
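For clarity, the full sink block after the two changes above looks like this:
a1.sinks.k1.type= hdfs
a1.sinks.k1.channel= c1
a1.sinks.k1.hdfs.path= hdfs://carl:9000/flume/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix= carl
a1.sinks.k1.hdfs.round= true
a1.sinks.k1.hdfs.roundValue= 1
a1.sinks.k1.hdfs.roundUnit= minute
a1.sinks.k1.hdfs.fileType= DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp= true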
The Logger Sink logs events at the INFO level, which is typically useful for testing/debugging purposes. It already appeared in the earlier tests, so it is not belabored below.
Below is the configuration from the official site (a minimal example follows the table):
| Property Name | Default | Description |
| --- | --- | --- |
| **channel** | – | |
| **type** | – | The component type name, needs to be logger |
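For completeness, a minimal logger-sink configuration in the spirit of the official example (the agent, channel and sink names follow the earlier samples):
a1.channels= c1
a1.sinks= k1
a1.sinks.k1.type= logger
a1.sinks.k1.channel= c1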
The Avro Sink is mainly used to build tiered Flume topologies. Flume events sent to this sink are turned into Avro events and forwarded to the configured Avro hostname and port. Events are taken from the configured channel in batches.
Below is the official configuration; bold entries are required, and the many optional parameters are not listed one by one.
| Property Name | Default | Description |
| --- | --- | --- |
| **channel** | – | |
| **type** | – | The component type name, needs to be avro. |
| **hostname** | – | The hostname or IP address to bind to. |
| **port** | – | The port # to listen on. |
Below is the example from the official site:
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=avro
a1.sinks.k1.channel=c1
a1.sinks.k1.hostname=10.10.10.10
a1.sinks.k1.port=4545
Since the Avro Sink is mainly used for tiered Flume topologies, this naturally recalls the cluster configuration example from study notes (2), which is exactly an Avro Sink / Avro Source pairing: pull.conf is the Avro Source example and push.conf is the Avro Sink example. Refer to part 2 for the details; they are not repeated here.
4、Thrift Sink
The Thrift Sink likewise supports tiered Flume topologies. Flume events sent to this sink are turned into Thrift events and forwarded to the configured Thrift hostname and port. Events are taken from the configured channel in batches. It behaves exactly like the Avro Sink, so it is skipped here; a configuration sketch follows for reference.
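A minimal configuration sketch, reusing the host/port placeholders from the Avro example above (untested here):
a1.channels= c1
a1.sinks= k1
a1.sinks.k1.type= thrift
a1.sinks.k1.channel= c1
a1.sinks.k1.hostname= 10.10.10.10
a1.sinks.k1.port= 4545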
The IRC Sink takes messages from the channel and delivers them to an IRC server. There is no IRC server here, so it cannot be tested and is skipped as well.
The File Roll Sink stores events on the local filesystem. It has a roll-interval setting that controls how often a new file is created (30 seconds by default).
Below is the configuration from the official site:
| Property Name | Default | Description |
| --- | --- | --- |
| **channel** | – | |
| **type** | – | The component type name, needs to be file_roll. |
| **sink.directory** | – | The directory where files will be stored |
| sink.rollInterval | 30 | Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file. |
| sink.serializer | TEXT | Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface. |
| batchSize | 100 | |
Next is the example from the official site:
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=file_roll
a1.sinks.k1.channel=c1
a1.sinks.k1.sink.directory=/var/log/flume
Below is the test example:
# Configuration file: fileroll_case10.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= syslogtcp
a1.sources.r1.port= 50000
a1.sources.r1.host= 192.168.233.128
a1.sources.r1.channels= c1
#Describe the sink
a1.sinks.k1.type= file_roll
a1.sinks.k1.channel= c1
a1.sinks.k1.sink.directory= /tmp/logs
# Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
# Run the command
flume-ng agent -c conf -f conf/fileroll_case10.conf -n a1 -Dflume.root.logger=INFO,console
After it starts successfully, open another terminal and send data to the listening port:
echo "hello looklook5hello hdfs" | nc 192.168.233.128 50000
# Check the console output in the terminal where the agent was started
You can see the data arrive and a file being created; after that, a new file is generated every 30 seconds whether or not any data comes in.
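To watch the rolling behaviour, just keep listing the target directory configured above:
# A new (possibly empty) file should appear roughly every 30 seconds
watch -n 5 ls -l /tmp/logs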
The Null Sink discards all events it receives from the channel, so it is not tested here.
Below is the configuration from the official site:
| Property Name | Default | Description |
| --- | --- | --- |
| **channel** | – | |
| **type** | – | The component type name, needs to be null. |
| batchSize | 100 | |
Below is the example from the official site:
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=null
a1.sinks.k1.channel=c1
The HBase Sink writes data into HBase. The HBase configuration is taken from the first hbase-site.xml found on the classpath. A class implementing the HbaseEventSerializer interface, specified in the configuration, is used to convert events into HBase puts and/or increments, which are then written to HBase.
The HBase Sink supports writing to secure HBase. To write to secure HBase, the principal the agent runs as must have write permission on the configured table; the keytab used to authenticate against the KDC can be specified in the configuration, and the hbase-site.xml on the Flume agent's classpath must have Kerberos authentication enabled.
Note one important point: this sink is very strict about the format of the data it receives.
As for AsyncHBaseSink, it is the asynchronous version of the HBase Sink.
There is no HBase environment here, so it is not demonstrated; a configuration sketch is given below for reference.
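A configuration sketch along the lines of the official example (the table name foo_table and column family bar_cf are placeholders, untested here):
a1.channels= c1
a1.sinks= k1
a1.sinks.k1.type= hbase
a1.sinks.k1.table= foo_table
a1.sinks.k1.columnFamily= bar_cf
a1.sinks.k1.serializer= org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel= c1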
A custom sink is simply an implementation of the Sink interface. When starting the Flume agent, the custom sink class and its dependency jars must be placed on the agent's classpath, and the custom sink's type is the fully qualified class name of the class implementing the Sink interface.
This is covered in detail later on, so it is not elaborated here beyond the brief sketch below.
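A minimal configuration sketch following the pattern in the official docs (the class name org.example.MySink is a placeholder):
a1.channels= c1
a1.sinks= k1
a1.sinks.k1.type= org.example.MySink
a1.sinks.k1.channel= c1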
Sources add events through a channel and sinks take events from the channel, so the channel acts as a kind of buffer.
The Memory Channel stores events in an in-memory queue. It is fast and offers high throughput, but events are lost if the agent fails.
Below is the configuration from the official site:
| Property Name | Default | Description |
| --- | --- | --- |
| **type** | – | The component type name, needs to be memory |
| capacity | 100 | The maximum number of events stored in the channel |
| transactionCapacity | 100 | The maximum number of events the channel will take from a source or give to a sink per transaction |
| keep-alive | 3 | Timeout in seconds for adding or removing an event |
| byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. |
| byteCapacity | see description | Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB. |
And the example from the official site:
a1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000
All of the earlier examples used the Memory Channel. Channels are hard to demonstrate on their own, so there will be no further hands-on examples below.
The JDBC Channel stores events in a database. Currently it supports embedded Derby. Its main purpose is durable, recoverable storage of events.
| Property Name | Default | Description |
| --- | --- | --- |
| **type** | – | The component type name, needs to be jdbc |
| db.type | DERBY | Database vendor, needs to be DERBY. |
| driver.class | org.apache.derby.jdbc.EmbeddedDriver | Class for vendor's JDBC driver |
| driver.url | (constructed from other properties) | JDBC connection URL |
| db.username | "sa" | User id for db connection |
| db.password | – | password for db connection |
Below is the example from the official site:
a1.channels=c1
a1.channels.c1.type=jdbc
Note that by default the File Channel uses a checkpoint directory (checkpointDir) and data directories (dataDirs) under the user's home directory. So if multiple File Channel instances are started within one agent, only one of them can lock those directories and the others will fail to initialize. It is therefore necessary to configure explicit paths for all configured channels, and for maximum throughput the checkpoint and data directories are best kept on different disks (see the sketch after the official example below).
| Property Name | Default | Description |
| --- | --- | --- |
| **type** | – | The component type name, needs to be file. |
| checkpointDir | ~/.flume/file-channel/checkpoint | The directory where checkpoint file will be stored |
| dataDirs | ~/.flume/file-channel/data | Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance |
Below is the example from the official site:
a1.channels=c1
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/mnt/flume/checkpoint
a1.channels.c1.dataDirs=/mnt/flume/data
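Following the note above about multiple File Channel instances in one agent, a sketch with two channels using explicit, separate directories (the paths are hypothetical):
a1.channels=c1 c2
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/mnt/disk1/flume/checkpoint
a1.channels.c1.dataDirs=/mnt/disk1/flume/data
a1.channels.c2.type=file
a1.channels.c2.checkpointDir=/mnt/disk2/flume/checkpoint
a1.channels.c2.dataDirs=/mnt/disk2/flume/data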
The official site also gives the corresponding configuration for File Channel encryption.
Generating a key with a password separate from the key store password:
keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES\
-keysize 128 -validity 9000 -keystore test.keystore\
-storetype jceks -storepass keyStorePassword
Generating a key with the password the same as the key store password:
keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000\
-keystore src/test/resources/test.keystore -storetype jceks\
-storepass keyStorePassword
a1.channels.c1.encryption.activeKey=key-0
a1.channels.c1.encryption.cipherProvider=AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider=key-provider-0
a1.channels.c1.encryption.keyProvider=JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile=/path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile=/path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys=key-0
Let's say you have aged key-0 out and new files should be encrypted with key-1:
a1.channels.c1.encryption.activeKey=key-1
a1.channels.c1.encryption.cipherProvider=AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider=JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile=/path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile=/path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys=key-0 key-1
The same scenario as above, however key-0 has its own password:
a1.channels.c1.encryption.activeKey=key-1
a1.channels.c1.encryption.cipherProvider=AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider=JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile=/path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile=/path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys=key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile=/path/to/key-0.password
The former (the Spillable Memory Channel) is still experimental, and the latter (the Pseudo Transaction Channel) is intended only for testing rather than production use, so both are skipped.
A Custom Channel is an implementation of the Channel interface. The implementing class and its related jar files must be placed on the classpath, and the channel's type is the fully qualified class name of that class.
Below is the configuration from the official site:
| Property Name | Default | Description |
| --- | --- | --- |
| **type** | – | The component type name, needs to be a FQCN |
And the example from the official site:
a1.channels=c1
a1.channels.c1.type=org.example.MyChannel