[Translation] Flume 1.8.0 User Guide: Channels

Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide

Due to its length, the translation is split into the following five parts:

[Translation] Flume 1.8.0 User Guide

[Translation] Flume 1.8.0 User Guide: Sources

[Translation] Flume 1.8.0 User Guide: Sinks

[Translation] Flume 1.8.0 User Guide: Channels (this part)

[Translation] Flume 1.8.0 User Guide: Processors

Flume Channels

Channels are the repositories where events are staged on an agent. Sources add events and sinks remove them.

1. Memory Channel

The events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of agent failures. Required properties are in bold.

 

Property Name | Default | Description
type | – | The component type name, needs to be memory
capacity | 100 | The maximum number of events stored in the channel
transactionCapacity | 100 | The maximum number of events the channel will take from a source or give to a sink per transaction
keep-alive | 3 | Timeout in seconds for adding or removing an event
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity | see description | Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.

 Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
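
As a worked reading of the two byte-size settings above (following the interpretation in the table): with byteCapacity = 800000 and byteCapacityBufferPercentage = 20, roughly 20% of the byte capacity is reserved for header overhead, leaving about 800000 * (1 - 0.20) = 640000 bytes for event bodies before further puts are refused.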

2. JDBC Channel

The events are stored in a persistent storage that is backed by a database. The JDBC channel currently supports embedded Derby. This is a durable channel that is ideal for flows where recoverability is important. Required properties are in bold.

Property Name | Default | Description
type | – | The component type name, needs to be jdbc
db.type | DERBY | Database vendor, needs to be DERBY.
driver.class | org.apache.derby.jdbc.EmbeddedDriver | Class for vendor's JDBC driver
driver.url | (constructed from other properties) | JDBC connection URL
db.username | "sa" | User id for db connection
db.password | – | Password for db connection
connection.properties.file | – | JDBC Connection property file path
create.schema | true | If true, then creates db schema if not there
create.index | true | Create indexes to speed up lookups
create.foreignkey | true | –
transaction.isolation | "READ_COMMITTED" | Isolation level for db session: READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ
maximum.connections | 10 | Max connections allowed to db
maximum.capacity | 0 (unlimited) | Max number of events in the channel
sysprop.* | – | DB Vendor specific properties
sysprop.user.home | – | Home path to store embedded Derby database

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = jdbc
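
A slightly fuller sketch built from the table above (the capacity value and the Derby home path are illustrative assumptions, not recommendations):

a1.channels = c1
a1.channels.c1.type = jdbc
a1.channels.c1.maximum.capacity = 100000
a1.channels.c1.sysprop.user.home = /mnt/flume/jdbc-channel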

3. Kafka Channel

The events are stored in a Kafka cluster (which must be installed separately). Kafka provides high availability and replication, so in case an agent or a Kafka broker crashes, the events are immediately available to other sinks.

The Kafka channel can be used for multiple scenarios:

1. With Flume source and sink - it provides a reliable and highly available channel for events
2. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
3. With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr (a sketch follows below)

This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka client shipped with that version. The configuration of the channel has changed compared to previous Flume versions.
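
As an illustration of scenario 3, a minimal sketch (broker addresses, topic name and sink settings are placeholder assumptions) in which an external producer writes plain messages into the topic, so parseAsFlumeEvent must be false, and an HDFS sink drains the channel:

a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
a1.channels.c1.kafka.topic = app-events
a1.channels.c1.parseAsFlumeEvent = false
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events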

The configuration parameters are organized as follows:

1. Configuration values related to the channel generically are applied at the channel config level, e.g.: a1.channels.k1.type =
2. Configuration values related to Kafka or how the channel operates are prefixed with "kafka.", e.g.: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the hdfs sink operates
3. Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
4. Where possible, the Kafka parameter names are used, e.g.: bootstrap.servers and acks (an annotated sketch of all four rules follows below)

This version of Flume is backwards-compatible with previous versions, however deprecated properties are indicated in the table below and a warning message is logged on startup when they are present in the configuration file.
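
The four rules annotated on a single channel definition (topic and broker values are placeholders):

# Rule 1: generic channel configuration at the channel config level
a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
# Rule 2: channel/Kafka behaviour prefixed with kafka.
a1.channels.k1.kafka.topic = events
# Rules 2 and 4: Kafka's own parameter name (bootstrap.servers) is kept
a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092
# Rules 3 and 4: producer-specific property, using Kafka's name acks
a1.channels.k1.kafka.producer.acks = all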

Required properties are in bold.

Property Name | Default | Description
type | – | The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma separated list of hostname:port
kafka.topic | flume-channel | Kafka topic which the channel will use
kafka.consumer.group.id | flume | Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data. Note that having non-channel consumers with the same ID can lead to data loss.
parseAsFlumeEvent | true | Expecting Avro datums with FlumeEvent schema in the channel. This should be true if a Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
migrateZookeeperOffsets | true | When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled.
pollTimeout | 500 | The amount of time (in milliseconds) to wait in the "poll()" call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId | – | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overridden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer's partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader | – | When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId.
kafka.consumer.auto.offset.reset | latest | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw exception to the consumer if no previous offset is found for the consumer's group; anything else: throw exception to the consumer.
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
kafka.consumer.security.protocol | PLAINTEXT | Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
more producer/consumer security props | – | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer/consumer.

Deprecated Properties:

Property Name | Default | Description
brokerList | – | List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma separated list of hostname:port. Use kafka.bootstrap.servers
topic | flume-channel | Use kafka.topic
groupId | flume | Use kafka.consumer.group.id
readSmallestOffset | false | Use kafka.consumer.auto.offset.reset

Note: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up.

Example for agent named a1:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
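
If you need explicit partition routing, a sketch combining the two partition properties from the table above (the header name partitionKey is an arbitrary choice):

a1.channels.channel1.defaultPartitionId = 0
a1.channels.channel1.partitionIdHeader = partitionKey

Events carrying an integer partitionKey header are sent to that partition; all others fall back to partition 0.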

Security and Kafka Channel

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.producer|consumer.security.protocol to any of the following values means:

  • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  • SASL_SSL - Kerberos or plaintext authentication with data encryption
  • SSL - TLS based encryption with optional authentication.

Warning: There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Channel:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties:

a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS

Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client side authentication is also required then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA which in turn is trusted by the Kafka brokers.

a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>

If the keystore and key use different password protection then the ssl.key.password property will provide the required additional secret for both consumer and producer keystores:

a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>

Kerberos and Kafka Channel: 

To use the Kafka channel with a Kafka cluster secured with Kerberos, set the producer/consumer.security.protocol properties noted above for the producer and/or consumer. The Kerberos keytab and principal to be used with Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file and, optionally, the system wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

Sample JAAS file. For reference on its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Since the Kafka source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. This won't be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

4. File Channel

Required properties are in bold.

Property Name | Default | Description
type | – | The component type name, needs to be file.
checkpointDir | ~/.flume/file-channel/checkpoint | The directory where checkpoint file will be stored
useDualCheckpoints | false | Backup the checkpoint. If this is set to true, backupCheckpointDir must be set
backupCheckpointDir | – | The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory
dataDirs | ~/.flume/file-channel/data | Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance
transactionCapacity | 10000 | The maximum size of transaction supported by the channel
checkpointInterval | 30000 | Amount of time (in millis) between checkpoints
maxFileSize | 2146435071 | Max size (in bytes) of a single log file
minimumRequiredSpace | 524288000 | Minimum required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
capacity | 1000000 | Maximum capacity of the channel
keep-alive | 3 | Amount of time (in sec) to wait for a put operation
use-log-replay-v1 | false | Expert: Use old replay logic
use-fast-replay | false | Expert: Replay without using queue
checkpointOnClose | true | Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey | – | Key name used to encrypt new data
encryption.cipherProvider | – | Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider | – | Key provider type, supported types: JCEKSFILE
encryption.keyProvider.keyStoreFile | – | Path to the keystore file
encryption.keyProvider.keyStorePasswordFile | – | Path to the keystore password file
encryption.keyProvider.keys | – | List of all keys (e.g. history of the activeKey setting)
encryption.keyProvider.keys.*.passwordFile | – | Path to the optional key password file

Note: By default the File Channel uses paths for the checkpoint and data directories within the user home, as specified above. As a result, if you have more than one File Channel instance active within the agent, only one will be able to lock the directories and the other channel initializations will fail. It is therefore necessary to provide explicit paths for all the configured channels, preferably on different disks. Furthermore, as the file channel syncs to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance when multiple disks are not available for the checkpoint and data directories.

 Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
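
If one agent hosts two file channels, a sketch with the explicit, non-overlapping paths the note above calls for (directories are illustrative; ideally place them on different disks):

a1.channels = c1 c2
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /disk1/flume/c1/checkpoint
a1.channels.c1.dataDirs = /disk1/flume/c1/data
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /disk2/flume/c2/checkpoint
a1.channels.c2.dataDirs = /disk2/flume/c2/data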

Encryption

Below are a few sample configurations:

Generating a key with a password separate from the key store password:

keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword

Generating a key with a password the same as the key store password:

keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword

a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0

Let's say you have aged key-0 out and new files should be encrypted with key-1:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

The same scenario as above, however key-0 has its own password:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

5. Spillable Memory Channel

The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow. The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in the file channel. This channel is ideal for flows that need the high throughput of a memory channel during normal operation, but at the same time need the larger capacity of the file channel for better tolerance of intermittent sink side outages or drops in drain rates. The throughput will reduce approximately to file channel speeds during such abnormal situations. In case of an agent crash or restart, only the events stored on disk are recovered when the agent comes online. This channel is currently experimental and not recommended for use in production.

Required properties are in bold. Please refer to the File Channel for additional required properties.

Property Name | Default | Description
type | – | The component type name, needs to be SPILLABLEMEMORY
memoryCapacity | 10000 | Maximum number of events stored in the in-memory queue. To disable use of the in-memory queue, set this to zero.
overflowCapacity | 100000000 | Maximum number of events stored in the overflow disk (i.e. the File channel). To disable use of overflow, set this to zero.
overflowTimeout | 3 | The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity | see description | Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
avgEventSize | 500 | Estimated average size of events, in bytes, going into the channel
<file channel properties> | see file channel | Any file channel property with the exception of 'keep-alive' and 'capacity' can be used. The keep-alive of the file channel is managed by the Spillable Memory Channel. Use 'overflowCapacity' to set the File channel's capacity.

The in-memory queue is considered full if either the memoryCapacity or the byteCapacity limit is reached.

Example for agent named a1: 

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of the in-memory queue and function like a file channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

To disable the use of the overflow disk and function purely as an in-memory channel:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0

6. Pseudo Transaction Channel

Warning: The Pseudo Transaction Channel is only for unit testing purposes and is NOT meant for production use.

Required properties are in bold.

Property Name | Default | Description
type | – | The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel
capacity | 50 | The max number of events stored in the channel
keep-alive | 3 | Timeout in seconds for adding or removing an event
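
A minimal sketch in the style of the other channel examples (for unit tests only, per the warning above):

a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.PseudoTxnMemoryChannel
a1.channels.c1.capacity = 50
a1.channels.c1.keep-alive = 3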

 

7. Custom Channel

A custom channel is your own implementation of the Channel interface. A custom channel's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom channel is its FQCN. Required properties are in bold.

 

Property Name | Default | Description
type | – | The component type name, needs to be a FQCN

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

Flume Channel Selectors

If the type is not specified, then it defaults to "replicating".

Replicating Channel Selector (default)

Required properties are in bold.

 

Property Name | Default | Description
selector.type | replicating | The component type name, needs to be replicating
selector.optional | – | Set of channels to be marked as optional

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

In the above configuration, c3 is an optional channel. Failure to write to c3 is simply ignored. Since c1 and c2 are not marked optional, failure to write to those channels will cause the transaction to fail.

Multiplexing Channel Selector

Required properties are in bold.

 

Property Name | Default | Description
selector.type | replicating | The component type name, needs to be multiplexing
selector.header | flume.selector.header | –
selector.default | – | –
selector.mapping.* | – | –

Example for agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
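
With this configuration, an event whose state header equals CZ goes to c1, one whose state header equals US is replicated to c2 and c3, and any other value falls through to the default channel c4.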

Custom Channel Selector

A custom channel selector is your own implementation of the ChannelSelector interface. A custom channel selector's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom channel selector is its FQCN.

 

Property Name | Default | Description
selector.type | – | The component type name, needs to be your FQCN

Example for agent named a1 and its source called r1: 

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

 
