Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide
Due to its length, the translation is split into the following 5 parts:
[Translation] Flume 1.8.0 User Guide
[Translation] Flume 1.8.0 User Guide - Source
[Translation] Flume 1.8.0 User Guide - Sink
[Translation] Flume 1.8.0 User Guide - Channel
[Translation] Flume 1.8.0 User Guide - Processors
Listens on an Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be avro |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector.* | ||
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
compression-type | none | This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource |
ssl | false | Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password". |
keystore | – | This is the path to a Java keystore file. Required for SSL. |
keystore-password | – | The password for the Java keystore. Required for SSL. |
keystore-type | JKS | The type of the Java keystore. This can be "JKS" or "PKCS12". |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
ipFilter | false | Set this to true to enable ipFiltering for netty |
ipFilterRules | – | Define N netty ipFilter pattern rules with this config. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Example of ipFilterRules:
ipFilterRules defines N netty ipFilters separated by a comma. A pattern rule must be in this format:
<'allow' or 'deny'>:<'ip' or 'name' for computer name>:<pattern>, i.e. allow/deny:ip/name:pattern
Example: ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*
Note that the first rule to match applies, as the examples below show for a client on the localhost.
"allow:name:localhost,deny:ip:*" will allow the client on localhost and deny clients from any other ip, whereas "deny:name:localhost,allow:ip:*" will deny the client on localhost and allow clients from any other ip.
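As an illustration, a minimal sketch (the agent/source names reuse the a1/r1 example above; the rules are the sample ones) of an Avro source that only accepts connections from localhost might look like:
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.ipFilter = true
a1.sources.r1.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*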
Listens on a Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. The Thrift source can be configured to start in secure mode by enabling kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be thrift |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector.* | ||
interceptors | – | Space separated list of interceptors |
interceptors.* | ||
ssl | false | Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password". |
keystore | – | This is the path to a Java keystore file. Required for SSL. |
keystore-password | – | The password for the Java keystore. Required for SSL. |
keystore-type | JKS | The type of the Java keystore. This can be "JKS" or "PKCS12". |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC. |
agent-principal | – | The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC. |
agent-keytab | —- | The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
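To run the Thrift source in secure mode, a sketch along the following lines (the principal and keytab path are placeholders) adds the kerberos properties from the table above:
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/flumehost1.example.com@YOURKERBEROSREALM
a1.sources.r1.agent-keytab = /path/to/keytabs/flume.keytab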
An Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless the property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results, whereas date probably will not - the former two commands produce streams of data whereas the latter produces a single event and exits. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be exec |
command | – | The command to execute |
shell | – | A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc. |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
logStdErr | false | Whether the command’s stderr should be logged |
batchSize | 20 | The max number of lines to read and send to the channel at a time |
batchTimeout | 3000 | Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Warning: The problem with ExecSource and other asynchronous sources is that the source cannot guarantee that, if there is a failure to put the event into the Channel, the client knows about it. In such cases, the data will be lost. For instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there is an obvious problem: what happens if the channel fills up and Flume can't send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn't been sent for some reason. If this doesn't make sense, you need only know this: your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, the Taildir Source, or direct integration with Flume via the SDK.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
The 'shell' config is used to invoke the 'command' through a command shell (such as Bash or Powershell). The 'command' is passed as an argument to 'shell' for execution. This allows the 'command' to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals etc. In the absence of the 'shell' config, the 'command' is invoked directly. Common values for 'shell': '/bin/sh -c', '/bin/ksh -c', 'cmd /c', 'powershell -Command', etc.
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
The JMS source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider, but it has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and message-to-flume-event converter. Note that the vendor-provided JMS jars should be included in the Flume classpath using the plugins.d directory (preferred), via -classpath on the command line, or via the FLUME_CLASSPATH variable in flume-env.sh. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be jms |
initialContextFactory | – | Inital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory | – | The JNDI name the connection factory should appear as |
providerURL | – | The JMS provider URL |
destinationName | – | Destination name |
destinationType | – | Destination type (queue or topic) |
messageSelector | – | Message selector to use when creating the consumer |
userName | – | Username for the destination/provider |
passwordFile | – | File containing the password for the destination/provider |
batchSize | 100 | Number of messages to consume in one batch |
converter.type | DEFAULT | Class to use to convert messages to flume events. See below. |
converter.* | – | Converter properties. |
converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays. |
createDurableSubscription | false | Whether to create durable subscription. Durable subscription can only be used with destinationType topic. If true, "clientId" and "durableSubscriptionName" have to be specified. |
clientId | – | JMS client identifier set on Connection right after it is created. Required for durable subscriptions. |
durableSubscriptionName | – | Name used to identify the durable subscription. Required for durable subscriptions. |
The JMS source allows pluggable converters, though it's likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.
BytesMessage:
Bytes of the message are copied to the body of the FlumeEvent. Cannot convert more than 2GB of data per message.
TextMessage:
Text of the message is converted to a byte array and copied to the body of the FlumeEvent. The default converter uses UTF-8 by default, but this is configurable.
ObjectMessage:
The object is written out to a ByteArrayOutputStream wrapped in an ObjectOutputStream, and the resulting array is copied to the body of the FlumeEvent.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
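For consuming a topic with a durable subscription, a minimal sketch (the topic name, client id and subscription name are made up for illustration) using the properties from the table above:
a1.sources.r1.destinationName = BUSINESS_EVENTS
a1.sources.r1.destinationType = TOPIC
a1.sources.r1.createDurableSubscription = true
a1.sources.r1.clientId = flume-client-1
a1.sources.r1.durableSubscriptionName = flume-durable-sub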
This source lets you ingest data by placing files to be ingested into a "spooling" directory on disk. The source will watch the specified directory for new files and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, it is renamed to indicate completion (or optionally deleted).
Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files may be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:
If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.
Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be spooldir. |
spoolDir | – | The directory from which to read files from. |
fileSuffix | .COMPLETED | Suffix to append to completely ingested files |
deletePolicy | never | When to delete completed files: never or immediate |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
basenameHeader | false | Whether to add a header storing the basename of the file. |
basenameHeaderKey | basename | Header Key to use when appending basename of file to event header. |
includePattern | ^.*$ | Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir. |
consumeOrder | oldest | In which order files in the spooling directory will be consumed oldest, youngest and random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with smallest lexicographical order will be consumed first. In case of random any file will be picked randomly. When using oldest and youngest the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming in the spooling directory. |
pollDelay | 500 | Delay (in milliseconds) used when polling for new files. |
recursiveDirectorySearch | false | Whether to monitor sub directories for new files to read. |
maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter. |
batchSize | 100 | Granularity at which to batch transfer to the channel |
inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text. |
decodeErrorPolicy | FAIL | What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the 「replacement character」 char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence. |
deserializer | LINE | Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder. |
deserializer.* | Varies per event deserializer. | |
bufferMaxLines | – | (Obsolete) This option is now ignored. |
bufferMaxLineLength | 5000 | (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for an agent named agent-1:
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
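As a further illustration, a sketch (the patterns and policy are arbitrary choices) that ingests only .log files, skips .tmp files and deletes files once fully ingested:
a1.sources.src-1.includePattern = ^.*\.log$
a1.sources.src-1.ignorePattern = ^.*\.tmp$
a1.sources.src-1.deletePolicy = immediate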
The following event deserializers ship with Flume.
LINE: This deserializer generates one event per line of text input.
Property Name | Default | Description |
---|---|---|
deserializer.maxLineLength | 2048 | Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event. |
deserializer.outputCharset | UTF-8 | Charset to use for encoding events put into the channel. |
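For instance, a sketch (the length limit is an arbitrary value) that raises the per-event line limit on a spooling directory source:
a1.sources.src-1.deserializer = LINE
a1.sources.src-1.deserializer.maxLineLength = 8192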
AVRO: This deserializer is able to read an Avro container file, and it generates one event per Avro record in the file. Each event is annotated with a header that indicates the schema used. The body of the event is the binary Avro record data, not including the schema or the rest of the container file elements.
Note that if the spool directory source must retry putting one of these events onto a channel (for example, because the channel is full), it will reset and retry from the most recent Avro container file sync point. To reduce potential event duplication in such a failure scenario, write sync markers more frequently in your Avro input files.
Property Name | Default | Description |
---|---|---|
deserializer.schemaType | HASH | How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header 「flume.avro.schema.hash」. If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header 「flume.avro.schema.literal」. Using LITERAL mode is relatively inefficient compared to HASH mode. |
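A minimal sketch selecting this deserializer and storing the JSON-encoded schema in each event header (the AVRO shorthand is assumed here; the FQCN of the deserializer builder can be used instead):
a1.sources.src-1.deserializer = AVRO
a1.sources.src-1.deserializer.schemaType = LITERAL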
BlobDeserializer: This deserializer reads a Binary Large Object (BLOB) per event, typically one BLOB per file; for example a PDF or JPG file. Note that this approach is not suitable for very large objects because the whole BLOB is buffered in RAM.
Property Name | Default | Description |
---|---|---|
deserializer | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder |
deserializer.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
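A sketch wiring this deserializer into a spooling directory source (the size limit simply mirrors the default above):
a1.sources.src-1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
a1.sources.src-1.deserializer.maxBlobLength = 100000000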
Note: This source is provided as a preview feature. It does not work on Windows.
Watch the specified files, and tail them in near real-time once new lines appended to each file are detected. If new lines are being written, this source retries reading them while waiting for the write to complete.
This source is reliable and will not miss data even when the tailed files roll over. It periodically writes the last read position of each file to the given position file in JSON format. If Flume is stopped or goes down for some reason, it can restart tailing from the position recorded in the existing position file.
In other use cases, this source can also start tailing from an arbitrary position for each file using the given position file. When there is no position file at the specified path, it starts tailing from the first line of each file by default.
Files will be consumed in order of their modification time. The file with the oldest modification time will be consumed first.
This source does not rename, delete or do any modifications to the files being tailed. Currently this source does not support tailing binary files. It reads text files line by line.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be TAILDIR. |
filegroups | – | Space-separated list of file groups. Each file group indicates a set of files to be tailed. |
filegroups.<filegroupName> | – | Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only. |
positionFile | ~/.flume/taildir_position.json | File in JSON format to record the inode, the absolute path and the last position of each tailing file. |
headers.<filegroupName>.<headerKey> | – | Header value which is the set with header key. Multiple headers can be specified for one file group. |
byteOffsetHeader | false | Whether to add the byte offset of a tailed line to a header called ‘byteoffset’. |
skipToEnd | false | Whether to skip the position to EOF in the case of files not written on the position file. |
idleTimeout | 120000 | Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it. |
writePosInterval | 3000 | Interval time (ms) to write the last position of each file on the position file. |
batchSize | 100 | Max number of lines to read and send to the channel at a time. Using the default is usually fine. |
backoffSleepIncrement | 1000 | The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data. |
maxBackoffSleep | 5000 | The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data. |
cachePatternMatching | true | Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity. |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
Warning: This source is highly experimental and may change between minor versions of Flume. Use at your own risk.
Experimental source that connects via the Streaming API to the 1% sample Twitter firehose, continuously downloads tweets, converts them to Avro format and sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.twitter.TwitterSource |
consumerKey | – | OAuth consumer key |
consumerSecret | – | OAuth consumer secret |
accessToken | – | OAuth access token |
accessTokenSecret | – | OAuth token secret |
maxBatchSize | 1000 | Maximum number of twitter messages to put in a single batch |
maxBatchDurationMillis | 1000 | Maximum number of milliseconds to wait before closing a batch |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200
Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same consumer group so that each reads a unique set of partitions for the topics.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the source |
kafka.consumer.group.id | flume | Unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group |
kafka.topics | – | Comma-separated list of topics the kafka consumer will read messages from. |
kafka.topics.regex | – | Regex that defines set of topics the source is subscribed on. This property has higher priority than kafka.topics and overrides kafka.topics if exists. |
batchSize | 1000 | Maximum number of messages written to Channel in one batch |
batchDurationMillis | 1000 | Maximum time (in ms) before a batch will be written to Channel. The batch will be written whenever the first of size and time is reached. |
backoffSleepIncrement | 1000 | Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. |
maxBackoffSleep | 5000 | Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. |
useFlumeEventFormat | false | By default events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers sent on the producing side. |
setTopicHeader | true | When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property. |
topicHeader | topic | Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property so as to avoid sending the message back to the same topic in a loop. |
migrateZookeeperOffsets | true | When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset defines how offsets are handled. Check Kafka documentation for details |
kafka.consumer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
more consumer security props | | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on consumer. |
Other Kafka Consumer Properties | – | These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset |
Note: The Kafka Source overrides two Kafka consumer parameters: auto.commit.enable is set to "false" by the source, and every batch is committed. The Kafka source guarantees an at-least-once message retrieval strategy; duplicates can be present when the source starts. The Kafka Source also provides defaults for key.deserializer (org.apache.kafka.common.serialization.StringDeserializer) and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer). Modification of these parameters is not recommended.
Deprecated Properties:
Property Name | Default | Description |
---|---|---|
topic | – | Use kafka.topics |
groupId | flume | Use kafka.consumer.group.id |
zookeeperConnect | – | Is no longer supported by kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish connection with kafka cluster |
Example for topic subscription by comma-separated topic list:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
Security and Kafka Source:
Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
As of now, data encryption is solely provided by SSL/TLS.
Setting kafka.consumer.security.protocol to any of the following values means:
SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
SASL_SSL - Kerberos or plaintext authentication with data encryption
SSL - TLS based encryption with optional authentication.
Warning: There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: the Kafka security overview and the jira for tracking this issue: KAFKA-2561.
TLS and Kafka Source:
Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.
Example configuration with server side authentication and data encryption:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:
a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields: the Common Name (CN) or the Subject Alternative Name (SAN).
If client side authentication is also required, then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA, which in turn is trusted by the Kafka brokers.
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
If the keystore and key use different password protection, then the ssl.key.password property will provide the required additional secret for both consumer keystores:
a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
Kerberos and Kafka Source:
To use the Kafka source with a Kafka cluster secured with Kerberos, set the consumer.security.protocol property noted above for the consumer. The Kerberos keytab and principal to be used with the Kafka brokers are specified in the "KafkaClient" section of a JAAS file. The "Client" section describes the Zookeeper connection, if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
Example secure configuration using SASL_PLAINTEXT:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
Example secure configuration using SASL_SSL:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
Sample JAAS file. For reference of its content, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. This is not needed unless you require offset migration or need this section for other secure components. Also, please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline-separated text. Each line of text is turned into a Flume event and sent via the connected channel.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcat |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
max-line-length | 512 | Max line length per event body (in bytes) |
ack-every-event | true | Respond with an "OK" for every event received |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* | |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
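If longer lines need to be accepted or the per-event "OK" acknowledgement is not wanted, a sketch (the values are illustrative) using the optional properties from the table above:
a1.sources.r1.max-line-length = 2048
a1.sources.r1.ack-every-event = false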
As per the original Netcat (TCP) source, this source listens on a given port and turns each line of text into an event that is sent via the connected channel. Acts like nc -u -k -l [host] [port].
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcatudp |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
remoteAddressHeader | – | |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* | |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1 and stops at totalEvents. It retries when it can't send events to the channel. Useful mainly for testing. During retries it keeps the body of the retried messages the same as before, so that the number of unique events - after de-duplication at the destination - is expected to equal the specified totalEvents. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be seq |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
batchSize | 1 | Number of events to attempt to process per request loop. |
totalEvents | Long.MAX_VALUE | Number of unique events sent by the source. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event. The TCP sources create a new event for each string of characters separated by a newline ('\n').
Required properties are in bold.
Syslog TCP Source: the original, tried-and-true syslog TCP source.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogtcp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
eventSize | 2500 | Maximum size of a single event line, in bytes |
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A spaced separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
For example, a syslog TCP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
This is a newer, faster, multi-port capable version of the Syslog TCP source. Note that the ports configuration setting has replaced port. Multi-port capability means that it can listen on many ports at once in an efficient manner. This source uses the Apache Mina library to do that. It provides support for RFC-3164 and many common RFC-5424 formatted messages, as well as the capability to configure the character set used on a per-port basis.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be multiport_syslogtcp |
host | – | Host name or IP address to bind to. |
ports | – | Space-separated list (one or more) of ports to bind to. |
eventSize | 2500 | Maximum size of a single event line, in bytes. |
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A spaced separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
portHeader | – | If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. |
charset.default | UTF-8 | Default character set used while parsing syslog events into strings. |
charset.port.<port> | – | Character set is configurable on a per-port basis. |
batchSize | 100 | Maximum number of events to attempt to process per request loop. Using the default is usually fine. |
readBufferSize | 1024 | Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine. |
numProcessors | (auto-detected) | Number of processors available on the system for use while processing messages. Default is to auto-detect # of CPUs using the Java Runtime API. Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable. |
selector.type | replicating | replicating, multiplexing, or custom |
selector.* | – | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors. |
interceptors.* |
For example, a multiport syslog TCP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
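Building on the example above, a sketch (the charsets are chosen only for illustration) that configures the character set per port:
a1.sources.r1.charset.default = UTF-8
a1.sources.r1.charset.port.10001 = ISO-8859-1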
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogudp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
keepFields | false | Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event. |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
For example, a syslog UDP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
A source which accepts Flume Events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into Flume events by a pluggable "handler" which must implement the HTTPSourceHandler interface. This handler takes an HttpServletRequest and returns a list of Flume events. All events handled from one HTTP request are committed to the channel in one transaction, thus allowing for increased efficiency on channels like the file channel. If the handler throws an exception, this source will return an HTTP status of 400. If the channel is full, or the source is unable to append events to the channel, the source will return an HTTP 503 - Temporarily unavailable status.
All events sent in one POST request are considered to be one batch and inserted into the channel in one transaction.
Property Name | Default | Description |
---|---|---|
type | | The component type name, needs to be http |
port | – | The port the source should bind to. |
bind | 0.0.0.0 | The hostname or IP address to listen on |
handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class. |
handler.* | – | Config parameters for the handler |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
enableSSL | false | Set the property true, to enable SSL. HTTP Source does not support SSLv3. |
excludeProtocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded. |
keystore | | Location of the keystore including keystore file name |
keystorePassword | | Keystore password |
For example, a http source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
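To serve the HTTP source over SSL, a sketch (the keystore path and password are placeholders) using the SSL properties from the table above:
a1.sources.r1.enableSSL = true
a1.sources.r1.keystore = /path/to/http-source.keystore.jks
a1.sources.r1.keystorePassword = <password to access the keystore>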
A handler is provided out of the box which can handle events represented in JSON format, and supports the UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, it must be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows:
[{ "headers" : { "timestamp" : "434324343", "host" : "random_host.example.com" }, "body" : "random_body" }, { "headers" : { "namenode" : "namenode.example.com", "datanode" : "random_datanode.example.com" }, "body" : "really_random_body" }]
To set the charset, the request must have the content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).
One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method. The type token to pass as the 2nd argument of this method for the list of events can be created by:
Type type = new TypeToken<List<JSONEvent>>() {}.getType();
By default HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event containing the request parameters as well as the Binary Large Object (BLOB) uploaded with the request, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because it buffers up the entire BLOB in RAM.
Property Name | Default | Description |
---|---|---|
handler | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler |
handler.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
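A minimal sketch replacing the default JSON handler of the HTTP source with the BlobHandler (the size limit mirrors the default above):
a1.sources.r1.handler = org.apache.flume.sink.solr.morphline.BlobHandler
a1.sources.r1.handler.maxBlobLength = 100000000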
StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows the user to configure the size of the event payload, with empty headers. The user can configure the total number of events to be sent as well as the maximum number of successful events to be delivered.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.StressSource |
size | 500 | Payload size of each Event. Unit:byte |
maxTotalEvents | -1 | Maximum number of Events to be sent |
maxSuccessfulEvents | -1 | Maximum number of Events successfully sent |
batchSize | 1 | Number of Events to be sent in one batch |
Example for agent named a1:
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
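As a variation, a sketch (the numbers are arbitrary) that stops after 10000 successfully delivered events and sends them in batches:
a1.sources.stresssource-1.maxSuccessfulEvents = 10000
a1.sources.stresssource-1.batchSize = 100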
The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. They accept events in the Flume 0.9.4 format, convert them to the Flume 1.0 format, and store them in the connected channel. The 0.9.4 event properties like timestamp, pri, host, nanos, etc. get converted to 1.x event header attributes. The legacy sources support both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source, and the 0.9.4 agent should have its agent sink pointing to the host/port of the 1.x agent.
Note: the reliability semantics of Flume 1.x are different from those of Flume 0.9.x. The E2E or DFO mode of a 0.9.x agent is not supported by the legacy source; the only supported 0.9.x mode is best effort, although the reliability settings of the 1.x flow apply to the events once they are saved into the Flume 1.x channel by the legacy source.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
A custom source is your own implementation of the Source interface. A custom source's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom source is its FQCN.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be your FQCN |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
Scribe is another type of ingest system. To adopt an existing Scribe ingest system, Flume should use the ScribeSource, which is based on Thrift and uses a compatible transfer protocol. For deployment of Scribe, please follow the guide from Facebook. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.scribe.ScribeSource |
port | 1499 | Port that Scribe should be connected |
maxReadBufferBytes | 16384000 | Thrift Default FrameBuffer Size |
workerThreads | 5 | Handing threads number in Thrift |
selector.type | ||
selector.* |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1