[Translation] Flume 1.8.0 User Guide: Processors

Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide

Due to its length, the translation is split into the following 5 parts:

[Translation] Flume 1.8.0 User Guide

[Translation] Flume 1.8.0 User Guide: Sources

[Translation] Flume 1.8.0 User Guide: Sinks

[Translation] Flume 1.8.0 User Guide: Channels

[Translation] Flume 1.8.0 User Guide: Processors

Flume Sink Processors

A sink group allows users to group multiple sinks into one entity. Sink processors can be used to provide load-balancing capabilities over all sinks inside the group, or to achieve failover from one sink to another in case of temporary failure.

Required properties are in bold.

Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be default, failover or load_balance

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

Default Sink Processor

The default sink processor accepts only a single sink. The user is not forced to create a processor (sink group) for a single sink. Instead, the user can follow the source-channel-sink pattern explained earlier in this user guide.

Failover Sink Processor

The failover sink processor maintains a prioritized list of sinks, guaranteeing that as long as one is available, events will be processed (delivered).

The failover mechanism works by relegating failed sinks to a pool where they are assigned a cool-down period that increases with sequential failures before they are retried. Once a sink successfully sends an event, it is restored to the live pool. Sinks have a priority associated with them: the larger the number, the higher the priority. If a sink fails while sending an event, the sink with the next highest priority is tried for the next event. For example, a sink with priority 100 is activated before a sink with priority 80. If no priority is specified, priorities are determined based on the order in which the sinks are specified in the configuration.

To configure, set a sink group processor to failover and set priorities for all individual sinks. All specified priorities must be unique. Furthermore, an upper limit on failover time can be set (in milliseconds) using the maxpenalty property.

Required properties are in bold.

Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be failover
processor.priority.<sinkName> Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A higher-priority sink gets activated earlier; a larger absolute value indicates higher priority
processor.maxpenalty 30000 The maximum backoff period for the failed Sink (in millis)

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

Load balancing Sink Processor

The load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks over which the load must be distributed. The implementation supports distributing load using either round_robin or random selection mechanisms. The choice of selection mechanism defaults to round_robin, but can be overridden via configuration. Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector.

When invoked, this selector picks the next sink using its configured selection mechanism and invokes it. For round_robin and random, if the selected sink fails to deliver the event, the processor picks the next available sink via its configured selection mechanism. This implementation does not blacklist the failing sink and instead continues optimistically attempting every available sink. If all sink invocations result in failure, the selector propagates the failure to the sink runner.

If backoff is enabled, the sink processor will blacklist sinks that fail, removing them from selection for a given timeout. When the timeout ends, if the sink is still unresponsive, the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive sinks. With backoff disabled, in round-robin mode all the failed sink's load is simply passed to the next sink in line, and is thus not truly balanced.

Required properties are in bold.

Property Name Default Description
processor.sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be load_balance
processor.backoff false Should failed sinks be backed off exponentially.
processor.selector round_robin Selection mechanism. Must be either round_robin, random or the FQCN of a custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut 30000 Used by backoff selectors to limit exponential backoff (in milliseconds)

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

Custom Sink Processor

Custom sink processors are not supported at the moment.

 

Event Serializers

Both the file_roll sink and the hdfs sink support the EventSerializer interface. Details of the EventSerializers that ship with Flume are provided below.

Body Text Serializer

Alias: text. This serializer writes the body of the event to an output stream without any transformation or modification. Event headers are ignored. Configuration options are as follows:

Property Name Default Description
appendNewline true Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.

Example for agent named a1:

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

"Flume Event" Avro Event Serializer

Alias: avro_event.

This serializer serializes Flume events into an Avro container file. The schema used is the same schema used for Flume events in the Avro RPC mechanism.

This serializer inherits from the AbstractAvroEventSerializer class.

Configuration options are as follows:

Property Name Default Description
syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
compressionCodec null Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.

Example for agent named a1:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

Avro Event Serializer

Alias: This serializer does not have an alias, and must be specified using the fully-qualified class name.

This serializes Flume events into an Avro container file like the "Flume Event" Avro Event Serializer, however the record schema is configurable. The record schema may be specified either as a Flume configuration property or passed in an event header.

To pass the record schema as part of the Flume configuration, use the schemaURL property listed below.

To pass the record schema in an event header, specify either the event header flume.avro.schema.literal containing a JSON-format representation of the schema, or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported).

This serializer inherits from the AbstractAvroEventSerializer class.

Configuration options are as follows:

Property Name Default Description
syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
compressionCodec null Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.
schemaURL null Avro schema URL. Schemas specified in the header override this option.

Example for agent named a1:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
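
To pass the schema in an event header instead, something like the following sketch could stamp the flume.avro.schema.url header onto every event using a static interceptor (the source name r1 is illustrative, and the schema URL reuses the one from the example above):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = flume.avro.schema.url
a1.sources.r1.interceptors.i1.value = hdfs://namenode/path/to/schema.avsc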

Flume Interceptors

Flume has the capability to modify/drop events in-flight. This is done with the help of interceptors. Interceptors are classes that implement the org.apache.flume.interceptor.Interceptor interface. An interceptor can modify or even drop events based on any criteria chosen by the developer. Flume supports chaining of interceptors. This is made possible by specifying the list of interceptor builder class names in the configuration. Interceptors are specified as a whitespace-separated list in the source configuration. The order in which the interceptors are specified is the order in which they are invoked. The list of events returned by one interceptor is passed to the next interceptor in the chain. Interceptors can modify or drop events. If an interceptor needs to drop an event, it simply does not return that event in the list it returns. If it is to drop all events, it simply returns an empty list. Interceptors are named components; here is an example of how they are created through configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves configurable and can be passed configuration values just like any other configurable component. In the above example, events are passed to the HostInterceptor first, and the events returned by the HostInterceptor are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN) or the alias timestamp. If you have multiple collectors writing to the same HDFS path, then you could also use the HostInterceptor.

Timestamp Interceptor

This interceptor inserts into the event headers the time in millis at which it processes the event. It inserts a header with key timestamp (or a key specified by the header property) whose value is the relevant timestamp. This interceptor can preserve an existing timestamp if it is already present.

Property Name Default Description
type The component type name, has to be timestamp or the FQCN
header timestamp The name of the header in which to place the generated timestamp.
preserveExisting false If the timestamp already exists, should it be preserved - true or false

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

Host Interceptor

This interceptor inserts the hostname or IP address of the host on which the agent is running. It inserts a header with key host (or a configured key) whose value is the hostname or IP address of the host, based on configuration.

Property Name Default Description
type The component type name, has to be host
preserveExisting false If the host header already exists, should it be preserved - true or false
useIP true Use the IP Address if true, else use hostname.
hostHeader host The header key to be used.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

Static Interceptor

The static interceptor allows a user to append a static header with a static value to all events.

The current implementation does not allow specifying multiple headers at one time. Instead, the user might chain multiple static interceptors, each defining one static header.

Property Name Default Description
type The component type name, has to be static
preserveExisting true If configured header already exists, should it be preserved - true or false
key key Name of header that should be created
value value Static value that should be created

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

Remove Header Interceptor

This interceptor manipulates Flume event headers by removing one or many headers. It can remove a statically defined header, headers based on a regular expression, or headers in a list. If none of these is defined, or if no header matches the criteria, the Flume events are not modified.

Note that if only one header needs to be removed, specifying it by name provides better performance than the other two methods.

Property Name Default Description
type The component type name has to be remove_header
withName Name of the header to remove
fromList List of headers to remove, separated with the separator specified by fromListSeparator
fromListSeparator \s*,\s* Regular expression used to separate multiple header names in the list specified by fromList. Default is a comma surrounded by any number of whitespace characters
matching All the headers which names match this regular expression are removed
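
Example for agent named a1 (a sketch; the header name tempHeader is illustrative):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = remove_header
a1.sources.r1.interceptors.i1.withName = tempHeader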

UUID Interceptor

This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fab-b7a586fc1b97, which represents a 128-bit value.

Consider using UUIDInterceptor to automatically assign a UUID to an event if no application-level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network; that is, in the first Flume source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application-level key is available, this is preferable over an auto-generated UUID because it enables subsequent updates and deletes of the event in data stores using said well-known application-level key.

Property Name Default Description
type The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName id The name of the Flume header to modify
preserveExisting true If the UUID header already exists, should it be preserved - true or false
prefix "" The prefix string constant to prepend to each generated UUID
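
Example for agent named a1 (a sketch; the header name eventId and the prefix are illustrative):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i1.headerName = eventId
a1.sources.r1.interceptors.i1.preserveExisting = true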

 

Morphline Interceptor

This interceptor filters the events through a morphline configuration file that defines a chain of transformation commands that pipe records from one command to another. For example, the morphline can ignore certain events, alter or insert certain event headers via regular-expression-based pattern matching, or auto-detect and set a MIME type via Apache Tika on the events that are intercepted. This kind of packet sniffing can be used for content-based dynamic routing in a Flume topology. The MorphlineInterceptor can also help to implement dynamic routing to multiple Apache Solr collections (e.g. for multi-tenancy).

Currently, there is a restriction in that the morphline of an interceptor must not generate more than one output record per input event. This interceptor is not intended for heavy-duty ETL processing; if you need this, consider moving ETL processing from the Flume source to a Flume sink, e.g. to a MorphlineSolrSink.

Required properties are in bold.

 

Property Name Default Description
type The component type name has to be org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
morphlineFile The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf
morphlineId null Optional name used to identify a morphline if there are multiple morphlines in a morphline config file

Sample flume.conf file:

a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
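
The guide does not show the contents of morphline.conf itself. A minimal sketch, assuming the standard Kite Morphlines commands (readLine, setValues, logDebug) are on the classpath, might look like:

morphlines : [
  {
    # must match morphlineId in the Flume config above
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # read the event body as UTF-8 text, one record per line
      { readLine { charset : UTF-8 } }
      # stamp a field onto each record
      { setValues { mime_type : "text/plain" } }
      # log the resulting record for debugging
      { logDebug { format : "output record: {}", args : ["@{}"] } }
    ]
  }
]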

Search and Replace Interceptor

This interceptor provides simple string-based search-and-replace functionality based on Java regular expressions. Backtracking / group capture is also available. This interceptor uses the same rules as the Java Matcher.replaceAll() method.

Property Name Default Description
type The component type name has to be search_replace
searchPattern The pattern to search for and replace.
replaceString The replacement string.
charset UTF-8 The charset of the event body. Assumed by default to be UTF-8.

Example configuration:

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =

Another example:

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1

Regex Filtering Interceptor

This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or to exclude events.

Property Name Default Description
type The component type name has to be regex_filter
regex ".*" Regular expression for matching against events
excludeEvents false If true, regex determines events to exclude, otherwise regex determines events to include.
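
Example for agent named a1 (a sketch; the pattern ^ERROR is illustrative):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^ERROR
a1.sources.r1.interceptors.i1.excludeEvents = false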
 

Regex Extractor Interceptor

This interceptor extracts regex match groups using a specified regular expression and appends the match groups as headers on the event. It also supports pluggable serializers for formatting the match groups before adding them as event headers.

Property Name Default Description
type The component type name has to be regex_extractor
regex Regular expression for matching against events
serializers Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer, org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
serializers.<s1>.type default Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
serializers.<s1>.name
serializers.* Serializer-specific properties

The serializers are used to map the matches to a header name and a formatted header value; by default, you only need to specify the header name and the default org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer will be used. This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex. You can plug custom serializer implementations into the extractor using the fully qualified class name (FQCN) to format the matches in any way you like.

Example 1:

If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used:

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

The extracted event will contain the same body, but the following headers will have been added: one=>1, two=>2, three=>3

Example 2:

If the Flume event body contained 2012-10-18 18:47:57,614 some log line and the following configuration was used:

a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

The extracted event will contain the same body, but the following header will have been added: timestamp=>1350611220000

Flume Properties

Property Name Default Description
flume.called.from.service If this property is specified then the Flume agent will continue polling for the config file even if the config file is not found at the expected location. Otherwise, the Flume agent will terminate if the config doesn't exist at the expected location. No property value is needed when setting this property (e.g., just specifying -Dflume.called.from.service is enough)

Property: flume.called.from.service

Flume periodically polls, every 30 seconds, for changes to the specified config file. A Flume agent loads a new configuration from the config file if either an existing file is polled for the first time, or an existing file's modification date has changed since the last poll. Renaming or moving a file does not change its modification time. When an agent polls a non-existent file, one of two things happens: 1. When the agent polls a non-existent config file for the first time, the agent behaves according to the flume.called.from.service property. If the property is set, the agent will continue polling (always at the same period of every 30 seconds). If the property is not set, the agent immediately terminates. 2. When the agent polls a non-existent config file and this is not the first time the file is polled, the agent makes no config changes for this polling period, and continues polling rather than terminating.
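
For example, a sketch of starting an agent with this property set (the file names are illustrative):

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.called.from.service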

Log4J Appender

Appends Log4j events to a Flume agent's avro source. A client using this appender must have the flume-ng-sdk in the classpath (e.g., flume-ng-sdk-1.8.0.jar). Required properties are in bold.

Property Name Default Description
Hostname The hostname on which a remote Flume agent is running with an avro source.
Port The port at which the remote Flume agent’s avro source is listening.
UnsafeMode false If true, the appender will not throw exceptions on failure to send the events.
AvroReflectionEnabled false Use Avro Reflection to serialize Log4j events. (Do not use when users log strings)
AvroSchemaUrl A URL from which the Avro schema can be retrieved.

Sample log4j.properties file:

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

By default, each event is converted to a string by calling toString(), or by using the Log4j layout, if specified.

If the event is an instance of org.apache.avro.generic.GenericRecord or org.apache.avro.specific.SpecificRecord, or if the property AvroReflectionEnabled is set to true, then the event will be serialized using Avro serialization.

Serializing every event with its Avro schema is inefficient, so it is good practice to provide a schema URL from which the schema can be retrieved by the downstream sink, typically the HDFS sink. If AvroSchemaUrl is not specified, then the schema will be included as a Flume header.

Sample log4j.properties file configured to use Avro serialization:

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.AvroReflectionEnabled = true
log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Load Balancing Log4J Appender

Appends Log4j events to a list of Flume agents' avro sources. A client using this appender must have the flume-ng-sdk in the classpath (e.g., flume-ng-sdk-1.8.0.jar). This appender supports round-robin and random schemes for performing the load balancing. It also supports a configurable backoff timeout so that down agents are removed temporarily from the set of hosts.

Property Name Default Description
Hosts A space-separated list of host:port at which Flume (through an AvroSource) is listening for events
Selector ROUND_ROBIN Selection mechanism. Must be either ROUND_ROBIN, RANDOM or the FQCN of a custom class that inherits from LoadBalancingSelector.
MaxBackoff A long value representing the maximum amount of time in milliseconds the load balancing client will back off from a node that has failed to consume an event. Defaults to no backoff
UnsafeMode false If true, the appender will not throw exceptions on failure to send the events.
AvroReflectionEnabled false Use Avro Reflection to serialize Log4j events.
AvroSchemaUrl A URL from which the Avro schema can be retrieved.

 

Sample log4j.properties file configured using defaults:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Sample log4j.properties file configured using RANDOM load balancing:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431
log4j.appender.out2.Selector = RANDOM

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Sample log4j.properties file configured using backoff:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
log4j.appender.out2.Selector = ROUND_ROBIN
log4j.appender.out2.MaxBackoff = 30000

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Security

Kerberos authentication is supported by the HDFS sink, HBase sink, Thrift source, Thrift sink and Kite Dataset sink. Please refer to the corresponding sections for configuring the Kerberos-related options.

A Flume agent authenticates to the Kerberos KDC as a single principal, which is used by the different components that require Kerberos authentication. The principal and keytab configured for the Thrift source, Thrift sink, HDFS sink, HBase sink and DataSet sink should be the same, otherwise the components will fail to start.
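
For instance, a sketch of the Kerberos options on an HDFS sink (the principal and keytab path are illustrative):

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.kerberosPrincipal = flume/_HOST@EXAMPLE.COM
a1.sinks.k1.hdfs.kerberosKeytab = /etc/security/keytabs/flume.keytab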

Monitoring

Monitoring in Flume is still a work in progress; changes can happen very often. Several Flume components report metrics to the JMX platform MBean server. These metrics can be queried using Jconsole.

JMX Reporting

JMX reporting can be enabled by specifying JMX parameters in the JAVA_OPTS environment variable using flume-env.sh, like:

export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

NOTE: The sample above disables security. To enable security, please refer to http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html

Ganglia Reporting

Flume can also report these metrics to Ganglia 3 or Ganglia 3.1 metanodes. To report metrics to Ganglia, a Flume agent must be started with this support. The agent has to be started by passing in the following parameters as system properties prefixed by flume.monitoring., which can also be specified in flume-env.sh:

Property Name Default Description
type The component type name, has to be ganglia
hosts Comma-separated list of hostname:port of Ganglia servers
pollFrequency 60 Time, in seconds, between consecutive reporting to Ganglia server
isGanglia3 false Ganglia server version is 3. By default, Flume sends in Ganglia 3.1 format

 

We can start Flume with Ganglia support as follows:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

JSON Reporting

Flume can also report metrics in JSON format. To enable JSON reporting, Flume hosts a Web server on a configurable port. Flume reports metrics in the following JSON format:

{
"typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
"typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
}

Here is an example:

{
"CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
                      "Type":"CHANNEL",
                      "StopTime":"0",
                      "EventPutAttemptCount":"468086",
                      "ChannelSize":"233428",
                      "StartTime":"1344882233070",
                      "EventTakeSuccessCount":"458200",
                      "ChannelCapacity":"600000",
                      "EventTakeAttemptCount":"458288"},
"CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
                   "Type":"CHANNEL",
                   "StopTime":"0",
                   "EventPutAttemptCount":"22948908",
                   "ChannelSize":"5",
                   "StartTime":"1344882209413",
                   "EventTakeSuccessCount":"22948900",
                   "ChannelCapacity":"100",
                   "EventTakeAttemptCount":"22948908"}
}
Property Name Default Description
type The component type name, has to be http
port 41414 The port to start the server on.

 

We can start Flume with JSON reporting support as follows:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

Metrics will then be available at the http://<hostname>:<port>/metrics webpage. Custom components can report metrics as mentioned in the Ganglia section above.

Custom Reporting

It is possible to report metrics to other systems by writing servers that do the reporting. Any reporting class has to implement the interface org.apache.flume.instrumentation.MonitorService. Such a class can be used in the same way the GangliaServer is used for reporting: it can poll the platform MBean server for the MBeans to obtain the metrics. For example, an HTTP monitoring service called HTTPReporting could be used as follows:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
Property Name Default Description
type The component type name, has to be FQCN

 

Reporting metrics from custom components

Any custom Flume component should inherit from the org.apache.flume.instrumentation.MonitoredCounterGroup class. The class should then provide getter methods for each of the metrics it exposes. See the code below. The MonitoredCounterGroup expects a list of attributes whose metrics are exposed by this class. As of now, this class only supports exposing metrics as long values.

public class SinkCounter extends MonitoredCounterGroup implements
    SinkCounterMBean {

  private static final String COUNTER_CONNECTION_CREATED =
    "sink.connection.creation.count";

  private static final String COUNTER_CONNECTION_CLOSED =
    "sink.connection.closed.count";

  private static final String COUNTER_CONNECTION_FAILED =
    "sink.connection.failed.count";

  private static final String COUNTER_BATCH_EMPTY =
    "sink.batch.empty";

  private static final String COUNTER_BATCH_UNDERFLOW =
      "sink.batch.underflow";

  private static final String COUNTER_BATCH_COMPLETE =
    "sink.batch.complete";

  private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
    "sink.event.drain.attempt";

  private static final String COUNTER_EVENT_DRAIN_SUCCESS =
    "sink.event.drain.sucess";

  private static final String[] ATTRIBUTES = {
    COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
    COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
    COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
  };


  public SinkCounter(String name) {
    super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
  }

  @Override
  public long getConnectionCreatedCount() {
    return get(COUNTER_CONNECTION_CREATED);
  }

  public long incrementConnectionCreatedCount() {
    return increment(COUNTER_CONNECTION_CREATED);
  }

}
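
A custom component would then instantiate and drive such a counter from its lifecycle methods. A minimal sketch (the class MySink and its wiring are hypothetical; SinkCounter, its start()/stop() methods and incrementConnectionCreatedCount() correspond to the code above):

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {

  private SinkCounter sinkCounter;

  @Override
  public void configure(Context context) {
    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }
  }

  @Override
  public synchronized void start() {
    sinkCounter.start();   // registers the counters with the MBean server
    sinkCounter.incrementConnectionCreatedCount();
    super.start();
  }

  @Override
  public synchronized void stop() {
    sinkCounter.stop();    // unregisters and logs final counter values
    super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    // drain attempt/success counters would be incremented here
    return Status.READY;
  }
}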

Tools

File Channel Integrity Tool

The File Channel Integrity tool verifies the integrity of individual events in the File channel and removes corrupted events.

The tool can be run as follows:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir

where datadir is the comma-separated list of data directories to be verified.

The following options are available:

Option Name Description
h/help Displays help
l/dataDirs Comma-separated list of data directories which the tool must verify
 

Event Validator Tool

The Event Validator tool can be used to validate File channel events in an application-specific way. The tool applies user-provided validation logic to each event and drops events that do not conform to that logic.

The tool can be run as follows:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.MyEventValidator -DmaxSize 2000

where datadir is the comma-separated list of data directories to be verified.

The following options are available:

Option Name Description
h/help Displays help
l/dataDirs Comma-separated list of data directories which the tool must verify
e/eventValidator Fully Qualified Name of Event Validator Implementation. The jar must be on Flume classpath

 

The Event Validator implementation must implement the EventValidator interface. It is recommended not to throw any exceptions from the implementation, as the affected events are treated as invalid. Additional parameters can be passed to the EventValidator implementation via -D options.

Let's look at a simple size-based Event Validator, which rejects events larger than a specified maximum size.

public static class MyEventValidator implements EventValidator {

  private int maxSize = 0;

  private MyEventValidator(int maxSize) {
    this.maxSize = maxSize;
  }

  @Override
  public boolean validateEvent(Event event) {
    // Reject events whose body exceeds the configured maximum size
    return event.getBody().length <= maxSize;
  }

  public static class Builder implements EventValidator.Builder {

    private int maxSize = 0;

    @Override
    public EventValidator build() {
      return new MyEventValidator(maxSize);
    }

    @Override
    public void configure(Context context) {
      // Reads the value passed on the command line via -DmaxSize
      maxSize = context.getInteger("maxSize");
    }
  }
}

Topology Design Considerations

Flume is very flexible and allows a large range of possible deployment scenarios. If you plan to use Flume in a large, production deployment, it is prudent to spend some time thinking about how to express your problem in terms of a Flume topology. This section covers a few considerations.

Is Flume a good fit for your problem?

If you need to ingest textual log data into Hadoop/HDFS, then Flume is the right fit for your problem, full stop. For other use cases, here are some guidelines:

Flume is designed to transport and ingest regularly-generated event data over relatively stable, potentially complex topologies. The notion of "event data" is very broadly defined: to Flume, an event is just a generic blob of bytes. There are some limitations on how large an event can be - for instance, it cannot be larger than what you can store in memory or on disk on a single machine - but in practice, Flume events can be everything from textual log entries to image files. The key property of an event is that events are generated in a continuous, streaming fashion. If your data is not regularly generated (i.e. you are trying to load a single bulk of data into a Hadoop cluster), then Flume will still work, but it is probably overkill for your situation.

Flume likes relatively stable topologies. Your topologies do not need to be immutable: Flume can deal with changes in topology without losing data, and can also tolerate periodic reconfiguration due to fail-over or provisioning. But it probably won't work well if you plan to change topologies every day, because reconfiguration takes some thought and overhead.

Flow reliability in Flume

The reliability of a Flume flow depends on several factors. By adjusting these factors, you can achieve a wide array of reliability options with Flume.

What type of channel do you use? Flume has both durable channels (those which persist data to disk) and non-durable channels (those which will lose data if a machine fails). Durable channels use disk-based storage, and data stored in such channels persists across machine restarts or non-disk-related failures.

Are your channels adequately provisioned for the workload? Channels in Flume act as buffers at the various hops. These buffers have a fixed capacity, and once that capacity is full, back pressure is created on earlier points in the flow. If this pressure propagates to the source of the flow, Flume becomes unavailable and may lose data.

Do you use a redundant topology? Flume lets you replicate flows across redundant topologies. This can provide a very easy source of fault tolerance that overcomes both disk and machine failures.

The best way to think about reliability in a Flume topology is to consider various failure scenarios and their outcomes. What happens if a disk fails? What happens if a machine fails? What happens if your terminal sink (e.g. HDFS) goes down for some time while you have back pressure? The space of possible designs is huge, but the underlying questions you need to ask are just a handful.

 

Flume topology design

The first step in designing a Flume topology is to enumerate all sources and destinations (terminal sinks) for your data. These will define the edge points of your topology. The next consideration is whether to introduce intermediate aggregation tiers or event routing. If you are collecting data from a large number of sources, it can be helpful to aggregate the data in order to simplify ingestion at the terminal sink. An aggregation tier can also smooth out burstiness from sources or unavailability at sinks, by acting as a buffer. If you are routing data between different locations, you may also want to split flows at various points: this creates sub-topologies which may themselves include aggregation points.

Sizing a Flume deployment

Once you have an idea of what your topology will look like, the next question is how much hardware and networking capacity is needed. This starts by quantifying how much data you generate. That is not always a simple task! Most data streams are bursty (for instance, due to diurnal patterns) and potentially unpredictable. A good starting point is to think about the maximum throughput you'll need to sustain at each tier of the topology, both in terms of events per second and bytes per second. Once you know the required throughput of a given tier, you can calculate a lower bound on how many nodes you need for that tier. To determine attainable throughput, it's best to experiment with Flume on your hardware, using synthetic or sampled event data. In general, disk-based channels should get 10 MB/s and memory-based channels should get 100 MB/s or more. Performance will vary widely, however, depending on hardware and operating environment.
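
As an illustrative calculation (the numbers are assumptions, not measurements): if an aggregation tier must sustain a peak of 50 MB/s, and each node's file channel is benchmarked at roughly 10 MB/s, then that tier needs at least 50 / 10 = 5 nodes, before adding headroom for redundancy or bursts.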

Sizing aggregate throughput gives a lower bound on the number of nodes needed for each tier. There are several reasons to add more nodes, such as increased redundancy and a better ability to absorb bursts in load.

Troubleshooting

Handling agent failures

If the Flume agent goes down, then all the flows hosted on that agent are aborted. Once the agent is restarted, the flows will resume. A flow using a file channel or another stable channel will resume processing events where it left off. If the agent cannot be restarted on the same hardware, there is an option to migrate the database to other hardware and set up a new Flume agent that can resume processing the events saved in the db. Database HA futures can be leveraged to move the Flume agent to another host.

Compatibility

HDFS

Flume currently supports HDFS 0.20.2 and 0.23.

AVRO

TBD

Additional version requirements

TBD

Tracing

TBD

More Sample Configs

TBD

Component Summary

Component Interface Type Alias Implementation Class
org.apache.flume.Channel memory org.apache.flume.channel.MemoryChannel
org.apache.flume.Channel jdbc org.apache.flume.channel.jdbc.JdbcChannel
org.apache.flume.Channel file org.apache.flume.channel.file.FileChannel
org.apache.flume.Channel org.apache.flume.channel.PseudoTxnMemoryChannel
org.apache.flume.Channel org.example.MyChannel
org.apache.flume.Source avro org.apache.flume.source.AvroSource
org.apache.flume.Source netcat org.apache.flume.source.NetcatSource
org.apache.flume.Source seq org.apache.flume.source.SequenceGeneratorSource
org.apache.flume.Source exec org.apache.flume.source.ExecSource
org.apache.flume.Source syslogtcp org.apache.flume.source.SyslogTcpSource
org.apache.flume.Source multiport_syslogtcp org.apache.flume.source.MultiportSyslogTCPSource
org.apache.flume.Source syslogudp org.apache.flume.source.SyslogUDPSource
org.apache.flume.Source spooldir org.apache.flume.source.SpoolDirectorySource
org.apache.flume.Source http org.apache.flume.source.http.HTTPSource
org.apache.flume.Source thrift org.apache.flume.source.ThriftSource
org.apache.flume.Source jms org.apache.flume.source.jms.JMSSource
org.apache.flume.Source org.apache.flume.source.avroLegacy.AvroLegacySource
org.apache.flume.Source org.apache.flume.source.thriftLegacy.ThriftLegacySource
org.apache.flume.Source org.example.MySource
org.apache.flume.Sink null org.apache.flume.sink.NullSink
org.apache.flume.Sink logger org.apache.flume.sink.LoggerSink
org.apache.flume.Sink avro org.apache.flume.sink.AvroSink
org.apache.flume.Sink hdfs org.apache.flume.sink.hdfs.HDFSEventSink
org.apache.flume.Sink hbase org.apache.flume.sink.hbase.HBaseSink
org.apache.flume.Sink asynchbase org.apache.flume.sink.hbase.AsyncHBaseSink
org.apache.flume.Sink elasticsearch org.apache.flume.sink.elasticsearch.ElasticSearchSink
org.apache.flume.Sink file_roll org.apache.flume.sink.RollingFileSink
org.apache.flume.Sink irc org.apache.flume.sink.irc.IRCSink
org.apache.flume.Sink thrift org.apache.flume.sink.ThriftSink
org.apache.flume.Sink org.example.MySink
org.apache.flume.ChannelSelector replicating org.apache.flume.channel.ReplicatingChannelSelector
org.apache.flume.ChannelSelector multiplexing org.apache.flume.channel.MultiplexingChannelSelector
org.apache.flume.ChannelSelector org.example.MyChannelSelector
org.apache.flume.SinkProcessor default org.apache.flume.sink.DefaultSinkProcessor
org.apache.flume.SinkProcessor failover org.apache.flume.sink.FailoverSinkProcessor
org.apache.flume.SinkProcessor load_balance org.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.interceptor.Interceptor timestamp org.apache.flume.interceptor.TimestampInterceptor$Builder
org.apache.flume.interceptor.Interceptor host org.apache.flume.interceptor.HostInterceptor$Builder
org.apache.flume.interceptor.Interceptor static org.apache.flume.interceptor.StaticInterceptor$Builder
org.apache.flume.interceptor.Interceptor regex_filter org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
org.apache.flume.interceptor.Interceptor regex_extractor org.apache.flume.interceptor.RegexExtractorInterceptor$Builder
org.apache.flume.channel.file.encryption.KeyProvider$Builder jceksfile org.apache.flume.channel.file.encryption.JCEFileKeyProvider
org.apache.flume.channel.file.encryption.KeyProvider$Builder org.example.MyKeyProvider
org.apache.flume.channel.file.encryption.CipherProvider aesctrnopadding org.apache.flume.channel.file.encryption.AESCTRNoPaddingProvider
org.apache.flume.channel.file.encryption.CipherProvider org.example.MyCipherProvider
org.apache.flume.serialization.EventSerializer$Builder text org.apache.flume.serialization.BodyTextEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builder avro_event org.apache.flume.serialization.FlumeEventAvroEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builder org.example.MyEventSerializer$Builder
 

Alias Conventions

These conventions for alias names are used in the component-specific examples above, to keep the names short and consistent across all the examples.

Alias Name Alias Type
a agent
c channel
r source
k sink
g sink group
i interceptor
y key
h host
s serializer

 


 

The End

Thanks to Youdao Translate - it did most of the heavy lifting; I just moved the bricks.
