【翻譯】Flume 1.8.0 User Guide(用戶指南) Sink

翻譯自官網flume1.8用戶指南,原文地址:Flume 1.8.0 User Guidehtml

篇幅限制,分爲如下5篇:java

【翻譯】Flume 1.8.0 User Guide(用戶指南)node

【翻譯】Flume 1.8.0 User Guide(用戶指南) sourcegit

【翻譯】Flume 1.8.0 User Guide(用戶指南) Sinkweb

【翻譯】Flume 1.8.0 User Guide(用戶指南) Channelapache

【翻譯】Flume 1.8.0 User Guide(用戶指南) Processorsjson

Flume Sinks

一、HDFS Sink 

這個sink 將事件寫入Hadoop分佈式文件系統(HDFS)。它目前支持建立文本和序列文件。它支持兩種文件類型的壓縮。能夠根據運行時間、數據大小或事件數量按期滾動文件(關閉當前文件並建立一個新文件)。它還經過屬性(如時間戳或事件起源的機器)存儲/分區數據。HDFS目錄路徑可能包含格式化轉義序列,該序列將被HDFS sink替換,以生成用於存儲事件的目錄/文件名。使用這個sink須要安裝hadoop,這樣Flume就可使用hadoop jar與HDFS集羣通訊。請注意,須要一個支持sync()調用的Hadoop版本。bootstrap

如下是支持的轉義序列:安全

Alias Description
%{host} Substitute value of event header named 「host」. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%e day of month without padding (1)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%n month without padding (1..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)
%[localhost] Substitute the hostname of the host where the agent is running
%[IP] Substitute the IP address of the host where the agent is running
%[FQDN] Substitute the canonical hostname of the host where the agent is running

注意:轉義字符串%[localhost]、%[IP]和%[FQDN]都依賴於Java獲取主機名的能力,這在某些網絡環境中可能會失敗。 服務器

正在使用的文件的名稱將被打亂最後是」.tmp「。一旦文件被關閉,這個擴展名將被刪除。這容許在目錄中排除部分完成的文件。必須屬性以粗體顯示。

注意,對於全部與時間相關的轉義序列,帶有鍵「timestamp」的消息頭必須存在於事件的消息頭中(除非是hdfs)。useLocalTimeStamp設置爲true)。自動添加的一種方法是使用TimestampInterceptor。

 

Name Default Description
channel  
type The component type name, needs to be hdfs
hdfs.path HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix Suffix to append to file (eg .avro - NOTE: period is not automatically added)
hdfs.inUsePrefix Prefix that is used for temporal files that flume actively writes into
hdfs.inUseSuffix .tmp Suffix that is used for temporal files that flume actively writes into
hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events)
hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize 100 number of events written to file before it is flushed to HDFS
hdfs.codeC Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy
hdfs.fileType SequenceFile

File format: currently SequenceFileDataStream or CompressedStream (1)DataStream

will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC

hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas

Specify minimum number of replicas per HDFS block. If not specified, it comes from the

default Hadoop config in the classpath.

hdfs.writeFormat Writable

Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume,

otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive.

hdfs.callTimeout 10000

Number of milliseconds allowed for HDFS operations, such as open, write, flush, close.

This number should be increased if many HDFS timeout operations are occurring.

hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab Kerberos keytab for accessing secure HDFS
hdfs.proxyUser    
hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
hdfs.roundUnit second The unit of the round down value - secondminute or hour.
hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.closeTries 0

Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1,

this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure),

and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until

the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open

if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.

hdfs.retryInterval 180

Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode,

so setting this too low can cause a lot of load on the name node. If set to 0 or less,

the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a 」.tmp」 extension.

serializer TEXT

Other possible options include avro_event or the fully-qualified class name of an implementation of

theEventSerializer.Builder interface.

serializer.*    

agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

上面的配置將把時間戳四捨五入到最後10分鐘。例如,時間戳爲2012年6月12日上午11:54:34的事件將致使hdfs路徑變爲/flume/events/2012-06-12/1150/00。

 2. hive sink

此接收器將包含分隔文本或JSON數據的事件直接匯入Hive表或分區。事件是使用Hive事務編寫的。一旦一組事件提交給Hive,它們就會當即對Hive查詢可見。能夠預先建立flume要寫到的分區,也能夠選擇,分區不存在的時候,由flume建立分區。來自傳入事件數據的字段映射到Hive表中的相應列。

Name Default Description
channel  
type The component type name, needs to be hive
hive.metastore Hive metastore URI (eg thrift://a.b.com:9083 )
hive.database Hive database name
hive.table Hive table name
hive.partition

Comma separate list of partition values identifying the partition to write to. May contain escape sequences.

E.g: If the table is partitioned by (continent: string, country :string, time : string) then ‘Asia,India,2014-02-26-01-21’

will indicate continent=Asia,country=India,time=2014-02-26-01-21

hive.txnsPerBatchAsk 100

Hive grants a batch of transactions instead of single transactions to streaming clients like Flume.

This setting configures the number of desired transactions per Transaction Batch.

Data from all transactions in a single batch end up in a single file. Flume will write a maximum of

batchSize events in each transaction in the batch.

This setting in conjunction with batchSize provides control over the size of each file.

Note that eventually Hive will transparently compact these files into larger files.

heartBeatInterval 240

(In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring.

Set this value to 0 to disable heartbeats.

autoCreatePartitions true Flume will automatically create the necessary Hive partitions to stream to
batchSize 15000 Max number of events written to Hive in a single Hive transaction
maxOpenConnections 500 Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.
callTimeout 10000 (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort.
serializer  

Serializer is responsible for parsing out field from the event and mapping them to columns in the hive table.

Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON

roundUnit minute The unit of the round down value - secondminute or hour.
roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time
timeZone Local Time Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles.
useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

Hive sink提供如下序列化器:

JSON:處理UTF8編碼的JSON(嚴格語法)事件,不須要配置。JSON中的對象名稱直接映射到Hive表中具備相同名稱的列。在內部使用org.apache.hive.hcatalog.data.JsonSerDe ,可是它獨立於Hive表。這個序列化器須要安裝HCatalog。
DELIMITED:處理簡單的分隔文本事件。內部使用LazySimpleSerde,但獨立於Hive表的Serde。

Name Default Description
serializer.delimiter , (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like 「\t」
serializer.fieldnames

The mapping from input fields to columns in hive table. Specified as a comma separated list (no spaces) of

hive table columns names, identifying the input fields in order of their occurrence. To skip fields leave the

column name unspecified. Eg. ‘time,,ip,message’ indicates the 1st, 3rd and 4th fields in input map to time,

ip and message columns in the hive table.

serializer.serdeSeparator Ctrl-A

(Type: character) Customizes the separator used by underlying serde. There can be a gain in efficiency if the

fields in serializer.fieldnames are in same order as table columns, the serializer.delimiter is same as the

serializer.serdeSeparator and number of fields in serializer.fieldnames is less than or equal to number of

table columns, as the fields in incoming event body do not need to be reordered to match order of table columns.

Use single quotes for special characters like ‘\t’. Ensure input fields do not contain this character.

NOTE: If serializer.delimiter is a single character, preferably set this to the same character

如下是支持的轉義序列:

 

Alias Description
%{host} Substitute value of event header named 「host」. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)

注意,對於全部與時間相關的轉義序列,帶有鍵「timestamp」的消息頭必須存在於事件的消息頭中(除非useLocalTimeStamp設置爲true)。自動添加的一種方法是使用TimestampInterceptor。

 hive table 示例:

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;

agent a1 示例:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

上面的配置將把時間戳四捨五入到最後10分鐘。例如,將時間戳標頭設置爲2012年6月12日上午11:54:34,將「國家」標頭設置爲「印度」的事件將計算分區(大陸=「亞洲」,國家=「印度」,時間=「2012-06-12-11-50」)。序列化器被配置爲接受包含三個字段的製表符分隔的輸入,並跳過第二個字段。

 3. Logger Sink

在INFO級別記錄事件。一般用於測試/調試目的。必須屬性以粗體顯示。此sink是唯一不須要在日誌原始數據部分中解釋的額外配置的異常。

 

Property Name Default Description
channel  
type The component type name, needs to be logger
maxBytesToLog 16 Maximum number of bytes of the Event body to log

 agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

4. Avro Sink

這個sink構成了Flume分層收集支持的一半。發送到此sink的Flume事件被轉換爲Avro事件併發送到配置的主機名/端口對。事件以配置的批大小的批次從配置的Channel中獲取。必須屬性以粗體顯示。

Property Name Default Description
channel  
type The component type name, needs to be avro.
hostname The hostname or IP address to bind to.
port The port # to listen on.
batch-size 100 number of event to batch together for send.
connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
request-timeout 20000 Amount of time (ms) to allow for requests after the first.
reset-connection-interval none

Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop.

This will allow the sink to connect to hosts behind a hardware load-balancer when news

hosts are added without having to restart the agent.

compression-type none This can be 「none」 or 「deflate」. The compression-type must match the compression-type of matching AvroSource
compression-level 6

The level of compression to compress event. 0 = no compression and 1-9 is compression.

The higher the number the more compression

ssl false

Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally

set a 「truststore」, 「truststore-password」, 「truststore-type」, and specify whether to 「trust-all-certs」.

trust-all-certs false

If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked.

This should NOT be used in production because it makes it easier for an attacker to execute a

man-in-the-middle attack and 「listen in」 on the encrypted connection.

truststore

The path to a custom Java truststore file. Flume uses the certificate authority information in this file

to determine whether the remote Avro Source’s SSL authentication credentials should be trusted.

If not specified, the default Java JSSE certificate authority files (typically 「jssecacerts」 or 「cacerts」

in the Oracle JRE) will be used.

truststore-password The password for the specified truststore.
truststore-type JKS The type of the Java truststore. This can be 「JKS」 or other supported Java truststore type.
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

5. Thrift Sink

這個接收器構成了Flume分層收集支持的一半。發送到此接收器的Flume事件被轉換爲Thrift事件併發送到配置的主機名/端口對。事件以配置的批大小的批次從配置的Channel中獲取。
經過啓用kerberos身份驗證,能夠將Thrift sink配置爲以安全模式啓動。要在安全模式下與Thrift源通訊,Thrift sink也應該在安全模式下運行。客戶機-主體和客戶機-keytab是節儉接收器用於對kerberos KDC進行身份驗證的屬性。服務器主體表示此sink配置爲以安全模式鏈接的Thrift源的主體。必須屬性以粗體顯示。

Property Name Default Description
channel  
type The component type name, needs to be thrift.
hostname The hostname or IP address to bind to.
port The port # to listen on.
batch-size 100 number of event to batch together for send.
connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
request-timeout 20000 Amount of time (ms) to allow for requests after the first.
connection-reset-interval none

Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop.

This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent.

ssl false Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a 「truststore」, 「truststore-password」 and 「truststore-type」
truststore

The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote

Thrift Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically

「jssecacerts」 or 「cacerts」 in the Oracle JRE) will be used.

truststore-password The password for the specified truststore.
truststore-type JKS The type of the Java truststore. This can be 「JKS」 or other supported Java truststore type.
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude
kerberos false

Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for

successful authentication and communication to a kerberos enabled Thrift Source.

client-principal —- The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC.
client-keytab —- The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC.
server-principal The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect to.

 agent a1示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

6. IRC Sink

IRC sink接收來自附加channel的消息,並將這些消息轉發到配置的IRC目的地。必須屬性以粗體顯示。

Property Name Default Description
channel  
type The component type name, needs to be irc
hostname The hostname or IP address to connect to
port 6667 The port number of remote host to connect
nick Nick name
user User name
password User password
chan channel
name    
splitlines (boolean)
splitchars n

line separator (if you were to enter the default value into the config file,

then you would need to escape the backslash, like this: 「\n」)

 agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

7. File Roll Sink

在本地文件系統上存儲事件。必須屬性以粗體顯示。

 

Property Name Default Description
channel  
type The component type name, needs to be file_roll.
sink.directory The directory where files will be stored
sink.pathManager DEFAULT The PathManager implementation to use.
sink.pathManager.extension The file extension if the default PathManager is used.
sink.pathManager.prefix A character string to add to the beginning of the file name if the default PathManager is used
sink.rollInterval 30 Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
batchSize 100  

 agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

8. Null Sink

丟棄從channel接收的全部事件。必須屬性以粗體顯示。

Property Name Default Description
channel  
type The component type name, needs to be null.
batchSize 100  

 agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

9. Hbase Sink

9.1 Hbase Sink

這個sink將數據寫入HBase。Hbase配置是從第一個在類路徑中的hbase-site.xml 獲取的。由配置指定的實現HbaseEventSerializer的類用於將事件轉換爲HBase put 和/或 增量。而後將這些put和increments寫入HBase。這個sink提供了與HBase相同的一致性保證,HBase目前是行原子性的。若是Hbase沒法寫入某些事件,sink將重播該事務中的全部事件。
HBaseSink支持編寫數據來保護HBase。要寫入安全模式的HBase,agent運行的用戶必須具備對配置爲寫入的sink的表的寫入權限。能夠在配置中指定用於根據KDC進行身份驗證的主體和keytab。Flume代理類路徑中的hbase-site.xml 必須將身份驗證設置爲kerberos(有關如何實現這一點的詳細信息,請參閱HBase文檔)。
爲了方便,兩個序列化器配有Flume。SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer))按原樣將事件體寫入HBase,並可選地增長HBase中的一列。這主要是一個示例實現。RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer)基於給定的regex分解事件體,並將每一個部分寫入不一樣的列中。
類型是FQCN: org.apache.flume.sink.hbase.HBaseSink。
必須屬性以粗體顯示。

 

Property Name Default Description
channel  
type The component type name, needs to be hbase
table The name of the table in Hbase to write to.
columnFamily The column family in Hbase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize 100 Number of events to be written per txn.
coalesceIncrements false

Should the sink coalesce multiple increments to a cell per batch.

This might give better performance if there are multiple increments to a limited number of cells.

serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = 「iCol」, payload column = 「pCol」.
serializer.* Properties to be passed to the serializer.
kerberosPrincipal Kerberos user principal for accessing secure HBase
kerberosKeytab Kerberos keytab for accessing secure HBase

 agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1

9.2  AsyncHBaseSink

這個sink使用異步模型將數據寫入HBase。配置中指定的實現AsyncHbaseEventSerializer的類用於將事件轉換爲HBase put和/或增量。而後將這些put和increments寫入HBase。這個sink使用Asynchbase API寫入HBase。這個接收器提供了與HBase相同的一致性保證,HBase目前是行原子性的。若是Hbase沒法寫入某些事件,sink將重播該事務中的全部事件。類型是FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink。必須屬性以粗體顯示。

 

Property Name Default Description
channel  
type The component type name, needs to be asynchbase
table The name of the table in Hbase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
columnFamily The column family in Hbase to write to.
batchSize 100 Number of events to be written per txn.
coalesceIncrements false

Should the sink coalesce multiple increments to a cell per batch. This might give better performance

if there are multiple increments to a limited number of cells.

timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction.
serializer

org.apache.flume.sink.hbase.

SimpleAsyncHbaseEventSerializer

 
serializer.* Properties to be passed to the serializer.

 

請注意,此接收器接受配置中的Zookeeper Quorum和父znode信息。Zookeeper Quorum和父節點配置能夠在flume配置文件中指定。或者,這些配置值取自類路徑中的第一個hbase-site.xml文件。
若是配置中沒有提供這些信息,則sink將從類路徑中的第一個hbase-site.xml讀取此信息.
agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1

10 MorphlineSolrSink

這個sink從Flume事件中提取數據,對其進行轉換,並將其近乎實時地加載到Apache Solr服務器中,而後由Apache Solr服務器向最終用戶或搜索應用程序提供查詢。
這個sink很是適合將原始數據流到HDFS(經過HdfsSink)並同時提取、轉換和加載相同數據到Solr(經過MorphlineSolrSink)的用例。特別是,這個sink能夠處理來自不一樣數據源的任意異構原始數據,並將其轉換爲對搜索應用程序有用的數據模型。
ETL功能可使用一個形態線配置文件進行定製,該文件定義了一系列轉換命令,將事件記錄從一個命令傳輸到另外一個命令。
形態線能夠看做是Unix管道的演化,其中數據模型被通常化以處理通用記錄流,包括任意二進制有效負載。形態線命令有點像flume攔截器。形態線能夠嵌入到Hadoop組件中,好比Flume。
提供了開箱即用的命令來解析和轉換一組標準數據格式,如日誌文件、Avro、CSV、文本、HTML、XML、PDF、Word、Excel等,還能夠做爲形態線插件添加其餘數據格式的定製命令和解析器。任何類型的數據格式均可以創建索引,任何類型Solr模式的任何Solr文檔均可以生成,任何定製的ETL邏輯均可以註冊和執行。
形態線操做連續的記錄流。數據模型能夠這樣描述:記錄是一組命名字段,其中每一個字段都有一個或多個值的有序列表/值能夠是任何Java對象。也就是說,記錄本質上是一個哈希表,其中每一個哈希表條目都包含一個字符串鍵和一個做爲值的Java對象列表。(實現使用了番石榴的ArrayListMultimap,這是一個ListMultimap)。注意,一個字段能夠有多個值,任何兩個記錄都不須要使用公共字段名。
這個sink將Flume事件的主體填充到morphline記錄的_attachment_body字段中,並將Flume事件的頭部複製到同名的記錄字段中。而後命令能夠對這些數據進行操做。
支持路由到SolrCloud集羣,以提升可伸縮性。索引負載能夠分散在大量的morphlinesolrsink上,以提升可伸縮性。索引負載能夠跨多個morphlinesolrsink複製以得到高可用性,例如使用Flume特性(如負載平衡接收器處理器)。MorphlineInterceptor還能夠幫助實現到多個Solr集合的動態路由(例如,對於多租戶)。
您的環境所需的形態線和solr jar必須放在Apache Flume安裝的lib目錄中。
類型是FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
必須屬性以粗體顯示。

Property Name Default Description
channel  
type The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink
morphlineFile

The relative or absolute path on the local file system to the morphline configuration file.

Example: /etc/flume-ng/conf/morphline.conf

morphlineId null Optional name used to identify a morphline if there are multiple morphlines in a morphline config file
batchSize 1000 The maximum number of events to take per flume transaction.
batchDurationMillis 1000

The maximum duration per flume transaction (ms). The transaction commits after this

duration or when batchSize is exceeded, whichever comes first.

handlerClass

org.apache.flume.sink.solr.

morphline.MorphlineHandlerImpl

The FQCN of a class implementing org.apache.flume.sink.solr.morphline.MorphlineHandler
isProductionMode false

This flag should be enabled for mission critical, large-scale online production systems that

need to make progress without downtime when unrecoverable exceptions occur.

Corrupt or malformed parser input data, parser bugs, and errors related to unknown

Solr schema fields produce unrecoverable exceptions.

recoverableExceptionClasses org.apache.solr.client.solrj.SolrServerException

Comma separated list of recoverable exceptions that tend to be transient,

in which case the corresponding task can be retried. Examples include network connection errors,

timeouts, etc. When the production mode flag is set to true,

the recoverable exceptions configured using this parameter will not be ignored and hence will lead to retries.

isIgnoringRecoverableExceptions false

This flag should be enabled, if an unrecoverable exception is accidentally misclassified as recoverable.

This enables the sink to make progress and avoid retrying an event forever.

 agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000

11. ElasticSearchSink

這個sink將數據寫入一個elasticsearch集羣。默認狀況下,事件將被寫入,以便Kibana圖形界面可以顯示它們——就像logstash編寫它們同樣。
環境所需的elasticsearch和lucene-core jar必須放在Apache Flume安裝的lib目錄中。Elasticsearch要求客戶機JAR的主版本與服務器的主版本匹配,而且二者運行相同的JVM小版本。若是不正確,將出現serializationexception。要選擇所需的版本,首先要肯定elasticsearch的版本和目標集羣正在運行的JVM版本。而後選擇一個與主版本匹配的elasticsearch客戶端庫 0.19.x客戶端能夠與一個 0.19.x的集羣通訊;0.20.x能夠和0.20通訊。0.90 x和能夠和0.90.x對話。一旦肯定了elasticsearch版本,而後讀取pom。肯定要使用的正確lucene-core JAR版本的xml文件。運行ElasticSearchSink的Flume代理還應該匹配目標集羣運行到次要版本的JVM。
天天事件將被寫入一個新的索引。名稱將是-yyyy-MM-dd,其中是indexName參數。sink將在UTC午夜開始寫入一個新索引。
默認狀況下,ElasticSearchLogStashEventSerializer將事件序列化爲elasticsearch。可使用序列化器參數覆蓋此行爲。這個參數接受org.apache.flume.sink.elasticsearch的實現。ElasticSearchEventSerializer或org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory。不同意實現ElasticSearchEventSerializer,支持更強大的ElasticSearchIndexRequestBuilderFactory。
類型是FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
必須屬性以粗體顯示。

Property Name Default Description
channel  
type The component type name, needs to be org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames Comma separated list of hostname:port, if the port is not present the default port ‘9300’ will be used
indexName flume

The name of the index which the date will be appended to. Example ‘flume’ -> ‘flume-yyyy-MM-dd’

Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header

indexType logs

The type to index the document to, defaults to ‘log’ Arbitrary header substitution is supported,

eg. %{header} replaces with value of named event header

clusterName elasticsearch Name of the ElasticSearch cluster to connect to
batchSize 100 Number of events to be written per txn.
ttl

TTL in days, when set will cause the expired documents to be deleted automatically,

if not set documents will never be automatically deleted. TTL is accepted both in the earlier form

of integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute),

h (hour), d (day) and w (week). Example a1.sinks.k1.ttl = 5d will set TTL to 5 days.

Followhttp://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.

serializer

org.apache.flume.sink.elasticsearch.

ElasticSearchLogStashEventSerializer

The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use.

Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred.

serializer.* Properties to be passed to the serializer.

 

注意,使用事件頭的值來動態決定存儲事件時要使用的索引名和索引類型很是方便。使用此功能時要當心,由於事件提交器如今已經控制了indexName和indexType。此外,若是使用elasticsearch REST客戶端,則事件提交器能夠控制所使用的URL路徑。

 agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

12  Kite Dataset Sink

將事件寫入Kite數據集的實驗性接收器。這個接收器將反序列化每一個傳入事件的主體,並將結果記錄存儲在Kite數據集中。它經過按URI加載數據集來肯定目標數據集。
惟一受支持的序列化是avro,記錄模式必須在事件頭中傳遞,使用flume.avro.schema中的任何一個。使用JSON模式表示的文本或flume.avro.schema。一個能夠找到模式的url (hdfs:/…支持uri)。這與Log4jAppender flume客戶機和使用反序列化器的假脫機目錄源的Avro反序列化器兼容。schemaType =LITERAL
注1:flume.avro.schema。不支持哈希頭。注2:在某些狀況下,文件滾動可能會在超過滾動間隔後輕微發生。可是,這個延遲不會超過5秒。在大多數狀況下,延遲是沒法辨認的。

Property Name Default Description
channel  
type Must be org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri URI of the dataset to open
kite.repo.uri URI of the repository to open (deprecated; use kite.dataset.uri instead)
kite.dataset.namespace Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.dataset.name Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.batchSize 100 Number of records to process in each batch
kite.rollInterval 30 Maximum wait time (seconds) before data files are released
kite.flushable.commitOnBatch true

If true, the Flume transaction will be commited and the writer will be

flushed on each batch of kite.batchSize records. This setting only

applies to flushable datasets. When true, it’s possible for temp files with

commited data to be left in the dataset directory. These files need to be

recovered by hand for the data to be visible to DatasetReaders.

kite.syncable.syncOnBatch true

Controls whether the sink will also sync data when committing the transaction.

This setting only applies to syncable datasets. Syncing gaurentees that data will

be written on stable storage on the remote system while flushing only gaurentees

that data has left Flume’s client buffers. When the kite.flushable.commitOnBatch 

property is set to false, this property must also be set to false.

kite.entityParser avro

Parser that turns Flume Events into Kite entities. Valid values are avro 

and the fully-qualified class name of an implementation of the EntityParser.Builder interface.

kite.failurePolicy retry

Policy that handles non-recoverable errors such as a missing Schema in the Event header.

The default value, retry, will fail the current batch and try again which matches the old behavior.

Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset,

and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface.

kite.error.dataset.uri

URI of the dataset where failed events are saved when kite.failurePolicy is set to save.

 Required when the kite.failurePolicy is set to save.

auth.kerberosPrincipal Kerberos user principal for secure authentication to HDFS
auth.kerberosKeytab Kerberos keytab location (local FS) for the principal
auth.proxyUser The effective user for HDFS actions, if different from the kerberos principal

 13.  Kafka Sink

這是一個Flume Sink實現,能夠將數據發佈到Kafka主題。目標之一是將Flume與Kafka集成,這樣基於pull的處理系統就能夠處理來自各類Flume源的數據。目前支持Kafka 0.9.x系列發行版。
這個版本的Flume再也不支持Kafka的舊版本(0.8.x)。
必需的屬性用粗體標記。

Property Name Default Description
type Must be set to org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers

List of brokers Kafka-Sink will connect to, to get the list of topic partitions

This can be a partial list of brokers, but we recommend at least two for HA.

The format is comma separated list of hostname:port

kafka.topic default-flume-topic

The topic in Kafka to which the messages will be published.

If this parameter is configured, messages will be published to this topic.

If the event header contains a 「topic」 field, the event will be published to that

topic overriding the topic configured here. Arbitrary header substitution is supported,

eg. %{header} is replaced with value of event header named 「header」. (If using the substitution,

it is recommended to set 「auto.create.topics.enable」 property of Kafka broker to true.)

flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency.
kafka.producer.acks 1

How many replicas must acknowledge a message before its considered successfully written.

Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)

Set this to -1 to avoid data loss in some cases of leader failure.

useFlumeEventFormat false

By default events are put as bytes onto the Kafka topic directly from the event body.

Set to true to store events as the Flume Avro binary format. Used in conjunction with

the same property on the KafkaSource or with the parseAsFlumeEvent property on the

Kafka Channel this will preserve any Flume headers for the producing side.

defaultPartitionId

Specifies a Kafka partition ID (integer) for all events in this channel to be sent to,

unless overriden by partitionIdHeader. By default, if this property is not set,

events will be distributed by the Kafka Producer’s partitioner - including by key 

if specified (or by a partitioner specified by kafka.partitioner.class).

partitionIdHeader

When set, the sink will take the value of the field named using the value of this property from

the event header and send the message to the specified partition of the topic.

If the value represents an invalid partition, an EventDeliveryException will be thrown.

If the header value is present then this setting overrides defaultPartitionId.

allowTopicOverride true

When set, the sink will allow a message to be produced into a topic specified by the 

topicHeaderproperty (if provided).

topicHeader topic

When set in conjunction with allowTopicOverride will produce a message

into the value of the header named using the value of this property. Care should be taken

when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback.

kafka.producer.security.protocol PLAINTEXT

Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security.

See below for additional info on secure setup.

more producer security props  

If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties

that need to be set on producer.

Other Kafka Producer Properties

These properties are used to configure the Kafka Producer. Any producer property

supported by Kafka can be used. The only requirement is to prepend the property name with the

prefixkafka.producer. For example: kafka.producer.linger.ms

注意,Kafka Sink使用來自FlumeEvent頭部的主題和鍵屬性將事件發送到Kafka。若是標題中存在主題,則事件將被髮送到該特定主題,覆蓋爲Sink配置的主題。若是鍵存在於標題中,Kafka將使用該鍵在主題分區之間對數據進行分區。具備相同鍵的事件將被髮送到相同的分區。若是鍵爲null,則事件將發送到隨機分區。
Kafka接收器還爲key.serializer(org.apache. Kafka .common. serialize . stringserializer)和value.serializer(org.apache. Kafka .common. serialize . bytearrayserializer)提供默認值。不建議修改這些參數。
棄用屬性:

Property Name Default Description
brokerList Use kafka.bootstrap.servers
topic default-flume-topic Use kafka.topic
batchSize 100 Use kafka.flumeBatchSize
requiredAcks 1 Use kafka.producer.acks

下面給出了Kafka sink的一個配置示例。屬性之前綴kafka開頭.kafka producer.在建立Kafka生成器時傳遞的屬性並不只限於本例中給出的屬性。還能夠在這裏包含定製屬性,並經過做爲方法參數傳入的Flume上下文對象在預處理器中訪問它們。

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

Security and Kafka Sink:

 

Flume和Kafka之間的通訊通道支持安全認證和數據加密。對於安全身份驗證,可使用Kafka版本0.9.0中的SASL/GSSAPI (Kerberos V5)或SSL(儘管參數名爲SSL,但實際的協議是TLS實現)。

到目前爲止,數據加密僅由SSL/TLS提供。

設置kafka.producer.security.protocol 符合下列任何一項值意味着:

  • SASL_PLAINTEXT - 沒有數據加密的Kerberos或明文身份驗證
  • SASL_SSL - 帶有數據加密的Kerberos或純文自己份驗證
  • SSL - 基於TLS加密,具備可選的身份驗證.

警告:啓用SSL時會致使性能降低,其程度取決於CPU類型和JVM實現。參考文獻:Kafka安全概述和用於跟蹤這個問題的jira: Kafka -2561

TLS and Kafka Sink:

 

請閱讀配置Kafka客戶端SSL中描述的步驟,以瞭解用於微調的其餘配置設置,例如如下任何一種:安全提供程序、密碼套件、啓用的協議、信任存儲或密鑰存儲類型。

使用服務器端身份驗證和數據加密的示例配置。

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

注意:默認狀況下屬性ssl.endpoint.identification.algorithm沒有定義,所以沒有執行主機名驗證。爲了啓用主機名驗證,請設置如下屬性:

a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

一旦啓用,客戶端將針對如下兩個字段之一驗證服務器的徹底限定域名(FQDN):

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

若是還須要客戶端身份驗證,那麼應該向Flume agent配置添加如下內容。每一個Flume agent必須擁有本身的客戶端證書,這些證書必須由Kafka agent單獨或經過其簽名鏈進行信任。常見的示例是經過一個根CA對每一個客戶端證書進行簽名,而這個根CA又受到Kafka代理的信任。

a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

若是密鑰存儲和密鑰使用不一樣的密碼保護,則使用ssl.key.password 屬性將爲生產者密鑰庫提供所需的額外祕密:

a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos and Kafka Sink: 

要將Kafka sink與Kerberos保護的Kafka集羣一塊兒使用,請設置producer.security.protocol上面爲生產者指出的屬性。與Kafka代理一塊兒使用的Kerberos keytab和主體在JAAS文件的「KafkaClient」部分中指定。「客戶端」部分描述了須要時的Zookeeper鏈接。有關JAAS文件內容的信息,請參見Kafka文檔。能夠經過flume-env.sh中的JAVA_OPTS指定這個JAAS文件的位置,也能夠選擇指定系統範圍內的kerberos配置: 

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用SASL_PLAINTEXT的安全配置示例:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

使用SASL_SSL的安全配置示例: 

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

JAAS文件示例。有關其內容的參考,請參閱SASL配置的Kafka文檔中所需身份驗證機制(GSSAPI/PLAIN)的客戶端配置部分。與Kafka源或Kafka通道不一樣,「客戶端」部分不是必需的,除非其餘鏈接組件須要它。另外,請確保Flume進程的操做系統用戶具備jaas和keytab文件上的讀權限。

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

14  HTTP Sink

此接收器的行爲是,它將從通道獲取事件,並使用HTTP POST請求將這些事件發送到遠程服務。事件內容做爲POST主體發送。

此接收器的錯誤處理行爲取決於目標服務器返回的HTTP響應。sink backoff/ready狀態是可配置的,事務提交/回滾結果也是可配置的,該事件是否有助於成功的事件排放計數也是可配置的。

狀態代碼不可讀的服務器返回的任何格式錯誤的HTTP響應都將致使回退信號,而且事件不會從channel中消費。

必須屬性以粗體顯示。

Property Name Default Description
channel  
type The component type name, needs to be http.
endpoint The fully qualified URL endpoint to POST to
connectTimeout 5000 The socket connection timeout in milliseconds
requestTimeout 5000 The maximum request processing time in milliseconds
contentTypeHeader text/plain The HTTP Content-Type header
acceptHeader text/plain The HTTP Accept header value
defaultBackoff true Whether to backoff by default on receiving all HTTP status codes
defaultRollback true Whether to rollback by default on receiving all HTTP status codes
defaultIncrementMetrics false Whether to increment metrics by default on receiving all HTTP status codes
backoff.CODE Configures a specific backoff for an individual (i.e. 200) code or a group (i.e. 2XX) code
rollback.CODE Configures a specific rollback for an individual (i.e. 200) code or a group (i.e. 2XX) code
incrementMetrics.CODE Configures a specific metrics increment for an individual (i.e. 200) code or a group (i.e. 2XX) code

 

請注意,最特定的HTTP狀態代碼匹配用於backoff、rollback和incrementMetrics配置選項。若是2XX和200狀態碼都有配置值,那麼200個HTTP代碼將使用200值,而201-299範圍內的全部其餘HTTP代碼將使用2XX值。

在不向HTTP端點發出任何請求的狀況下,將使用任何空事件或空事件。

agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

15 Custom Sink

自定義接收器是接收器接口的本身實現。啓動Flume agent時,自定義sink的類及其依賴項必須包含在代理的類路徑中。自定義接收器的類型是它的FQCN。必須屬性以粗體顯示。

Property Name Default Description
channel  
type The component type name, needs to be your FQCN

agent a1 示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

 

翻譯自官網flume1.8用戶指南,原文地址:Flume 1.8.0 User Guide

篇幅限制,分爲如下5篇:

【翻譯】Flume 1.8.0 User Guide(用戶指南)

【翻譯】Flume 1.8.0 User Guide(用戶指南) source

【翻譯】Flume 1.8.0 User Guide(用戶指南) Sink

【翻譯】Flume 1.8.0 User Guide(用戶指南) Channel

【翻譯】Flume 1.8.0 User Guide(用戶指南) Processors

相關文章
相關標籤/搜索