Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide

Due to length limits, it is split into the following 5 parts:

[Translation] Flume 1.8.0 User Guide
[Translation] Flume 1.8.0 User Guide - Source
[Translation] Flume 1.8.0 User Guide - Sink
[Translation] Flume 1.8.0 User Guide - Channel
[Translation] Flume 1.8.0 User Guide - Processors
Flume Channels

Channels are the repositories where the events are staged on an agent. The Source adds the events and the Sink removes them.
1. Memory Channel

The events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be memory |
capacity | 100 | The maximum number of events stored in the channel |
transactionCapacity | 100 | The maximum number of events the channel will take from a source or give to a sink per transaction |
keep-alive | 3 | Timeout in seconds for adding or removing an event |
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. |
byteCapacity | see description | Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB. |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
2. JDBC Channel

The events are stored in a persistent storage that is backed by a database. The JDBC channel currently supports embedded Derby. This is a durable channel that is ideal for flows where recoverability is important. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be jdbc |
db.type | DERBY | Database vendor, needs to be DERBY. |
driver.class | org.apache.derby.jdbc.EmbeddedDriver | Class for vendor’s JDBC driver |
driver.url | (constructed from other properties) | JDBC connection URL |
db.username | "sa" | User id for db connection |
db.password | – | password for db connection |
connection.properties.file | – | JDBC Connection property file path |
create.schema | true | If true, then creates db schema if not there |
create.index | true | Create indexes to speed up lookups |
create.foreignkey | true | |
transaction.isolation | "READ_COMMITTED" | Isolation level for db session: READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ |
maximum.connections | 10 | Max connections allowed to db |
maximum.capacity | 0 (unlimited) | Max number of events in the channel |
sysprop.* | – | DB Vendor specific properties |
sysprop.user.home | – | Home path to store embedded Derby database |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = jdbc
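The minimal example above relies entirely on defaults. A slightly fuller sketch might pin down the Derby home directory and a capacity limit; the path and limit below are illustrative values, not recommendations:

```properties
a1.channels = c1
a1.channels.c1.type = jdbc
# cap the number of events held in the channel (0 = unlimited)
a1.channels.c1.maximum.capacity = 100000
# store the embedded Derby database under an explicit path
a1.channels.c1.sysprop.user.home = /var/lib/flume/jdbc-channel
```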
3. Kafka Channel

The events are stored in a Kafka cluster (which must be installed separately). Kafka provides high availability and replication, so in case an agent or a Kafka broker crashes, the events are immediately available to other sinks.

The Kafka channel can be used for multiple scenarios:

1. With Flume source and sink - it provides a reliable and highly available channel for events
2. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
3. With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr

This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka clients shipped with that version. The configuration of the channel has changed compared to previous flume versions.

The configuration parameters are organized as such:

1. Configuration values related to the channel generically are applied at the channel config level, eg: a1.channels.k1.type =
2. Configuration values related to Kafka or how the channel operates are prefixed with "kafka.", eg: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the hdfs sink operates
3. Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
4. Where possible, the Kafka parameter names are used, eg: bootstrap.servers and acks
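The four levels above can be seen side by side in one fragment; the topic, group, and host names here are placeholders:

```properties
# 1. generic channel config, applied at the channel level
a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
# 2. Kafka/channel-operation config, prefixed with "kafka."
a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
a1.channels.k1.kafka.topic = channel1
# 3. producer/consumer-specific properties, prefixed with kafka.producer / kafka.consumer,
# 4. reusing the native Kafka parameter name (here: acks) where possible
a1.channels.k1.kafka.producer.acks = all
```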
This version of flume is backwards-compatible with previous versions, however the deprecated properties indicated in the table below cause a warning message to be logged on startup when they are present in the configuration file.

Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel |
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port |
kafka.topic | flume-channel | Kafka topic which the channel will use |
kafka.consumer.group.id | flume | Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data Note that having non-channel consumers with the same ID can lead to data loss. |
parseAsFlumeEvent | true | Expecting Avro datums with FlumeEvent schema in the channel. This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact |
migrateZookeeperOffsets | true | When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled. |
pollTimeout | 500 | The amount of time (in milliseconds) to wait in the "poll()" call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) |
defaultPartitionId | – | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class). |
partitionIdHeader | – | When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId. |
kafka.consumer.auto.offset.reset | latest | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset latest: automatically reset the offset to the latest offset none: throw exception to the consumer if no previous offset is found for the consumer’s group anything else: throw exception to the consumer. |
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
kafka.consumer.security.protocol | PLAINTEXT | Same as kafka.producer.security.protocol but for reading/consuming from Kafka. |
more producer/consumer security props | – | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer/consumer. |
Deprecated Properties:
Property Name | Default | Description |
---|---|---|
brokerList | – | List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port |
topic | flume-channel | Use kafka.topic |
groupId | flume | Use kafka.consumer.group.id |
readSmallestOffset | false | Use kafka.consumer.auto.offset.reset |
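A before/after sketch of migrating away from the deprecated names (host names and values are placeholders):

```properties
# deprecated (logs a warning at startup):
# a1.channels.k1.brokerList = kafka-1:9092,kafka-2:9092
# a1.channels.k1.topic = channel1
# a1.channels.k1.groupId = flume
# current equivalents:
a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
a1.channels.k1.kafka.topic = channel1
a1.channels.k1.kafka.consumer.group.id = flume
```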
Note: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up.
Example for agent named a1:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
The communication channel between Flume and Kafka supports secure authentication as well as data encryption. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.producer|consumer.security.protocol to any of the following values means:

- SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
- SASL_SSL - Kerberos or plaintext authentication with data encryption
- SSL - TLS based encryption with optional authentication

Warning: There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561
TLS and Kafka Channel:
Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties:
a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

1. Common Name (CN)
2. Subject Alternative Name (SAN)
If client side authentication is also required then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA which in turn is trusted by the Kafka brokers.
a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
If the keystore and key use different password protection then the ssl.key.password property will provide the required additional secret for both consumer and producer keystores:
a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>
Kerberos and Kafka Channel:
To use the Kafka channel with a Kafka cluster secured with Kerberos, set the producer/consumer.security.protocol properties noted above for the producer and/or consumer. The Kerberos keytab and principal to be used with the Kafka brokers is specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
Example secure configuration using SASL_PLAINTEXT:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
Example secure configuration using SASL_SSL:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
Sample JAAS file. For reference on its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Since the Kafka source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. This won't be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
4. File Channel

Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be file. |
checkpointDir | ~/.flume/file-channel/checkpoint | The directory where checkpoint file will be stored |
useDualCheckpoints | false | Backup the checkpoint. If this is set to true, backupCheckpointDir must be set |
backupCheckpointDir | – | The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory |
dataDirs | ~/.flume/file-channel/data | Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance |
transactionCapacity | 10000 | The maximum size of transaction supported by the channel |
checkpointInterval | 30000 | Amount of time (in millis) between checkpoints |
maxFileSize | 2146435071 | Max size (in bytes) of a single log file |
minimumRequiredSpace | 524288000 | Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value |
capacity | 1000000 | Maximum capacity of the channel |
keep-alive | 3 | Amount of time (in sec) to wait for a put operation |
use-log-replay-v1 | false | Expert: Use old replay logic |
use-fast-replay | false | Expert: Replay without using queue |
checkpointOnClose | true | Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay. |
encryption.activeKey | – | Key name used to encrypt new data |
encryption.cipherProvider | – | Cipher provider type, supported types: AESCTRNOPADDING |
encryption.keyProvider | – | Key provider type, supported types: JCEKSFILE |
encryption.keyProvider.keyStoreFile | – | Path to the keystore file |
encryption.keyProvider.keyStorePasswordFile | – | Path to the keystore password file |
encryption.keyProvider.keys | – | List of all keys (e.g. history of the activeKey setting) |
encryption.keyProvider.keys.*.passwordFile | – | Path to the optional key password file |
Note that by default the File Channel uses paths for checkpoint and data directories that are within the user home, as specified above. As a result, if you have more than one File Channel instance active within the agent, only one will be able to lock the directories and the other channel initializations will fail. It is therefore necessary to provide explicit paths to all the configured channels, preferably on different disks. Furthermore, as the file channel syncs to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not available for the checkpoint and data directories.
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
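As the note above warns, two file channel instances in one agent must not share directories. A sketch with explicit, disjoint paths, ideally on different disks (the mount points here are illustrative):

```properties
a1.channels = c1 c2
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/disk1/flume/c1/checkpoint
a1.channels.c1.dataDirs = /mnt/disk1/flume/c1/data
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /mnt/disk2/flume/c2/checkpoint
a1.channels.c2.dataDirs = /mnt/disk2/flume/c2/data
```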
Encryption
Below are a few sample configurations.

Generating a key with its own password, separate from the key store password:
keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword
Generating a key with a password that is the same as the key store password:
keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0
Let's say you have aged key-0 out and new files should be encrypted with key-1:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
The same scenario as above, however key-0 has its own password:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password
5. Spillable Memory Channel

The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow. The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in the file channel. This channel is ideal for flows that need the high throughput of a memory channel during normal operation, but at the same time need the larger capacity of a file channel for better tolerance of intermittent sink side outages or drops in drain rate. The throughput will reduce approximately to file channel speeds during such abnormal situations. In case of an agent crash or restart, only the events stored on disk are recovered when the agent comes online. This channel is currently experimental and not recommended for use in production.

Required properties are in bold. Please refer to the file channel for additional required properties.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be SPILLABLEMEMORY |
memoryCapacity | 10000 | Maximum number of events stored in memory queue. To disable use of in-memory queue, set this to zero. |
overflowCapacity | 100000000 | Maximum number of events stored in overflow disk (i.e File channel). To disable use of overflow, set this to zero. |
overflowTimeout | 3 | The number of seconds to wait before enabling disk overflow when memory fills up. |
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. |
byteCapacity | see description | Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB. |
avgEventSize | 500 | Estimated average size of events, in bytes, going into the channel |
<file channel properties> | see file channel | Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity. |
The in-memory queue is considered full if either the memoryCapacity or the byteCapacity limit is reached.
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
To disable the use of the in-memory queue and function like a file channel:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
To disable the use of the overflow disk and function purely as an in-memory channel:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0
6. Pseudo Transaction Channel

Warning: The Pseudo Transaction Channel is only for unit testing purposes and is NOT meant for production use.

Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel |
capacity | 50 | The max number of events stored in the channel |
keep-alive | 3 | Timeout in seconds for adding or removing an event |
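The guide gives no example for this channel; a minimal sketch for a test-only agent, spelling out the defaults from the table above, might look like:

```properties
a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.PseudoTxnMemoryChannel
a1.channels.c1.capacity = 50
a1.channels.c1.keep-alive = 3
```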
7. Custom Channel
A custom channel is your own implementation of the Channel interface. A custom channel's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom channel is its FQCN. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be a FQCN |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = org.example.MyChannel
Flume Channel Selectors

If the type is not specified, then it defaults to "replicating".

Replicating Channel Selector (default)

Required properties are in bold.
Property Name | Default | Description |
---|---|---|
selector.type | replicating | The component type name, needs to be replicating |
selector.optional | – | Set of channels to be marked as optional |
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
In the above configuration, c3 is an optional channel. Failure to write to c3 is simply ignored. Since c1 and c2 are not marked optional, failure to write to those channels will cause the transaction to fail.
Multiplexing Channel Selector

Required properties are in bold.
Property Name | Default | Description |
---|---|---|
selector.type | replicating | The component type name, needs to be multiplexing |
selector.header | flume.selector.header | |
selector.default | – | |
selector.mapping.* | – | |
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
Custom Channel Selector

A custom channel selector is your own implementation of the ChannelSelector interface. A custom channel selector's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom channel selector is its FQCN.
Property Name | Default | Description |
---|---|---|
selector.type | – | The component type name, needs to be your FQCN |
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector