[Translation] Flume 1.8.0 User Guide: Sources

Translated from the official Flume 1.8 User Guide. Original: Flume 1.8.0 User Guide

Because of its length, the translation is split into the following 5 parts:

[Translation] Flume 1.8.0 User Guide

[Translation] Flume 1.8.0 User Guide: Sources

[Translation] Flume 1.8.0 User Guide: Sinks

[Translation] Flume 1.8.0 User Guide: Channels

[Translation] Flume 1.8.0 User Guide: Processors

Flume Sources

1 Avro source

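Listens on an Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties: channels, type, bind, port.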

 

Property Name Default Description
channels  
type The component type name, needs to be avro
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
selector.type    
selector.*    
interceptors Space-separated list of interceptors
interceptors.*    
compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource
ssl false Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
keystore This is the path to a Java keystore file. Required for SSL.
keystore-password The password for the Java keystore. Required for SSL.
keystore-type JKS The type of the Java keystore. This can be "JKS" or "PKCS12".
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
ipFilter false Set this to true to enable ipFiltering for netty
ipFilterRules Define N netty ipFilter pattern rules with this config.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

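Example of ipFilterRules

ipFilterRules defines N netty ipFilters separated by a comma. A pattern rule must be in this format:

<'allow' or 'deny'>:<'ip' or 'name' for computer name>:<pattern>, or allow/deny:ip/name:pattern

Example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

Note that the first rule to match applies, as the following examples show for a client on localhost:

"allow:name:localhost,deny:ip:*" allows the client on localhost and denies clients from any other ip; "deny:name:localhost,allow:ip:*" denies the client on localhost and allows clients from any other ip.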
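The first-match behaviour can be modelled in a few lines. This is a simplified Python sketch, not Netty's or Flume's code: it assumes `*` behaves like a simple wildcard (approximated here with `fnmatch`) and that a client matching no rule is allowed; `parse_rules` and `is_allowed` are hypothetical names.

```python
from fnmatch import fnmatchcase
from typing import List, Tuple

# (allow?, "ip" or "name", wildcard pattern)
Rule = Tuple[bool, str, str]


def parse_rules(spec: str) -> List[Rule]:
    """Parse 'allow:ip:127.*,deny:name:foo' style rules, keeping their order."""
    rules: List[Rule] = []
    for raw in spec.split(","):
        # maxsplit=2 keeps any ':' inside the pattern (e.g. IPv6) intact
        action, kind, pattern = raw.strip().split(":", 2)
        action, kind = action.lower(), kind.lower()
        if action not in ("allow", "deny") or kind not in ("ip", "name"):
            raise ValueError(f"bad ipFilter rule: {raw!r}")
        rules.append((action == "allow", kind, pattern))
    return rules


def is_allowed(rules: List[Rule], ip: str, hostname: str, default: bool = True) -> bool:
    """Return the verdict of the FIRST matching rule; `default` is an assumption."""
    for allow, kind, pattern in rules:
        target = ip if kind == "ip" else hostname
        if fnmatchcase(target, pattern):
            return allow
    return default
```

Because evaluation stops at the first match, swapping the two rules in the localhost examples flips the result for the same client.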

2 Thrift Source

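Listens on a Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. The Thrift source can be configured to start in secure mode by enabling Kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the Kerberos KDC. Required properties: channels, type, bind, port.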

Property Name Default Description
channels  
type The component type name, needs to be thrift
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
selector.type    
selector.*    
interceptors Space separated list of interceptors
interceptors.*    
ssl false Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
keystore This is the path to a Java keystore file. Required for SSL.
keystore-password The password for the Java keystore. Required for SSL.
keystore-type JKS The type of the Java keystore. This can be "JKS" or "PKCS12".
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
kerberos false Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC.
agent-principal The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC.
agent-keytab The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

3 Exec Source

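Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless the property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results, whereas date will probably not: the former two commands produce streams of data, whereas the latter produces a single event and exits. Required properties: channels, type, command.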

Property Name Default Description
channels  
type The component type name, needs to be exec
command The command to execute
shell A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command’s stderr should be logged
batchSize 20 The max number of lines to read and send to the channel at a time
batchTimeout 3000 Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
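Warning: The problem with ExecSource and other asynchronous sources is that the source cannot guarantee that, if there is a failure to put the event into the Channel, the client knows about it. In such cases, the data will be lost. As an example, one of the most commonly requested features is the tail -F [file]-like use case, where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there is an obvious problem: what happens if the channel fills up and Flume cannot send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log, or that the event has not been sent for some reason. If this does not make sense, you need only know this: your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning, and to be completely clear, there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, the Taildir Source, or direct integration with Flume via the SDK.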

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

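The "shell" config is used to invoke the "command" through a command shell (such as Bash or PowerShell). The "command" is passed as an argument to the "shell" for execution. This allows the "command" to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals, etc. In the absence of the "shell" config, the "command" will be invoked directly. Common values for "shell": "/bin/sh -c", "/bin/ksh -c", "cmd /c", "powershell -Command", etc.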

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
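The difference between running with and without "shell" can be sketched as the argv the child process is started with. This Python sketch assumes, based on our reading of ExecSource, that "shell" is split on whitespace with the whole "command" appended as one argument, and that without "shell" the command is simply split on whitespace; `build_argv` is a hypothetical helper.

```python
from typing import List, Optional


def build_argv(command: str, shell: Optional[str] = None) -> List[str]:
    """Approximate how an exec-style source turns its config into a process argv."""
    if shell:
        # The shell string is split on whitespace; the ENTIRE command becomes
        # one argument, so the shell itself interprets globs, pipes, loops, $vars.
        return shell.split() + [command]
    # No shell: the command is split on whitespace and executed directly,
    # so '*', '|', ';' and '$i' are passed through as literal arguments.
    return command.split()
```

The resulting list could be handed to `subprocess.Popen(argv, stdout=subprocess.PIPE)`; without a shell, `cat /path/*.txt` would ask `cat` for a file literally named `*.txt`.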

4 JMS Source

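The JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application, it should work with any JMS provider, but it has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and a message-to-flume-event converter. Note that the vendor-provided JMS jars should be included in the Flume classpath using the plugins.d directory (preferred), -classpath on the command line, or the FLUME_CLASSPATH variable in flume-env.sh. Required properties: channels, type, initialContextFactory, connectionFactory, providerURL, destinationName, destinationType.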

Property Name Default Description
channels  
type The component type name, needs to be jms
initialContextFactory Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory The JNDI name the connection factory should appear as
providerURL The JMS provider URL
destinationName Destination name
destinationType Destination type (queue or topic)
messageSelector Message selector to use when creating the consumer
userName Username for the destination/provider
passwordFile File containing the password for the destination/provider
batchSize 100 Number of messages to consume in one batch
converter.type DEFAULT Class to use to convert messages to flume events. See below.
converter.* Converter properties.
converter.charset UTF-8 Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
createDurableSubscription false Whether to create a durable subscription. A durable subscription can only be used with destinationType topic. If true, "clientId" and "durableSubscriptionName" have to be specified.

clientId JMS client identifier set on Connection right after it is created. Required for durable subscriptions.
durableSubscriptionName Name used to identify the durable subscription. Required for durable subscriptions.

4.1 Converter

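The JMS source allows pluggable converters, though it is likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.
BytesMessage:
Bytes of the message are copied to the body of the FlumeEvent. Cannot convert more than 2GB of data per message.
TextMessage:
Text of the message is converted to a byte array and copied to the body of the FlumeEvent. The default converter uses UTF-8 by default, but this is configurable.
ObjectMessage:
The object is written out to a ByteArrayOutputStream wrapped in an ObjectOutputStream, and the resulting array is copied to the body of the FlumeEvent.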
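The mapping performed by the default converter can be sketched in Python for BytesMessage and TextMessage. `JmsMessage`, `FlumeEvent` and `convert` are hypothetical stand-ins, not Flume or JMS classes. Header values are stringified with `str()` here as an approximation. ObjectMessage is not modelled because it relies on Java serialization.

```python
from dataclasses import dataclass, field
from typing import Dict, Union


@dataclass
class JmsMessage:
    """Hypothetical stand-in for a JMS message."""
    kind: str                       # "bytes" or "text"
    payload: Union[bytes, str]
    properties: Dict[str, object] = field(default_factory=dict)


@dataclass
class FlumeEvent:
    headers: Dict[str, str]
    body: bytes


def convert(msg: JmsMessage, charset: str = "utf-8") -> FlumeEvent:
    # Every message property becomes an event header.
    headers = {name: str(value) for name, value in msg.properties.items()}
    if msg.kind == "bytes":
        body = bytes(msg.payload)            # bytes copied as-is
    elif msg.kind == "text":
        body = msg.payload.encode(charset)   # converter.charset, UTF-8 by default
    else:
        raise ValueError(f"unsupported message kind: {msg.kind}")
    return FlumeEvent(headers, body)
```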

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

 

5 Spooling Directory Source

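This source lets you ingest data by placing the files to be ingested into a "spooling" directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, it is renamed to indicate completion (or optionally deleted).
Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files may be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:
If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.
Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.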
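One way to follow the immutable, uniquely named rule is to finish writing a file in a staging directory on the same filesystem, outside the spooling directory. Then rename it into the spooling directory under a timestamped name. This Python sketch is illustrative only. `publish_to_spool` and the `<name>.<timestamp>.<random>` layout are assumptions, not part of Flume.

```python
import os
import time
import uuid


def publish_to_spool(src_path: str, spool_dir: str) -> str:
    """Move a fully written file into spool_dir under a unique, timestamped name."""
    base = os.path.basename(src_path)
    stamp = time.strftime("%Y%m%d%H%M%S")
    # timestamp + random suffix keeps names unique even if base names repeat
    unique_name = f"{base}.{stamp}.{uuid.uuid4().hex[:8]}"
    dest = os.path.join(spool_dir, unique_name)
    # os.replace is an atomic rename only within one filesystem, so the
    # staging directory must live on the same filesystem as spool_dir;
    # Flume then never sees a half-written file.
    os.replace(src_path, dest)
    return dest
```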

Property Name Default Description
channels  
type The component type name, needs to be spooldir.
spoolDir The directory from which to read files from.
fileSuffix .COMPLETED Suffix to append to completely ingested files
deletePolicy never When to delete completed files: never or immediate
fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.
basenameHeader false Whether to add a header storing the basename of the file.
basenameHeaderKey basename Header Key to use when appending basename of file to event header.
includePattern ^.*$ Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both the ignorePattern and includePattern regex, the file is ignored.
ignorePattern ^$ Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both the ignorePattern and includePattern regex, the file is ignored.

trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
consumeOrder oldest In which order files in the spooling directory will be consumed: oldest, youngest or random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with the smallest lexicographical order will be consumed first. In case of random, any file will be picked randomly. When using oldest and youngest, the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming into the spooling directory.

pollDelay 500 Delay (in milliseconds) used when polling for new files.
recursiveDirectorySearch false Whether to monitor sub directories for new files to read.
maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter.

batchSize 100 Granularity at which to batch transfer to the channel
inputCharset UTF-8 Character set used by deserializers that treat the input file as text.
decodeErrorPolicy FAIL What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the "replacement character" char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence.
deserializer LINE Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder.

deserializer.*   Varies per event deserializer.
bufferMaxLines (Obsolete) This option is now ignored.
bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
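The consumeOrder rules in the table can be sketched in Python as follows. `list_candidates` and `pick_next` are hypothetical helpers. Skipping names that end in fileSuffix is a simplifying assumption, and the real source also applies includePattern/ignorePattern.

```python
import os
import random
from typing import Dict, Optional


def list_candidates(spool_dir: str, file_suffix: str = ".COMPLETED") -> Dict[str, float]:
    """Map file name -> last-modified time for files not yet marked completed."""
    with os.scandir(spool_dir) as entries:
        return {
            e.name: e.stat().st_mtime
            for e in entries
            if e.is_file() and not e.name.endswith(file_suffix)
        }


def pick_next(files: Dict[str, float], consume_order: str = "oldest",
              rng: Optional[random.Random] = None) -> Optional[str]:
    if not files:
        return None
    if consume_order == "random":
        return (rng or random.Random()).choice(sorted(files))
    if consume_order == "oldest":
        # smallest mtime first; ties go to the lexicographically smallest name
        return min(files, key=lambda name: (files[name], name))
    if consume_order == "youngest":
        # largest mtime first; ties still go to the lexicographically smallest name
        return min(files, key=lambda name: (-files[name], name))
    raise ValueError(f"unknown consumeOrder: {consume_order}")
```

The oldest and youngest branches must look at every entry. That is why the table warns they can be slow on very large directories.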

Example for agent named a1:

a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

5.1 Event Deserializers

The following event deserializers ship with Flume.

5.1.1 LINE

This deserializer generates one event per line of text input.

Property Name Default Description
deserializer.maxLineLength 2048 Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event.

deserializer.outputCharset UTF-8 Charset to use for encoding events put into the channel.
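This Python sketch shows how maxLineLength splits a long line across events. It assumes that lines are separated only by "\n" and that an empty line yields an empty event. We have not verified those two details against the LINE deserializer itself. `line_events` is a hypothetical name.

```python
from typing import List


def line_events(text: str, max_line_length: int = 2048) -> List[str]:
    """Split text into one event per line; overlong lines spill into extra events."""
    lines = text.split("\n")
    if lines and lines[-1] == "":
        lines.pop()  # a trailing newline does not start another line
    events: List[str] = []
    for line in lines:
        if not line:
            events.append("")  # assumption: empty line -> empty event
            continue
        # the first max_line_length chars form one event, the rest follow
        for start in range(0, len(line), max_line_length):
            events.append(line[start:start + max_line_length])
    return events
```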

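5.1.2 AVRO

This deserializer is able to read an Avro container file, and it generates one event per Avro record in the file. Each event is annotated with a header that indicates the schema used. The body of the event is the binary Avro record data, not including the schema or the rest of the container file elements.

Note that if the spool directory source must retry putting one of these events onto a channel (for example, because the channel is full), then it will reset and retry from the most recent Avro container file sync point. To reduce potential event duplication in such a failure scenario, write sync markers more frequently in your Avro input files.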

Property Name Default Description
deserializer.schemaType HASH How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header "flume.avro.schema.hash". If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header "flume.avro.schema.literal". Using LITERAL mode is relatively inefficient compared to HASH mode.

5.1.3 BlobDeserializer

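This deserializer reads a Binary Large Object (BLOB) per event, typically one BLOB per file, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because the entire BLOB is buffered in RAM.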

 

Property Name Default Description
deserializer The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
deserializer.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request

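6 Taildir Source

Note: This source is provided as a preview feature. It does not work on Windows.

Watches the specified files, and tails them in near real-time once it detects new lines appended to each file. If new lines are still being written, this source retries reading them until the write is complete.

This source is reliable and will not miss data even when the tailed files rotate. It periodically writes the last read position of each file to the given position file in JSON format. If Flume is stopped or goes down for some reason, it resumes tailing from the positions recorded in the existing position file.

In other use cases, this source can also start tailing from an arbitrary position for each file using the given position file. When there is no position file on the specified path, it starts tailing from the first line of each file by default.

Files are consumed in order of their modification time: the file with the oldest modification time is consumed first.

This source does not rename, delete or make any modification to the files being tailed. Currently, this source does not support tailing binary files; it reads text files line by line.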
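The position file is what makes resuming and rotation work. Each record is keyed by inode, and a rename keeps the inode, so a rotated file keeps its offset. This Python sketch assumes a record layout of a JSON array of objects with `inode`, `pos` and `file` keys. That layout is what Flume 1.8 position files typically look like, but check your own file. The atomic write through `os.replace` is a choice of this sketch, not a claim about Flume. `load_positions`, `save_positions` and `start_offset` are hypothetical names.

```python
import json
import os
from typing import Dict


def load_positions(position_file: str) -> Dict[int, dict]:
    """Return {inode: record}; an empty dict when no position file exists yet."""
    if not os.path.exists(position_file):
        return {}
    with open(position_file, encoding="utf-8") as f:
        return {rec["inode"]: rec for rec in json.load(f)}


def save_positions(position_file: str, records: Dict[int, dict]) -> None:
    tmp = position_file + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(list(records.values()), f)
    os.replace(tmp, position_file)  # readers never see a half-written file


def start_offset(path: str, records: Dict[int, dict], skip_to_end: bool = False) -> int:
    """Byte offset to resume from; mirrors the skipToEnd property for unknown files."""
    st = os.stat(path)
    rec = records.get(st.st_ino)
    if rec is not None:
        return rec["pos"]
    return st.st_size if skip_to_end else 0
```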

 

Property Name Default Description
channels  
type The component type name, needs to be TAILDIR.
filegroups Space-separated list of file groups. Each file group indicates a set of files to be tailed.
filegroups.<filegroupName> Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only.
positionFile ~/.flume/taildir_position.json File in JSON format to record the inode, the absolute path and the last position of each tailing file.
headers.<filegroupName>.<headerKey> Header value which is the set with header key. Multiple headers can be specified for one file group.
byteOffsetHeader false Whether to add the byte offset of a tailed line to a header called ‘byteoffset’.
skipToEnd false Whether to skip the position to EOF in the case of files not written on the position file.
idleTimeout 120000 Time (ms) to close inactive files. If new lines are appended to a closed file, this source will automatically re-open it.
writePosInterval 3000 Interval time (ms) to write the last position of each file on the position file.
batchSize 100 Max number of lines to read and send to the channel at a time. Using the default is usually fine.
backoffSleepIncrement 1000 The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.
maxBackoffSleep 5000 The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.
cachePatternMatching true Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity.

fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
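In a file group path, the regular expression applies to the file name only, as the table says. This Python sketch shows what that means for f1 and f2 above. It assumes the directory part is compared literally and the name regex must match the whole file name. `in_filegroup` is a hypothetical helper.

```python
import os
import re


def in_filegroup(filegroup_path: str, candidate: str) -> bool:
    """True if candidate belongs to the file group (directory literal, name = regex)."""
    group_dir, name_regex = os.path.split(filegroup_path)
    cand_dir, cand_name = os.path.split(candidate)
    # subdirectories are not searched; the regex must cover the whole name
    return cand_dir == group_dir and re.fullmatch(name_regex, cand_name) is not None
```

Note that a plain name such as `example.log` is itself a regex, so its `.` matches any character. Escape it (`example\.log`) if that matters.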

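7 Twitter 1% firehose Source (experimental)

Warning: This source is highly experimental and may change between versions of Flume. Use at your own risk.

Experimental source that connects via the Streaming API to the 1% sample Twitter firehose, continuously downloads tweets, converts them to Avro format and sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties: channels, type, consumerKey, consumerSecret, accessToken, accessTokenSecret.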

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.twitter.TwitterSource
consumerKey OAuth consumer key
consumerSecret OAuth consumer secret
accessToken OAuth access token
accessTokenSecret OAuth token secret
maxBatchSize 1000 Maximum number of twitter messages to put in a single batch
maxBatchDurationMillis 1000 Maximum number of milliseconds to wait before closing a batch

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200

8 Kafka Source

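The Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same consumer group so that each source reads a unique set of partitions for the topics.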

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id flume Unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group.
kafka.topics Comma-separated list of topics the Kafka consumer will read messages from.
kafka.topics.regex Regex that defines the set of topics the source is subscribed to. This property has higher priority than kafka.topics and overrides kafka.topics if it exists.
batchSize 1000 Maximum number of messages written to the Channel in one batch.
batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to the Channel. The batch will be written whenever the first of size and time is reached.
backoffSleepIncrement 1000 Initial and incremental wait time that is triggered when a Kafka topic appears to be empty. The wait period reduces aggressive pinging of an empty Kafka topic. One second is ideal for ingestion use cases, but a lower value may be required for low-latency operations with interceptors.
maxBackoffSleep 5000 Maximum wait time that is triggered when a Kafka topic appears to be empty. Five seconds is ideal for ingestion use cases, but a lower value may be required for low-latency operations with interceptors.
useFlumeEventFormat false By default, events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve any Flume headers sent on the producing side.
setTopicHeader true When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property.
topicHeader topic Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property, so as to avoid sending the message back to the same topic in a loop.
migrateZookeeperOffsets true When no Kafka-stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated, this can be set to false, though that should generally not be required. If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset defines how offsets are handled. Check the Kafka documentation for details.
kafka.consumer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if reading from Kafka using some level of security. See below for additional info on secure setup.
more consumer security props If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to Kafka security for additional properties that need to be set on the consumer.
Other Kafka Consumer Properties These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset
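This Python sketch shows how batchSize and batchDurationMillis interact, plus the empty-topic backoff. `collect_batch`, `backoff_ms` and the `poll` callable are hypothetical. The linear growth of the backoff (n times the increment, capped at the maximum) is our reading of how Flume's pollable source runner applies backoffSleepIncrement and maxBackoffSleep.

```python
import time
from typing import Callable, List, Optional


def collect_batch(poll: Callable[[], Optional[bytes]], batch_size: int = 1000,
                  batch_duration_ms: int = 1000) -> List[bytes]:
    """Gather messages until batch_size is reached OR batch_duration_ms elapses."""
    deadline = time.monotonic() + batch_duration_ms / 1000.0
    batch: List[bytes] = []
    while len(batch) < batch_size and time.monotonic() < deadline:
        message = poll()  # hypothetical: returns None when nothing is available
        if message is not None:
            batch.append(message)
    return batch


def backoff_ms(consecutive_empty_polls: int, backoff_sleep_increment: int = 1000,
               max_backoff_sleep: int = 5000) -> int:
    """Sleep grows by one increment per consecutive empty poll, capped at the max."""
    return min(consecutive_empty_polls * backoff_sleep_increment, max_backoff_sleep)
```

With the defaults, an idle topic is polled after 1, 2, 3, 4 and then every 5 seconds. Lowering both values trades more polling for lower latency, as the table notes.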

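Note: The Kafka Source overrides two Kafka consumer parameters: auto.commit.enable is set to "false" by the source, and every batch is committed. The Kafka source guarantees an at-least-once strategy of message retrieval, so duplicates can be present when the source starts. The Kafka Source also provides defaults for key.deserializer (org.apache.kafka.common.serialization.StringDeserializer) and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer). Modifying these parameters is not recommended.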

Deprecated properties:

Property Name Default Description
topic Use kafka.topics
groupId flume Use kafka.consumer.group.id
zookeeperConnect Is no longer supported by the Kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish a connection with the Kafka cluster.

 

Example of topic subscription by a comma-separated topic list:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

Example of topic subscription by regex:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

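Security and Kafka Source:

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now, data encryption is provided solely by SSL/TLS.

Setting kafka.consumer.security.protocol to any of the following values means:

  SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  SASL_SSL - Kerberos or plaintext authentication with data encryption
  SSL - TLS-based encryption with optional authentication

Warning: Enabling SSL causes a performance degradation, the magnitude of which depends on the CPU type and the JVM implementation. References: Kafka security overview and the JIRA tracking this issue: KAFKA-2561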

TLS and Kafka Source:

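Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server-side authentication and data encryption: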

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

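Note: By default, the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. To enable hostname verification, set the following property: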

a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS

Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

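If client-side authentication is also required, then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either individually or through its signature chain. A common example is to sign each client certificate with a single Root CA, which in turn is trusted by the Kafka brokers.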

a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

If the keystore and the key use different password protection, the ssl.key.password property provides the required additional secret for the consumer keystore:

a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>

Kerberos and Kafka Source:

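To use the Kafka source with a Kafka cluster secured with Kerberos, set the consumer.security.protocol property for the consumer as noted above. The Kerberos keytab and principal to be used with the Kafka brokers are specified in the "KafkaClient" section of a JAAS file. The "Client" section describes the Zookeeper connection, if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file, and optionally the system-wide Kerberos configuration, can be specified via JAVA_OPTS in flume-env.sh: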

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

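Sample JAAS file. For a reference of its content, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation on SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. It is not needed unless you require offset migration, or you require this section for other secure components. Also, please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.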

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

 

9 NetCat TCP Source

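A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline-separated text. Each line of text is turned into a Flume event and sent via the connected channel.

Required properties: channels, type, bind, port.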

 

Property Name Default Description
channels  
type The component type name, needs to be netcat
bind Host name or IP address to bind to
port Port # to bind to
max-line-length 512 Max line length per event body (in bytes)
ack-every-event true Respond with an "OK" for every event received
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*  

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
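A client only needs to write newline-terminated lines to this port. With ack-every-event=true, it can read one acknowledgement per line. This Python sketch is a minimal client, not part of Flume. `send_lines` is a hypothetical helper, and it assumes each acknowledgement is a single "OK" line.

```python
import socket
from typing import Iterable, List


def send_lines(host: str, port: int, lines: Iterable[str], timeout: float = 5.0) -> List[str]:
    """Send each line as one event and collect the source's per-line reply."""
    replies: List[str] = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("r", encoding="utf-8", newline="\n") as reader:
            for line in lines:
                if "\n" in line:
                    raise ValueError("each event must be a single line")
                sock.sendall((line + "\n").encode("utf-8"))
                # with ack-every-event=true the source answers every line
                replies.append(reader.readline().rstrip("\n"))
    return replies
```

For the config above, `send_lines("localhost", 6666, ["hello"])` would be the call. Keep each line under max-line-length (512 bytes by default).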

 

10  NetCat UDP Source

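As per the original Netcat (TCP) source, this source listens on a given port, turns each line of text into an event and sends it via the connected channel. Acts like nc -u -k -l [host] [port].

Required properties: channels, type, bind, port.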

 

Property Name Default Description
channels  
type The component type name, needs to be netcatudp
bind Host name or IP address to bind to
port Port # to bind to
remoteAddressHeader  
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

 

11 Sequence Generator Source

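A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1 and stops at totalEvents. It retries when it cannot send events to the channel. Useful mainly for testing. During retries, it keeps the body of the retried messages the same as before, so that the number of unique events, after de-duplication at the destination, is expected to be equal to the specified totalEvents. Required properties: channels, type.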

Property Name Default Description
channels  
type The component type name, needs to be seq
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
batchSize 1 Number of events to attempt to process per request loop.
totalEvents Long.MAX_VALUE Number of unique events sent by the source.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
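The retry rule is what keeps the count of distinct events equal to totalEvents. This Python sketch makes two simplifying assumptions. First, the body is the counter in decimal text. Second, a rejected batch is retried immediately, whereas the real source backs off. `run_sequence` and `put_batch` are hypothetical names.

```python
from typing import Callable, List


def run_sequence(total_events: int, batch_size: int,
                 put_batch: Callable[[List[str]], bool]) -> None:
    """Emit bodies "0", "1", ... "total_events - 1" in batches.

    put_batch returns False when the channel rejects the batch; the SAME
    bodies are then retried, so duplicates may appear downstream while the
    set of distinct bodies stays exactly total_events long.
    """
    counter = 0
    while counter < total_events:
        end = min(counter + batch_size, total_events)
        batch = [str(i) for i in range(counter, end)]
        while not put_batch(batch):
            pass  # simplification: the real source backs off before retrying
        counter = end
```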

12  Syslog Sources

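Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event. The TCP sources create a new event for each string of characters separated by a newline ('\n').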

Required properties: channels, type, host, and port (ports for the multiport variant).
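This Python sketch shows how TCP and UDP input differ in framing. It also decodes the leading `<PRI>` field, which holds the Priority that keepFields can retain. Per RFC 3164/5424, PRI = facility × 8 + severity. `split_tcp_stream`, `udp_event` and `parse_pri` are hypothetical helpers, not Flume code.

```python
from typing import List, Tuple


def split_tcp_stream(buffer: bytes) -> Tuple[List[bytes], bytes]:
    """Cut buffered TCP bytes into newline-terminated events.

    The trailing partial line is returned separately; it must be kept and
    prepended to the next chunk read from the socket.
    """
    *complete, rest = buffer.split(b"\n")
    return complete, rest


def udp_event(datagram: bytes) -> bytes:
    """Over UDP, one datagram is one event, even if it contains newlines."""
    return datagram


def parse_pri(event: bytes) -> Tuple[int, int, bytes]:
    """Return (facility, severity, rest) from a message starting with <PRI>."""
    if not event.startswith(b"<"):
        raise ValueError("missing <PRI> field")
    end = event.index(b">")  # raises ValueError if the field is unterminated
    pri = int(event[1:end])
    return pri // 8, pri % 8, event[end + 1:]
```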

12.1  Syslog TCP Source

The original, tried-and-true syslog TCP source.

Property Name Default Description
channels  
type The component type name, needs to be syslogtcp
host Host name or IP address to bind to
port Port # to bind to
eventSize 2500 Maximum size of a single event line, in bytes
keepFields none Setting this to 'all' will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values 'true' and 'false' have been deprecated in favor of 'all' and 'none'.

selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

For example, a syslog TCP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

12.2 Multiport Syslog TCP Source

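This is a newer, faster, multi-port capable version of the Syslog TCP source. Note that the ports configuration setting has replaced port. Multi-port capability means that it can listen on many ports at once in an efficient manner. This source uses the Apache Mina library to do that. It provides support for RFC-3164 and many common RFC-5424 formatted messages. It also provides the capability to configure the character set used on a per-port basis.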

Property Name Default Description
channels  
type The component type name, needs to be multiport_syslogtcp
host Host name or IP address to bind to.
ports Space-separated list (one or more) of ports to bind to.
eventSize 2500 Maximum size of a single event line, in bytes.
keepFields none Setting this to 'all' will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values 'true' and 'false' have been deprecated in favor of 'all' and 'none'.

portHeader If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.

charset.default UTF-8 Default character set used while parsing syslog events into strings.
charset.port.<port> Character set is configurable on a per-port basis.
batchSize 100 Maximum number of events to attempt to process per request loop. Using the default is usually fine.
readBufferSize 1024 Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine.
numProcessors (auto-detected) Number of processors available on the system for use while processing messages. The default is to auto-detect the number of CPUs using the Java Runtime API. Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable.

selector.type replicating replicating, multiplexing, or custom
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors.
interceptors.*    

For example, a multiport syslog TCP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port

12.3 Syslog UDP Source

 

Property Name Default Description
channels  
type The component type name, needs to be syslogudp
host Host name or IP address to bind to
port Port # to bind to
keepFields false Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event.
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

For example, a syslog UDP source for agent named a1: 

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
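For UDP, a sender simply puts each syslog message into its own datagram; there is no connection and no delivery guarantee. The sketch below is a hypothetical Java client for the example above (localhost:5140); the class SyslogUdpSender and the sample message are invented for illustration.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical sender for the syslogudp example: one message per datagram.
class SyslogUdpSender {
    // Builds the datagram without sending it, so it can be inspected.
    static DatagramPacket packet(String host, int port, String syslogLine) {
        byte[] data = syslogLine.getBytes(StandardCharsets.UTF_8);
        return new DatagramPacket(data, data.length, new InetSocketAddress(host, port));
    }

    // Fire-and-forget: UDP gives no acknowledgement if the agent is down.
    static void send(String host, int port, String syslogLine) throws IOException {
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(packet(host, port, syslogLine));
        }
    }

    public static void main(String[] args) throws IOException {
        // "<14>" = facility user (1) * 8 + severity informational (6).
        send("localhost", 5140, "<14>Oct  1 09:05:07 web01 app: started");
    }
}
```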

13 HTTP source

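A source which accepts Flume events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into Flume events by a pluggable "handler", which must implement the HTTPSourceHandler interface. This handler takes an HttpServletRequest and returns a list of Flume events. All events handled from one HTTP request are committed to the channel in one transaction, which increases efficiency on channels such as the file channel. If the handler throws an exception, this source returns an HTTP status of 400. If the channel is full, or the source is unable to append events to the channel, the source returns an HTTP 503 (Temporarily unavailable) status.

All events sent in one POST request are considered to be one batch and are inserted into the channel in one transaction.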
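Because one POST is one batch and one transaction, a client only needs to decide, per request, whether to resend the whole batch. A minimal sketch using the JDK 11+ java.net.http client, assuming an HTTP source with the default JSONHandler on a hypothetical localhost:5140; the class HttpSourceClient and its Outcome enum are invented for illustration. Treating any 2xx as success is also an assumption.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client: posts one JSON batch and maps the status to an action.
class HttpSourceClient {
    enum Outcome { OK, BAD_REQUEST, RETRY_LATER, OTHER }

    // 400: the handler threw (e.g. malformed JSON), resending unchanged will not help.
    // 503: channel full or append failed, the whole batch can be retried later.
    static Outcome classify(int status) {
        if (status >= 200 && status < 300) return Outcome.OK;
        if (status == 400) return Outcome.BAD_REQUEST;
        if (status == 503) return Outcome.RETRY_LATER;
        return Outcome.OTHER;
    }

    static Outcome post(String url, String jsonBatch) throws IOException, InterruptedException {
        HttpRequest req = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json; charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBatch)) // UTF-8 body
                .build();
        HttpResponse<String> resp = HttpClient.newHttpClient()
                .send(req, HttpResponse.BodyHandlers.ofString());
        return classify(resp.statusCode());
    }

    public static void main(String[] args) {
        String batch = "[{\"headers\":{\"host\":\"web01\"},\"body\":\"hello\"}]";
        try {
            System.out.println(post("http://localhost:5140", batch));
        } catch (IOException | InterruptedException e) {
            System.out.println("request failed: " + e);
        }
    }
}
```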

Property Name Default Description
type   The component type name, needs to be http
port The port the source should bind to.
bind 0.0.0.0 The hostname or IP address to listen on
handler org.apache.flume.source.http.JSONHandler The FQCN of the handler class.
handler.* Config parameters for the handler
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
enableSSL false Set this property to true to enable SSL. HTTP Source does not support SSLv3.
excludeProtocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded.
keystore   Location of the keystore, including the keystore file name
keystorePassword Keystore password

For example, an HTTP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props

13.1 JSONHandler

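An out-of-the-box handler is provided which can handle events represented in JSON format and supports the UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, it has to be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows: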

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]

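To set the charset, the request must have its content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

One way to create an event in the format expected by this handler is to use JSONEvent, provided in the Flume SDK, and use Google Gson to create the JSON string with the Gson#toJson(Object, Type) method. The type token to pass as the second argument of this method for a list of events can be created by: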

Type type = new TypeToken<List<JSONEvent>>() {}.getType();
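If pulling in Gson is not an option, the same array-of-events shape can be produced by hand. The following is a dependency-free sketch; the class JsonBatch and its nested Event type are hypothetical stand-ins for Flume's JSONEvent, and the string escaping covers only quotes, backslashes and control characters.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free builder for the JSON expected by JSONHandler.
class JsonBatch {
    // Stand-in for Flume's JSONEvent: string headers plus a string body.
    static final class Event {
        final Map<String, String> headers;
        final String body;

        Event(Map<String, String> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Minimal JSON string escaping.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Always emits an array, even for a single event, as the handler requires.
    static String toJson(List<Event> events) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < events.size(); i++) {
            if (i > 0) sb.append(',');
            Event e = events.get(i);
            sb.append("{\"headers\":{");
            int j = 0;
            for (Map.Entry<String, String> h : e.headers.entrySet()) {
                if (j++ > 0) sb.append(',');
                sb.append(quote(h.getKey())).append(':').append(quote(h.getValue()));
            }
            sb.append("},\"body\":").append(quote(e.body)).append('}');
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>(); // keeps header order stable
        headers.put("timestamp", "434324343");
        headers.put("host", "random_host.example.com");
        System.out.println(toJson(List.of(new Event(headers, "random_body"))));
    }
}
```

The resulting string would be sent with Content-Type: application/json; charset=UTF-8.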

13.2 BlobHandler

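By default, HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event containing the request parameters as well as the Binary Large Object (BLOB) uploaded with the request, for example a PDF or JPG file. Note that this approach is not suitable for very large objects, because it buffers the entire BLOB in RAM.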

Property Name Default Description
handler The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request
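Since the whole BLOB is buffered in memory, handler.maxBlobLength acts as a cap on how much of a request is read. The sketch below shows a bounded read of that kind; the class BoundedBlobReader is hypothetical, and the assumption that bytes beyond the limit are simply not read (truncation rather than rejection) is this sketch's choice, not a statement about BlobHandler.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical bounded reader in the spirit of handler.maxBlobLength.
class BoundedBlobReader {
    // Reads at most maxBlobLength bytes; anything beyond the limit is left unread.
    static byte[] readAtMost(InputStream in, int maxBlobLength) {
        ByteArrayOutputStream blob = new ByteArrayOutputStream();
        byte[] buf = new byte[Math.max(1, Math.min(maxBlobLength, 8192))];
        int total = 0;
        try {
            while (total < maxBlobLength) {
                int n = in.read(buf, 0, Math.min(buf.length, maxBlobLength - total));
                if (n == -1) break; // end of the uploaded body
                blob.write(buf, 0, n);
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return blob.toByteArray(); // the entire BLOB sits in RAM, hence the cap
    }

    public static void main(String[] args) {
        byte[] upload = new byte[250];
        System.out.println(readAtMost(new ByteArrayInputStream(upload), 100).length); // 100
    }
}
```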

14 Stress Source 

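StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows the user to configure the size of the Event payload, with empty headers. The user can configure the total number of Events to be sent as well as the maximum number of Events to be successfully delivered.

Required properties are in bold.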

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.StressSource
size 500 Payload size of each Event, in bytes
maxTotalEvents -1 Maximum number of Events to be sent
maxSuccessfulEvents -1 Maximum number of Events successfully sent
batchSize 1 Number of Events to be sent in one batch

Example for agent named a1:

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
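How size, batchSize and maxTotalEvents bound the generated load can be illustrated with a small simulation. StressPlan and its methods are hypothetical; this is not StressSource's code, maxSuccessfulEvents is left out, and a negative limit is assumed to mean "no limit" (as the -1 default suggests). The example reuses size = 10240 and maxTotalEvents = 1000000 from the config above, with a hypothetical batchSize of 100.

```java
// Hypothetical simulation of a fixed-payload load generator; not StressSource itself.
class StressPlan {
    // Events in the next batch, given how many have been sent so far.
    static int nextBatch(long sentSoFar, long maxTotalEvents, int batchSize) {
        if (maxTotalEvents < 0) return batchSize; // assumed: negative = unlimited
        long remaining = maxTotalEvents - sentSoFar;
        return (int) Math.max(0, Math.min(batchSize, remaining));
    }

    // Each event carries a fixed-size payload ("size", in bytes) and no headers.
    static byte[] payload(int size) {
        return new byte[size];
    }

    public static void main(String[] args) {
        long sent = 0;
        int batches = 0;
        int n;
        while ((n = nextBatch(sent, 1_000_000L, 100)) > 0) {
            sent += n;
            batches++;
        }
        System.out.println(batches + " batches, " + sent + " events, "
                + sent * payload(10240).length + " payload bytes");
        // 10000 batches, 1000000 events, 10240000000 payload bytes
    }
}
```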

15 Legacy Sources

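The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. They accept events in the Flume 0.9.4 format, convert them to the Flume 1.0 format, and store them in the connected channel. The 0.9.4 event properties such as timestamp, pri, host and nanos are converted to 1.x event header attributes. The legacy sources support both Avro and Thrift RPC connections. To use this bridge between two Flume versions, start a Flume 1.x agent with the avroLegacy or thriftLegacy source; the 0.9.4 agent should have its agent Sink pointing to the host/port of the 1.x agent.

Note: The reliability semantics of Flume 1.x differ from those of Flume 0.9.x. The E2E and DFO modes of a Flume 0.9.x agent are not supported by the legacy sources; the only supported 0.9.x mode is best effort. However, the reliability settings of the 1.x flow apply to the events once the legacy source has saved them into the Flume 1.x channel.

Required properties are in bold.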

15.1 Avro Legacy Source

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

15.2 Thrift Legacy Source

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

16 Custom Source

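A custom source is your own implementation of the Source interface. The custom source's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom source is its FQCN.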

Property Name Default Description
channels  
type The component type name, needs to be your FQCN
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
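Because the type of a custom source is its FQCN, the class must be loadable by name from the agent's classpath. The sketch below is a simplified, hypothetical illustration of that lookup: short aliases map to built-in classes, and anything else is treated as a class name and loaded with Class.forName, which fails with ClassNotFoundException if the jar is missing. SourceTypeResolver and its small alias table are invented for illustration and are not Flume's factory code.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical resolver: alias -> built-in class name, otherwise type is the FQCN.
class SourceTypeResolver {
    // Small, partial alias table for illustration only.
    private static final Map<String, String> BUILT_IN = Map.of(
            "avro", "org.apache.flume.source.AvroSource",
            "syslogudp", "org.apache.flume.source.SyslogUDPSource",
            "http", "org.apache.flume.source.http.HTTPSource");

    // Aliases are matched case-insensitively; a FQCN is returned unchanged.
    static String className(String type) {
        return BUILT_IN.getOrDefault(type.toLowerCase(Locale.ROOT), type);
    }

    // Throws ClassNotFoundException when the class is not on the classpath.
    static Class<?> load(String type) throws ClassNotFoundException {
        return Class.forName(className(type));
    }

    public static void main(String[] args) {
        try {
            System.out.println(load("org.example.MySource"));
        } catch (ClassNotFoundException e) {
            System.out.println("not on classpath: " + e.getMessage());
        }
    }
}
```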

17 Scribe Source

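Scribe is another type of ingest system. To adopt an existing Scribe ingest system, Flume should use ScribeSource, which is based on Thrift with a compatible transport protocol. For the deployment of Scribe, please follow the guide from Facebook. Required properties are in bold.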

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.scribe.ScribeSource
port 1499 Port that Scribe should connect to
maxReadBufferBytes 16384000 Thrift default FrameBuffer size
workerThreads 5 Number of handling threads in Thrift
selector.type    
selector.*

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1
