Flume NG 學習筆記(五)Sinks和Channel配置

1、HDFS Sink

Flume Sink是將事件寫入到Hadoop分佈式文件系統(HDFS)中。主要是Flume在Hadoop環境中的應用,即Flume採集數據輸出到HDFS,適用大數據日誌場景。html

目前,它支持HDFS的文本和序列文件格式,以及支持兩個文件類型的壓縮。支持將所用的時間、數據大小、事件的數量爲操做參數,對HDFS文件進行關閉(關閉當前文件,並建立一個新的)。它還能夠對事源的機器名(hostname)及時間屬性分離數據,即經過時間戳將數據分佈到對應的文件路徑。 HDFS目錄路徑可能包含格式轉義序列用於取代由HDFS Sink生成一個目錄/文件名存儲的事件。node

注意:Hadoop的版本須要支持sync()方法調用,固然首先得按照Hadoop。mysql

下面是HDFS  Sinks轉義符的支持目錄:git

Aliasweb

Descriptionsql

%{host}數據庫

Substitute value of event header named 「host」. Arbitrary header names are supported.apache

%t緩存

Unix time in milliseconds安全

%a

locale’s short weekday name (Mon, Tue, ...)

%A

locale’s full weekday name (Monday, Tuesday, ...)

%b

locale’s short month name (Jan, Feb, ...)

%B

locale’s long month name (January, February, ...)

%c

locale’s date and time (Thu Mar 3 23:05:25 2005)

%d

day of month (01) 每個月中的第幾天

%D

date; same as %m/%d/%y

%H

hour (00..23)

%I

hour (01..12)

%j

day of year (001..366) 一年中的第幾天

%k

hour ( 0..23)

%m

month (01..12)

%M

minute (00..59)

%p

locale’s equivalent of am or pm

%s

seconds since 1970-01-01 00:00:00 UTC

%S

second (00..59)

%y

last two digits of year (00..99)  年的後兩位

%Y

year (2010)

%z

+hhmm numeric timezone (for example, -0400)

 

下面是官網給出的HDFS  Sinks的配置,加粗的參數是必選,可選項十分豐富,這裏就不一一列出來了

Name

Default

Description

channel


type

The component type name, needs to be hdfs

hdfs.path

HDFS directory path (eg hdfs://namenode/flume/webdata/)

hdfs.filePrefix

FlumeData

Name prefixed to files created by Flume in hdfs directory 文件前綴

hdfs.fileType

SequenceFile

File format: currently SequenceFileDataStream or CompressedStream

hdfs.useLocalTimeStamp

false

Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

hdfs.codeC

Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy

hdfs.round

false

Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) 定時間用

hdfs.roundValue

1

Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.(須要hdfs.round爲true)

hdfs.roundUnit

second

The unit of the round down value - second, minute or hour.(同上)

 

下面是官網的例子,他的三個round*配置是將向下舍入到最後10分鐘的時間戳記錄。

假設如今是上午10時56分20秒等等,2014年10月24日的Flume Sinks的數據到輸出到HDFS的路徑爲/flume/events/2014-10-24/1050/00的。。

a1.channels=c1

a1.sinks=k1

a1.sinks.k1.type=hdfs

a1.sinks.k1.channel=c1

a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.filePrefix=events-

a1.sinks.k1.hdfs.round=true

a1.sinks.k1.hdfs.roundValue=10

a1.sinks.k1.hdfs.roundUnit=minute

下面是實際的例子:


[html] view plain copy

  1. #配置文件:hdfs_case9.conf  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1  

  5. a1.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.typesyslogtcp  

  9. a1.sources.r1.bind192.168.233.128  

  10. a1.sources.r1.port50000  

  11. a1.sources.r1.channelsc1  

  12.    

  13. #Describe the sink  

  14. a1.sinks.k1.typehdfs  

  15. a1.sinks.k1.channelc1  

  16. a1.sinks.k1.hdfs.pathhdfs://carl:9000/flume/  

  17. a1.sinks.k1.hdfs.filePrefixcarl  

  18. a1.sinks.k1.hdfs.roundtrue  

  19. a1.sinks.k1.hdfs.roundValue1  

  20. a1.sinks.k1.hdfs.roundUnitminute  

  21. a1.sinks.k1.hdfs.fileType=DataStream  

  22.    

  23. # Usea channel which buffers events in memory  

  24. a1.channels.c1.typememory  

  25. a1.channels.c1.capacity1000  

  26. a1.channels.c1.transactionCapacity100  




這裏咱們偷懶拷了上節TCP的例子,而後加入sinks爲HDFS中。咱們設置數據是放入在HDFS的目錄爲hdfs://carl:9000/flume/,文件前綴爲carl,其中這裏有個設置要說明下:a1.sinks.k1.hdfs.fileType=DataStream,由於文件格式默認是 SequenceFile,若是直接打開是亂碼,這個不方便演示,所以咱們設置成普通數據格式。

#敲命令

flume-ng agent -cconf -f conf/hdfs_case9.conf -n a1 -Dflume.root.logger=INFO,console

啓動成功後

打開另外一個終端輸入,往偵聽端口送數據

echo "hello looklook7hello hdfs" | nc 192.168.233.128 50000

#在啓動的終端查看console輸出


這裏能夠看到他報了一個錯誤,說isfileclosed不可用。。。這個是這樣的,這邊的Hadoop是cdh3版本的,而flume ng 是說支持cdh4版本的,因此版本不匹配。不過這個無妨,下面看他們數據已經插入進去了,一開始生成一個hdfs://carl:9000/flume//carl.1414122459804.tmp,

而後數據進去了生成文件hdfs://carl:9000/flume/carl.1414122459804

那咱們看下數據文件,hdfs://carl:9000/flume/carl.1414122459804


咱們看到日誌文件的生成過程,最後數據已經進去了。

 

而後我對配置文件裏的這這個參數改下,參照官網的例子

a1.sinks.k1.hdfs.path= hdfs://carl:9000/flume/%y-%m-%d/%H%M/%S

而後加上這個參數

a1.sinks.k1.hdfs.useLocalTimeStamp=true

啓動

打開另外一個終端輸入,往偵聽端口送數據

echo "hello looklook7hello hdfs" | nc 192.168.233.128 50000

 

這裏若是不加上面的參數a1.sinks.k1.hdfs.useLocalTimeStamp=true,會須要向事件裏面明確header,不然會報錯,以下


數據成功發送後,會生成數據文件


數據目錄是/flume/14-10-24/1354/00

由於咱們設的參數是1分鐘a1.sinks.k1.hdfs.roundValue= 1 這個與官網講的一致

 

2、Logger Sink

INFO級別的日誌事件。一般有用的測試/調試目的。以前的測試裏有些,下面就很少贅述

下面是官網配置

Property Name

Default

Description

channel


type

The component type name, needs to be logger

 

3、Avro Sink

Avro Sink主要用於Flume分層結構。Flumeevent 發送給這個sink的事件都會轉換成Avro事件,發送到配置好的Avro主機和端口上。這些事件能夠批量傳輸給通道。

下面是官網配置,加粗爲必須,可選項太多就不一一列了

Property Name

Default Description


channel


type

The component type name, needs to be avro.

hostname

The hostname or IP address to bind to.

port

The port # to listen on.

下面是官網例子

a1.channels=c1

a1.sinks=k1

a1.sinks.k1.type=avro

a1.sinks.k1.channel=c1

a1.sinks.k1.hostname=10.10.10.10

a1.sinks.k1.port=4545

由於Avro Sink主要用於Flume分層結構,那麼這邊都會想到咱們學習心得(二)關於集羣配置的列子就是關於Avro Sink與Avro Source的一個實例,其中pull.cof是關於Avro Source的例子,而push.conf 是Avro Sink的例子,具體內容你們能夠去第二節看,這裏不作贅述。

 

3、Avro Sink

Thrift也是用來支持Flume分層結構。Flumeevent 發送給這個sink的事件都會轉換成Thrift事件,發送到配置好的Thrift主機和端口上。這些事件能夠批量傳輸給通道。和Avro Sink如出一轍。這邊也就略過了。

 

4、IRC Sink

IRC Sink 從通道中取得信息到IRCServer,這個沒有IRC Server。。。沒法測試,也略過吧。。。

 

5、File RollSink

存儲到本地存儲中。他有個滾動間隔的設置,設置多長時間去生成文件(默認是30秒)。

下面是官網配置

Property Name

Default

Description

channel


type

The component type name, needs to be file_roll.

sink.directory

The directory where files will be stored

sink.rollInterval

30

Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.

sink.serializer

TEXT

Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.

batchSize

100


接下去是官網例子

a1.channels=c1

a1.sinks=k1

a1.sinks.k1.type=file_roll

a1.sinks.k1.channel=c1

a1.sinks.k1.sink.directory=/var/log/flume

下面是測試例子:


[html] view plain copy

  1. #配置文件:fileroll_case10.conf  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1  

  5. a1.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.typesyslogtcp  

  9. a1.sources.r1.port50000  

  10. a1.sources.r1.host192.168.233.128  

  11. a1.sources.r1.channelsc1  

  12.    

  13. #Describe the sink  

  14. a1.sinks.k1.typefile_roll  

  15. a1.sinks.k1.channelc1  

  16. a1.sinks.k1.sink.directory= /tmp/logs  

  17.    

  18. # Usea channel which buffers events in memory  

  19. a1.channels.c1.typememory  

  20. a1.channels.c1.capacity1000  

  21. a1.channels.c1.transactionCapacity100  




#敲命令

flume-ng agent -cconf -f conf/fileroll_case10.conf -n a1 -Dflume.root.logger=INFO,console

啓動成功後

打開另外一個終端輸入,往偵聽端口送數據

echo "hello looklook5hello hdfs" | nc 192.168.233.128 50000

#在啓動的終端查看console輸出


能夠看到數據傳過來並生成文件,而後不管是否有數據傳過來,都會每過30秒就會生成文件。

 

6、Null Sink

丟棄從通道接收的全部事件。。。這邊就不測試了。。

下面是官網配置

Property Name

Default

Description

channel


type

The component type name, needs to be null.

batchSize

100


下面是官網例子

a1.channels=c1

a1.sinks=k1

a1.sinks.k1.type=null

a1.sinks.k1.channel=c1

 

7、HBaseSinks與AsyncHBaseSink

HBaseSinks負責將數據寫入到Hbase中。Hbase的配置信息從classpath路徑裏面遇到的第一個hbase-site.xml文件中獲取。在配置文件中指定的實現了HbaseEventSerializer 接口的類,用於將事件轉換成Hbase所表示的事件或者增量。而後將這些事件和增量寫入Hbase中。

Hbase Sink支持寫數據到安全的Hbase。爲了將數據寫入安全的Hbase,用戶代理運行必須對配置的table表有寫權限。主要用來驗證對KDC的密鑰表能夠在配置中指定。在Flume Agent的classpath路徑下的Hbase-site.xml文件必須設置到Kerberos認證。

注意有必定很重要,就是這個sinks 對格式的規範要求很是高。

至於 AsyncHBaseSink則是異步的HBaseSinks。

 

這邊沒有HBase環境,所以也就不演示了。。

8、Custom Sink

一個自定義 Sinks實際上是對Sinks接口的實現。當咱們開始flume代理的時候必須將自定義Sinks和相依賴的jar包放到代理的classpath下面。自定義 Sinkstype就是咱們實現Sinks接口對應的類全路徑。

這裏後面的內容裏會詳細介紹,這裏不作贅述。

 

9、MemoryChannel

Source經過通道添加事件,Sinks經過通道取事件。因此通道相似緩存的存在。

Memory Channel是事件存儲在一個內存隊列中。速度快,吞吐量大。但會有代理出現故障後數據丟失的狀況。

下面是官網配置

Property Name

Default

Description

type

The component type name, needs to be memory

capacity

100

The maximum number of events stored in the channel

transactionCapacity

100

The maximum number of events the channel will take from a source or give to a sink per transaction

keep-alive

3

Timeout in seconds for adding or removing an event

byteCapacityBufferPercentage

20

Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.

byteCapacity

see description

Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing thebyteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.

以及官網例子

a1.channels=c1

a1.channels.c1.type=memory

a1.channels.c1.capacity=10000

a1.channels.c1.transactionCapacity=10000

a1.channels.c1.byteCapacityBufferPercentage=20

a1.channels.c1.byteCapacity=800000

以前的例子所有是Memory Channel。關於Channel的列子很差演示,後面就不會有例子了。

 

10、JDBCChannel

JDBC Channel是把事件存儲在數據庫。目前的JDBC Channel支持嵌入式Derby。主要是爲了數據持久化,而且可恢復的特性。

Property Name

Default

Description

type

The component type name, needs to be jdbc

db.type

DERBY

Database vendor, needs to be DERBY.

driver.class

org.apache.derby.jdbc.EmbeddedDriver

Class for vendor’s JDBC driver

driver.url

(constructed from other properties)

JDBC connection URL

db.username

「sa」

User id for db connection

db.password

password for db connection

下面是官網例子:

a1.channels=c1

a1.channels.c1.type=jdbc

 

11、FileChannel

注意默認狀況下,File Channel使用檢查點(checkpointDir)和在用戶目錄(dataDirs)上指定的數據目錄。因此在一個agent下面啓動多個File Channel實例,只會有一個File channel能鎖住文件目錄,其餘的都將初始化失敗。所以,有必要提供明確的路徑的全部已配置的通道,同時考慮最大吞吐率,檢查點與數據目錄最好是在不一樣的磁盤上。

Property Name Default

Description


type

The component type name, needs to be file.

checkpointDir

~/.flume/file-channel/checkpoint

The directory where checkp

dataDirs

~/.flume/file-channel/data

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

下面是官網例子

a1.channels=c1

a1.channels.c1.type=file

a1.channels.c1.checkpointDir=/mnt/flume/checkpoint

a1.channels.c1.dataDirs=/mnt/flume/data

File Channel 加密官網也給出了相應的配置

Generating a key with a password seperate from the key store password:

keytool -genseckey -alias key-0 -keypasskeyPassword -keyalg AES\

 -keysize 128 -validity 9000 -keystore test.keystore\

 -storetype jceks -storepass keyStorePassword

Generating a key with the password the same as the key store password:

keytool -genseckey -alias key-1 -keyalgAES -keysize 128 -validity 9000\

 -keystore src/test/resources/test.keystore -storetype jceks\

 -storepass keyStorePassword

a1.channels.c1.encryption.activeKey=key-0

a1.channels.c1.encryption.cipherProvider=AESCTRNOPADDING

a1.channels.c1.encryption.keyProvider=key-provider-0

a1.channels.c1.encryption.keyProvider=JCEKSFILE

a1.channels.c1.encryption.keyProvider.keyStoreFile=/path/to/my.keystore

a1.channels.c1.encryption.keyProvider.keyStorePasswordFile=/path/to/my.keystore.password

a1.channels.c1.encryption.keyProvider.keys=key-0

Let’s say you have aged key-0 out and new files should be encrypted withkey-1:

a1.channels.c1.encryption.activeKey=key-1

a1.channels.c1.encryption.cipherProvider=AESCTRNOPADDING

a1.channels.c1.encryption.keyProvider=JCEKSFILE

a1.channels.c1.encryption.keyProvider.keyStoreFile=/path/to/my.keystore

a1.channels.c1.encryption.keyProvider.keyStorePasswordFile=/path/to/my.keystore.password

a1.channels.c1.encryption.keyProvider.keys=key-0 key-1

The same scenerio as above, however key-0 has its own password:

a1.channels.c1.encryption.activeKey=key-1

a1.channels.c1.encryption.cipherProvider=AESCTRNOPADDING

a1.channels.c1.encryption.keyProvider=JCEKSFILE

a1.channels.c1.encryption.keyProvider.keyStoreFile=/path/to/my.keystore

a1.channels.c1.encryption.keyProvider.keyStorePasswordFile=/path/to/my.keystore.password

a1.channels.c1.encryption.keyProvider.keys=key-0 key-1

a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile=/path/to/key-0.password

 

12、Spillable Memory Channel 與Pseudo Transaction Channel

前者還在試驗階段。。後者僅僅用來測試目的,不是在生產環境中使用,因此略過。

 

十3、CustomChannel

Custom Channel是對channel接口的實現。須要在classpath中引入實現類和相關的jar文件。這Channel對應的type是該類的完整路徑

下面是官網配置

Property Name

Default

Description

type

The component type name, needs to be a FQCN

後面是官網例子

a1.channels=c1

a1.channels.c1.type=org.example.MyChannel

相關文章
相關標籤/搜索