[Translation] Flume 1.8.0 User Guide: Sources

Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide


[Translation] Flume 1.8.0 User Guide

[Translation] Flume 1.8.0 User Guide: Sources

[Translation] Flume 1.8.0 User Guide: Sinks

[Translation] Flume 1.8.0 User Guide: Channels

[Translation] Flume 1.8.0 User Guide: Processors

Flume Sources

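1 Avro Source

Listens on an Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.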


Property Name Default Description
type The component type name, needs to be avro
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
interceptors Space-separated list of interceptors
compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource
ssl false Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
keystore This is the path to a Java keystore file. Required for SSL.
keystore-password The password for the Java keystore. Required for SSL.
keystore-type JKS The type of the Java keystore. This can be "JKS" or "PKCS12".
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
ipFilter false Set this to true to enable ipFiltering for netty
ipFilterRules Define N netty ipFilter pattern rules with this config.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind =
a1.sources.r1.port = 4141


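ipFilterRules defines N netty ipFilters separated by commas. Each pattern rule must be in this format:

<'allow' or 'deny'>:<'ip' or 'name' for computer name>:<pattern>, or allow/deny:ip/name:pattern

Example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

The first rule to match applies. For a client on the localhost, "allow:name:localhost,deny:ip:*" allows the client on localhost and denies clients from any other IP, while "deny:name:localhost,allow:ip:*" denies the client on localhost and allows clients from any other IP.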
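
The filter rules can be put together with the Avro source example as follows. This is a minimal sketch, not a verified deployment: the agent, source, and channel names follow the example above, and the bind address 0.0.0.0 is an assumed placeholder.

```properties
# Sketch only: a1, r1, c1 and the bind address are placeholders
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Turn on netty IP filtering, then list the rules in the order they should be tried
a1.sources.r1.ipFilter = true
a1.sources.r1.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*
```

Because the rules are tried in order, the catch-all deny rule is placed last.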

2 Thrift Source

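Listens on a Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. The Thrift source can be configured to start in secure mode by enabling kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in bold.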

Property Name Default Description
type The component type name, needs to be thrift
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
interceptors Space separated list of interceptors
ssl false Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
keystore This is the path to a Java keystore file. Required for SSL.
keystore-password The password for the Java keystore. Required for SSL.
keystore-type JKS The type of the Java keystore. This can be "JKS" or "PKCS12".
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
kerberos false Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC.
agent-principal The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC.
agent-keytab The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind =
a1.sources.r1.port = 4141
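
Secure mode uses the kerberos, agent-principal, and agent-keytab properties from the table. A minimal sketch follows, assuming a placeholder bind address and reusing the sample principal and keytab path that appear in the JAAS example later in this guide; substitute the values for your own realm.

```properties
# Sketch only: bind address, principal and keytab path are placeholders
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# In kerberos mode both agent-principal and agent-keytab are required
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/flumehost1.example.com@YOURKERBEROSREALM
a1.sources.r1.agent-keytab = /path/to/keytabs/flume.keytab
```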

3 Exec Source

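Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results, whereas date will probably not: the former two commands produce streams of data, whereas the latter produces a single event and exits. Required properties are in bold.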

Property Name Default Description
type The component type name, needs to be exec
command The command to execute
shell A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command’s stderr should be logged
batchSize 20 The max number of lines to read and send to the channel at a time
batchTimeout 3000 Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
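
For example, one of the most commonly requested features is a tail -F [file]-like use case, in which an application writes to a log file on disk and Flume tails the file, sending each line as an event.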

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
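
Since the source stops producing data when the command exits, the restart properties from the table are often worth setting. The following is a sketch under that assumption; the values shown are illustrative choices, not recommendations from the guide.

```properties
# Sketch only: values are illustrative
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
# Restart the command if it dies, waiting 10 seconds between attempts
a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 10000
# Log the command's stderr instead of discarding it
a1.sources.r1.logStdErr = true
```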

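The "shell" config is used to invoke the "command" through a command shell (such as Bash or Powershell). The "command" is passed as an argument to "shell" for execution. This allows the "command" to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals etc. In the absence of the "shell" config, the "command" will be invoked directly. Common values for "shell": "/bin/sh -c", "/bin/ksh -c", "cmd /c", "powershell -Command", etc.

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done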

4 JMS Source

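JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider, but it has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/password, and message-to-Flume-event converter. Note that the vendor-provided JMS jars should be included in the Flume classpath using the plugins.d directory (preferred), -classpath on the command line, or the FLUME_CLASSPATH variable in flume-env.sh. Required properties are in bold.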

Property Name Default Description
type The component type name, needs to be jms
initialContextFactory Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory The JNDI name the connection factory should appear as
providerURL The JMS provider URL
destinationName Destination name
destinationType Destination type (queue or topic)
messageSelector Message selector to use when creating the consumer
userName Username for the destination/provider
passwordFile File containing the password for the destination/provider
batchSize 100 Number of messages to consume in one batch
converter.type DEFAULT Class to use to convert messages to flume events. See below.
converter.* Converter properties.
converter.charset UTF-8 Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
createDurableSubscription false Whether to create durable subscription. Durable subscription can only be used with destinationType topic. If true, "clientId" and "durableSubscriptionName" have to be specified.
clientId JMS client identifier set on Connection right after it is created. Required for durable subscriptions.
durableSubscriptionName Name used to identify the durable subscription. Required for durable subscriptions.

4.1 Converter

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
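
The table notes that durable subscriptions work only with a topic destination and need both clientId and durableSubscriptionName. A sketch of such a configuration is shown below; the destination name, client id, and subscription name are hypothetical values chosen for illustration.

```properties
# Sketch only: BUSINESS_EVENTS, flume-agent-a1 and flume-business-events are hypothetical names
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_EVENTS
# Durable subscriptions require a topic, a client id and a subscription name
a1.sources.r1.destinationType = TOPIC
a1.sources.r1.createDurableSubscription = true
a1.sources.r1.clientId = flume-agent-a1
a1.sources.r1.durableSubscriptionName = flume-business-events
```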


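5 Spooling Directory Source

Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.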

Property Name Default Description
type The component type name, needs to be spooldir.
spoolDir The directory from which to read files from.
fileSuffix .COMPLETED Suffix to append to completely ingested files
deletePolicy never When to delete completed files: never or immediate
fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.
basenameHeader false Whether to add a header storing the basename of the file.
basenameHeaderKey basename Header Key to use when appending basename of file to event header.
includePattern ^.*$ Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored.
ignorePattern ^$ Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored.
trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
consumeOrder oldest In which order files in the spooling directory will be consumed: oldest, youngest, or random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with the smallest lexicographical order will be consumed first. In case of random, any file will be picked randomly. When using oldest and youngest, the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming into the spooling directory.
pollDelay 500 Delay (in milliseconds) used when polling for new files.
recursiveDirectorySearch false Whether to monitor sub directories for new files to read.
maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter.
batchSize 100 Granularity at which to batch transfer to the channel
inputCharset UTF-8 Character set used by deserializers that treat the input file as text.
decodeErrorPolicy FAIL What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the "replacement character" char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence.
deserializer LINE Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder.
deserializer.*   Varies per event deserializer.
bufferMaxLines (Obsolete) This option is now ignored.
bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors

Example for agent named a1:

a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

5.1 Event Deserializers

5.1.1 LINE

Property Name Default Description
deserializer.maxLineLength 2048 Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event.

deserializer.outputCharset UTF-8 Charset to use for encoding events put into the channel.
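
The deserializer options are set on the source itself, using the deserializer. prefix. The sketch below assumes the spooling directory source from the earlier example and raises the line limit to 8192 characters, an arbitrary illustrative value.

```properties
# Sketch only: 8192 is an illustrative limit, not a recommended value
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
# LINE is the default deserializer; it is spelled out here for clarity
a1.sources.src-1.deserializer = LINE
a1.sources.src-1.deserializer.maxLineLength = 8192
a1.sources.src-1.deserializer.outputCharset = UTF-8
```

Lines longer than the limit are split across events rather than dropped, so a downstream consumer may see one log line arrive as several events.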

5.1.2 AVRO



Property Name Default Description
deserializer.schemaType HASH How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header "flume.avro.schema.hash". If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header "flume.avro.schema.literal". Using LITERAL mode is relatively inefficient compared to HASH mode.

5.1.3 BlobDeserializer



Property Name Default Description
deserializer The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
deserializer.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request

6 Taildir Source

Property Name Default Description
type The component type name, needs to be TAILDIR.
filegroups Space-separated list of file groups. Each file group indicates a set of files to be tailed.
filegroups.<filegroupName> Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only.
positionFile ~/.flume/taildir_position.json File in JSON format to record the inode, the absolute path and the last position of each tailing file.
headers.<filegroupName>.<headerKey> Header value which is the set with header key. Multiple headers can be specified for one file group.
byteOffsetHeader false Whether to add the byte offset of a tailed line to a header called ‘byteoffset’.
skipToEnd false Whether to skip the position to EOF in the case of files not written on the position file.
idleTimeout 120000 Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it.
writePosInterval 3000 Interval time (ms) to write the last position of each file on the position file.
batchSize 100 Max number of lines to read and send to the channel at a time. Using the default is usually fine.
backoffSleepIncrement 1000 The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.
maxBackoffSleep 5000 The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.
cachePatternMatching true Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity.

fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true

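7 Twitter 1% firehose Source (experimental)

Experimental source that connects via the Streaming API to the 1% sample Twitter firehose, continuously downloads tweets, converts them to Avro format, and sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties are in bold.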

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.twitter.TwitterSource
consumerKey OAuth consumer key
consumerSecret OAuth consumer secret
accessToken OAuth access token
accessTokenSecret OAuth token secret
maxBatchSize 1000 Maximum number of twitter messages to put in a single batch
maxBatchDurationMillis 1000 Maximum number of milliseconds to wait before closing a batch

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200

8 Kafka Source

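Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so that each source reads a unique set of partitions for the topics.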

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id flume Unique identifier of consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group
kafka.topics Comma-separated list of topics the kafka consumer will read messages from.
kafka.topics.regex Regex that defines set of topics the source is subscribed on. This property has higher priority than kafka.topics and overrides kafka.topics if exists.
batchSize 1000 Maximum number of messages written to Channel in one batch
batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel. The batch will be written whenever the first of size and time will be reached.
backoffSleepIncrement 1000 Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors.
maxBackoffSleep 5000 Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors.
useFlumeEventFormat false By default events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers sent on the producing side.
setTopicHeader true When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property.
topicHeader topic Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property so as to avoid sending the message back to the same topic in a loop.
migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset defines how offsets are handled. Check Kafka documentation for details
kafka.consumer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
more consumer security props If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on consumer.
Other Kafka Consumer Properties These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset

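Note: The Kafka Source overrides two Kafka consumer parameters: auto.commit.enable is set to "false" by the source and every batch is committed. Kafka Source guarantees an at-least-once strategy for message retrieval. Duplicates can be present when the source starts. The Kafka Source also provides defaults for key.deserializer (org.apache.kafka.common.serialization.StringSerializer) and value.deserializer (org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.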


Property Name Default Description
topic Use kafka.topics
groupId flume Use kafka.consumer.group.id

zookeeperConnect Is no longer supported by kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish connection with kafka cluster


Example for topic subscription by comma-separated topic list:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

Example for topic subscription by regex:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

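Security and Kafka Source:

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.


Setting kafka.consumer.security.protocol to any of the following values means:

  SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  SASL_SSL - Kerberos or plaintext authentication with data encryption
  SSL - TLS-based encryption with optional authentication.

Warning: There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561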

TLS and Kafka Source:



a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. To enable hostname verification, set this property on the consumer (prefixed with kafka.consumer.). Once enabled, clients verify the server's fully qualified domain name (FQDN) against one of the following two fields:



  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-
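
The hostname-verification setting mentioned in the note can be sketched as a single line. This assumes the standard Kafka client value HTTPS for ssl.endpoint.identification.algorithm and the source name source1 used in the TLS example; check the Kafka documentation for your client version before relying on it.

```properties
# Sketch only: HTTPS is assumed to be the accepted value for this Kafka client property
a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
```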

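If client-side authentication is also required, then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either individually or through its signature chain. A common example is to sign each client certificate with a single root CA, which in turn is trusted by the Kafka brokers.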

a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

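If the keystore and the key use different password protection, then the ssl.key.password property provides the required additional secret for the consumer keystore: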

a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>

Kerberos and Kafka Source:

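To use the Kafka source with a Kafka cluster secured with Kerberos, set the kafka.consumer.security.protocol property as noted above for the consumer. The Kerberos keytab and principal to be used with the Kafka brokers are specified in the "KafkaClient" section of a JAAS file. The "Client" section describes the Zookeeper connection, if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file, and optionally the system-wide kerberos configuration, can be specified via JAVA_OPTS in flume-env.sh: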

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"


Example secure configuration using SASL_PLAINTEXT:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

Sample JAAS file:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};


9 NetCat TCP Source

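A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline-separated text. Each line of text is turned into a Flume event and sent via the connected channel.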



Property Name Default Description
type The component type name, needs to be netcat
bind Host name or IP address to bind to
port Port # to bind to
max-line-length 512 Max line length per event body (in bytes)
ack-every-event true Respond with an "OK" for every event received
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind =
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1


10  NetCat UDP Source

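As per the original Netcat (TCP) source, this source listens on a given port, turns each line of text into an event, and sends it via the connected channel. Acts like nc -u -k -l [host] [port].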



Property Name Default Description
type The component type name, needs to be netcatudp
bind Host name or IP address to bind to
port Port # to bind to
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind =
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1


11 Sequence Generator Source


Property Name Default Description
type The component type name, needs to be seq
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
batchSize 1 Number of events to attempt to process per request loop.
totalEvents Long.MAX_VALUE Number of unique events sent by the source.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1

12  Syslog Sources

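Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event. The TCP sources create a new event for each string of characters separated by a newline ('\n').


12.1  Syslog TCP Source

The original, tried-and-true syslog TCP source.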

Property Name Default Description
type The component type name, needs to be syslogtcp
host Host name or IP address to bind to
port Port # to bind to
eventSize 2500 Maximum size of a single event line, in bytes
keepFields none Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’.
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors

For example, a syslog TCP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

12.2 Multiport Syslog TCP Source

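This is a newer, faster, multi-port capable version of the Syslog TCP source. Note that the ports configuration setting has replaced port. Multi-port capability means that it can listen on many ports at once in an efficient manner. This source uses the Apache Mina library to do that. It provides support for RFC-3164 and many common RFC-5424 formatted messages. It also provides the capability to configure the character set used on a per-port basis.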

Property Name Default Description
type The component type name, needs to be multiport_syslogtcp
host Host name or IP address to bind to.
ports Space-separated list (one or more) of ports to bind to.
eventSize 2500 Maximum size of a single event line, in bytes.
keepFields none Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’.
portHeader If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.
charset.default UTF-8 Default character set used while parsing syslog events into strings.
charset.port.<port> Character set is configurable on a per-port basis.
batchSize 100 Maximum number of events to attempt to process per request loop. Using the default is usually fine.
readBufferSize 1024 Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine.
numProcessors (auto-detected) Number of processors available on the system for use while processing messages. Default is to auto-detect # of CPUs using the Java Runtime API. Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable.
selector.type replicating replicating, multiplexing, or custom
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors.

For example, a multiport syslog TCP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host =
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
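
The per-port character set option can be sketched by extending the example above. The bind address 0.0.0.0 and the choice of ISO-8859-1 for port 10002 are assumptions made for illustration.

```properties
# Sketch only: bind address and the ISO-8859-1 charset are illustrative
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
# Ports without an explicit entry fall back to charset.default
a1.sources.r1.charset.default = UTF-8
a1.sources.r1.charset.port.10002 = ISO-8859-1
```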

12.3 Syslog UDP Source


Property Name Default Description
type The component type name, needs to be syslogudp
host Host name or IP address to bind to
port Port # to bind to
keepFields false Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event.
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors

For example, a syslog UDP source for agent named a1: 

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

13 HTTP source

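A source which accepts Flume events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into Flume events by a pluggable "handler" which must implement the HTTPSourceHandler interface. This handler takes an HttpServletRequest and returns a list of Flume events. All events handled from one HTTP request are committed to the channel in one transaction, thus allowing for increased efficiency on channels like the file channel. If the handler throws an exception, this source will return an HTTP status of 400. If the channel is full, or the source is unable to append events to the channel, the source will return an HTTP 503 (Temporarily Unavailable) status.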


Property Name Default Description
type   The component type name, needs to be http
port The port the source should bind to.
bind The hostname or IP address to listen on
handler org.apache.flume.source.http.JSONHandler The FQCN of the handler class.
handler.* Config parameters for the handler
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
enableSSL false Set the property true, to enable SSL. HTTP Source does not support SSLv3.
excludeProtocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded.
keystore   Location of the keystore including keystore file name
keystorePassword Keystore password

For example, an HTTP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
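
The SSL-related properties from the table can be combined as in the sketch below. The keystore path and password are placeholders, and the default JSONHandler is assumed because no handler is set.

```properties
# Sketch only: keystore path and password are placeholders
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# Enable SSL and point the source at a Java keystore
a1.sources.r1.enableSSL = true
a1.sources.r1.keystore = /path/to/keystore.jks
a1.sources.r1.keystorePassword = <password to access the keystore>
```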

13.1 JSONHandler


[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]


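One way to create an event in the format expected by this handler is to use the JSONEvent class provided in the Flume SDK and use Google Gson to create the JSON string with the Gson#toJson(Object, Type) method. The type token to pass as the second argument of this method for a list of events can be created by: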

Type type = new TypeToken<List<JSONEvent>>() {}.getType();

13.2  BlobHandler


Property Name Default Description
handler The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request

14 Stress Source 



Property Name Default Description
type The component type name, needs to be org.apache.flume.source.StressSource
size 500 Payload size of each Event. Unit:byte
maxTotalEvents -1 Maximum number of Events to be sent
maxSuccessfulEvents -1 Maximum number of Events successfully sent
batchSize 1 Number of Events to be sent in one batch

Example for agent named a1:

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

15 Legacy Sources

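The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. A legacy source accepts events in the Flume 0.9.4 format, converts them to the Flume 1.0 format, and stores them in the connected channel. The 0.9.4 event properties such as timestamp, pri, host, and nanos are converted to 1.x event header attributes. The legacy sources support both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source. The 0.9.4 agent should have its agent sink pointing to the host/port of the 1.x agent.

Note: The reliability semantics of Flume 1.x are different from those of Flume 0.9.x. The E2E and DFO modes of a Flume 0.9.x agent are not supported by the legacy source. The only supported 0.9.x mode is best effort, though the reliability setting of the 1.x flow applies to the events once the legacy source has saved them into the Flume 1.x channel.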


 15.1  Avro Legacy Source

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host =
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

15.2  Thrift Legacy Source

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host =
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

16 Custom Source



Property Name Default Description
type The component type name, needs to be your FQCN
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

17 Scribe Source

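Scribe is another type of ingest system. To adopt an existing Scribe ingest system, Flume should use ScribeSource, which is based on Thrift, with a compatible transfer protocol. For deployment of Scribe, please follow the guide from Facebook. Required properties are in bold.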


Property Name Default Description
type The component type name, needs to be org.apache.flume.source.scribe.ScribeSource
port 1499 Port that Scribe should be connected
maxReadBufferBytes 16384000 Thrift Default FrameBuffer Size
workerThreads 5 Number of handling threads in Thrift

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1



