Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide
Due to its length, the translation is split into the following five parts:
Flume 1.8.0 User Guide (translated)
Flume 1.8.0 User Guide (translated): Sources
Flume 1.8.0 User Guide (translated): Sinks
Flume 1.8.0 User Guide (translated): Channels
Flume 1.8.0 User Guide (translated): Processors
Sink groups allow users to group multiple sinks into one entity. Sink processors can be used to provide load-balancing capabilities over all sinks inside the group, or to achieve failover from one sink to another in case of a temporary failure.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
**sinks** | – | Space-separated list of sinks that are participating in the group |
**processor.type** | default | The component type name, needs to be default, failover or load_balance |
Example for agent named a1:
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
The default sink processor accepts only a single sink. The user is not forced to create a processor (sink group) for a single sink; instead, the user can follow the source-channel-sink pattern explained earlier in this guide.
The failover sink processor maintains a prioritized list of sinks, guaranteeing that as long as one sink is available, events will be processed (delivered).
The failover mechanism works by relegating failed sinks to a pool where they are assigned a cool-down period that increases with sequential failures before they are retried. Once a sink successfully sends an event, it is restored to the live pool. Sinks have a priority associated with them: the larger the number, the higher the priority. If a sink fails while sending an event, the sink with the next highest priority is tried for the next event. For example, a sink with priority 100 is activated before a sink with priority 80. If no priority is specified, the priority is determined by the order in which the sinks are specified in the configuration.
To configure, set a sink group processor to failover and set priorities for all individual sinks. All specified priorities must be unique. Furthermore, an upper limit on the failover time can be set (in milliseconds) using the maxpenalty property.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
**sinks** | – | Space-separated list of sinks that are participating in the group |
**processor.type** | default | The component type name, needs to be failover |
**processor.priority.<sinkName>** | – | Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A higher-priority sink is activated earlier; a larger absolute value indicates higher priority |
processor.maxpenalty | 30000 | The maximum backoff period for the failed Sink (in millis) |
Example for agent named a1:
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
The load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks over which the load must be distributed. The implementation supports distributing the load using either round_robin or random selection mechanisms. The choice of selection mechanism defaults to round_robin, but can be overridden via configuration. Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector.
When invoked, this selector picks the next sink using its configured selection mechanism and invokes it. For round_robin and random, if the selected sink fails to deliver the event, the processor picks the next available sink via its configured selection mechanism. This implementation does not blacklist the failing sink and instead continues optimistically attempting every available sink. If all sink invocations result in failure, the selector propagates the failure to the sink runner.
If backoff is enabled, the sink processor will blacklist sinks that fail, removing them from selection for a given timeout. When the timeout ends, if the sink is still unresponsive, the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive sinks. With this disabled, in round-robin mode, all the load for a failed sink is simply passed to the next sink in line and is thus not evenly balanced.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
**processor.sinks** | – | Space-separated list of sinks that are participating in the group |
**processor.type** | default | The component type name, needs to be load_balance |
processor.backoff | false | Should failed sinks be backed off exponentially. |
processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector |
processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds) |
Example for agent named a1:
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random
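The example above enables backoff without touching the backoff cap; a variant sketch that also overrides processor.selector.maxTimeOut (the property from the table above) to cap the exponential backoff at 10 seconds:

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = round_robin
    a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000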
Custom sink processors are not supported at the moment.
Both the file_roll sink and the hdfs sink support the EventSerializer interface. Details of the EventSerializers that ship with Flume are provided below.
Alias: text. This serializer writes the body of the event to an output stream without any transformation or modification. Event headers are ignored. Configuration options are as follows:
Property Name | Default | Description |
---|---|---|
appendNewline | true | Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons. |
Example for agent named a1:
    a1.sinks = k1
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.channel = c1
    a1.sinks.k1.sink.directory = /var/log/flume
    a1.sinks.k1.sink.serializer = text
    a1.sinks.k1.sink.serializer.appendNewline = false
Alias: avro_event.
This serializer serializes Flume events into an Avro container file. The schema used is the same schema used for Flume events in the Avro RPC mechanism.
This serializer inherits from the AbstractAvroEventSerializer class.
Configuration options are as follows:
Property Name | Default | Description |
---|---|---|
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes. |
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs. |
Example for agent named a1:
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.serializer = avro_event
    a1.sinks.k1.serializer.compressionCodec = snappy
Alias: This serializer does not have an alias, and must be specified using the fully-qualified class name.
This serializes Flume events into an Avro container file like the "Flume Event" Avro Event Serializer, except that the record schema is configurable. The record schema may be specified either as a Flume configuration property or passed in an event header.
To pass the record schema as part of the Flume configuration, use the schemaURL property listed below.
To pass the record schema in an event header, specify either the event header flume.avro.schema.literal, containing a JSON-format representation of the schema, or flume.avro.schema.url, with a URL where the schema may be found (hdfs:/... URIs are supported).
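As a hedged illustration (not part of the original guide), one way to stamp such a header on every event is the static interceptor described later in this document; the URL below reuses the schema location from the example further down:

    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = flume.avro.schema.url
    a1.sources.r1.interceptors.i1.value = hdfs://namenode/path/to/schema.avsc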
This serializer inherits from the AbstractAvroEventSerializer class.
Configuration options are as follows:
Property Name | Default | Description |
---|---|---|
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes. |
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs. |
schemaURL | null | Avro schema URL. Schemas specified in the header override this option. |
Example for agent named a1:
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
    a1.sinks.k1.serializer.compressionCodec = snappy
    a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
Flume has the capability to modify and drop events in flight. This is done with the help of interceptors. Interceptors are classes that implement the org.apache.flume.interceptor.Interceptor interface. An interceptor can modify or even drop events based on any criteria chosen by the developer. Flume supports chaining of interceptors. This is made possible by specifying a list of interceptor builder class names in the configuration. Interceptors are specified as a whitespace-separated list in the source configuration. The order in which the interceptors are specified is the order in which they are invoked. The list of events returned by one interceptor is passed to the next interceptor in the chain. Interceptors can modify or drop events. If an interceptor needs to drop an event, it simply does not include that event in the list it returns. If it is to drop all events, it simply returns an empty list. Interceptors are named components; here is an example of how they are created through configuration:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.interceptors = i1 i2
    a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
    a1.sources.r1.interceptors.i1.preserveExisting = false
    a1.sources.r1.interceptors.i1.hostHeader = hostname
    a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
    a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
    a1.sinks.k1.channel = c1
Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves configurable and can be passed configuration values just like any other configurable component. In the above example, events are passed first to the HostInterceptor, and the events returned by the HostInterceptor are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN) or the alias timestamp. If you have multiple collectors writing to the same HDFS path, you can also make use of the HostInterceptor.
This interceptor inserts into the event headers the time, in milliseconds, at which it processes the event. It inserts a header with the key timestamp (or a key specified by the header property) whose value is the relevant timestamp. This interceptor can preserve an existing timestamp if one is already present in the configuration.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be timestamp or the FQCN |
header | timestamp | The name of the header in which to place the generated timestamp. |
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false |
Example for agent named a1:
    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.channels = c1
    a1.sources.r1.type = seq
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
This interceptor inserts the hostname or IP address of the host on which the agent is running. It inserts a header with the key host (or a configured key) whose value is the hostname or IP address of the host, based on configuration.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be host |
preserveExisting | false | If the host header already exists, should it be preserved - true or false |
useIP | true | Use the IP Address if true, else use hostname. |
hostHeader | host | The header key to be used. |
Example for agent named a1:
    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = host
The static interceptor allows the user to append a static header with a static value to all events.
The current implementation does not allow specifying more than one header at a time. Instead, the user can chain multiple static interceptors, each defining one static header (see the sketch after the example below).
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be static |
preserveExisting | true | If configured header already exists, should it be preserved - true or false |
key | key | Name of header that should be created |
value | value | Static value that should be created |
Example for agent named a1:
    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.channels = c1
    a1.sources.r1.type = seq
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = datacenter
    a1.sources.r1.interceptors.i1.value = NEW_YORK
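As noted above, each static interceptor carries exactly one header, so attaching several static headers means chaining several interceptors. A minimal sketch (the second key/value pair is an arbitrary illustration):

    a1.sources.r1.interceptors = i1 i2
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = datacenter
    a1.sources.r1.interceptors.i1.value = NEW_YORK
    a1.sources.r1.interceptors.i2.type = static
    a1.sources.r1.interceptors.i2.key = environment
    a1.sources.r1.interceptors.i2.value = production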
This interceptor manipulates Flume event headers by removing one or more headers. It can remove a statically defined header, headers based on a regular expression, or headers in a list. If none of these is defined, or if no header matches the criteria, the Flume events are not modified.
Note that if only one header needs to be removed, specifying it by name provides better performance than the other two methods.
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be remove_header |
withName | – | Name of the header to remove |
fromList | – | List of headers to remove, separated with the separator specified by fromListSeparator |
fromListSeparator | \s*,\s* | Regular expression used to separate multiple header names in the list specified by fromList. Default is a comma surrounded by any number of whitespace characters |
matching | – | All the headers which names match this regular expression are removed |
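The guide does not include a sample configuration for this interceptor, so here is a minimal sketch for an agent named a1 (the header names are arbitrary illustrations) that removes one header by name and all headers starting with temp_ by regular expression:

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = remove_header
    a1.sources.r1.interceptors.i1.withName = datacenter
    a1.sources.r1.interceptors.i1.matching = ^temp_.*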
This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.
Consider using UUIDInterceptor to automatically assign a UUID to an event if no application-level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network, that is, in the first Flume source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application-level key is available, it is preferable to an auto-generated UUID because it enables subsequent updates and deletes of the event in data stores using that well-known application-level key.
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
headerName | id | The name of the Flume header to modify |
preserveExisting | true | If the UUID header already exists, should it be preserved - true or false |
prefix | "" | The prefix string constant to prepend to each generated UUID |
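No example accompanies this section in the original guide; a minimal sketch for an agent named a1 that applies the interceptor at the first source of the flow, as recommended above:

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.channels = c1
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
    a1.sources.r1.interceptors.i1.headerName = id
    a1.sources.r1.interceptors.i1.preserveExisting = true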
This interceptor filters events through a morphline configuration file that defines a chain of transformation commands that pipe records from one command to another. For example, the morphline can ignore certain events, alter or insert certain event headers via regular-expression-based pattern matching, or auto-detect and set a MIME type via Apache Tika on the intercepted events. This kind of packet sniffing can be used, for example, for content-based dynamic routing in a Flume topology. MorphlineInterceptor can also help to implement dynamic routing to multiple Apache Solr collections (e.g., for multi-tenancy).
Currently there is a restriction: the morphline of an interceptor must not generate more than one output record per input event. This interceptor is not intended for heavy-duty ETL processing; if you need that, consider moving the ETL processing from the Flume source to a Flume sink, e.g., to a MorphlineSolrSink.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
**type** | – | The component type name has to be org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder |
**morphlineFile** | – | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf |
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file |
Sample flume.conf file:
    a1.sources.avroSrc.interceptors = morphlineinterceptor
    a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
    a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
    a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
This interceptor provides simple string-based search-and-replace functionality based on Java regular expressions. Backtracking / group capture is also available. This interceptor uses the same rules as the Java Matcher.replaceAll() method.
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be search_replace |
searchPattern | – | The pattern to search for and replace. |
replaceString | – | The replacement string. |
charset | UTF-8 | The charset of the event body. Assumed by default to be UTF-8. |
Example configuration:
    a1.sources.avroSrc.interceptors = search-replace
    a1.sources.avroSrc.interceptors.search-replace.type = search_replace

    # Remove leading alphanumeric characters in an event body.
    a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
    a1.sources.avroSrc.interceptors.search-replace.replaceString =
Another example:
    a1.sources.avroSrc.interceptors = search-replace
    a1.sources.avroSrc.interceptors.search-replace.type = search_replace

    # Use grouping operators to reorder and munge words on a line.
    a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
    a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
This interceptor filters events selectively by interpreting the event body as text and matching that text against a configured regular expression. The supplied regular expression can be used either to include or to exclude events.
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be regex_filter |
regex | ".*" | Regular expression for matching against events |
excludeEvents | false | If true, regex determines events to exclude, otherwise regex determines events to include. |
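The original guide ships no example here; a minimal sketch for an agent named a1 (the pattern is an arbitrary illustration) that drops every event whose body contains the word DEBUG:

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.channels = c1
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_filter
    a1.sources.r1.interceptors.i1.regex = .*DEBUG.*
    a1.sources.r1.interceptors.i1.excludeEvents = true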
This interceptor extracts regex match groups using a specified regular expression and appends the match groups as headers on the event. It also supports pluggable serializers for formatting the match groups before adding them as event headers.
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be regex_extractor |
regex | – | Regular expression for matching against events |
serializers | – | Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below.) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer, org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer |
serializers.<s1>.type | default | Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer |
serializers.<s1>.name | – | |
serializers.* | – | Serializer-specific properties |
The serializers are used to map the matches to a header name and a formatted header value; by default, you only need to specify the header name, and the default org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer will be used. This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex. You can plug custom serializer implementations into the extractor using the fully qualified class name (FQCN) to format the matches any way you like.
Example 1: If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used
    a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
    a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
    a1.sources.r1.interceptors.i1.serializers.s1.name = one
    a1.sources.r1.interceptors.i1.serializers.s2.name = two
    a1.sources.r1.interceptors.i1.serializers.s3.name = three
The extracted event will contain the same body, but the following headers will have been added: one=>1, two=>2, three=>3.
Example 2: If the Flume event body contained 2012-10-18 18:47:57,614 some log line and the following configuration was used
    a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
    a1.sources.r1.interceptors.i1.serializers = s1
    a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
    a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
    a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
The extracted event will contain the same body, but the following header will have been added: timestamp=>1350611220000.
The following system-wide Flume property can also be set:
Property Name | Default | Description |
---|---|---|
flume.called.from.service | – | If this property is specified, then the Flume agent will continue polling for the config file even if the config file is not found at the expected location. Otherwise, the Flume agent will terminate if the config doesn't exist at the expected location. No property value is needed when setting this property (e.g., just specifying -Dflume.called.from.service is enough) |
Flume periodically polls the specified config file for changes, every 30 seconds. A Flume agent loads a new configuration from the config file if either the existing file is polled for the first time, or the existing file's modification date has changed since the last poll. Renaming or moving a file does not change its modification time. When a Flume agent polls a non-existent file, one of two things happens: (1) When the agent polls a non-existent config file for the first time, the agent behaves according to the flume.called.from.service property. If the property is set, the agent continues polling (always at the same period, every 30 seconds). If the property is not set, the agent terminates immediately. (2) When the agent polls a non-existent config file and it is not the first time the file has been polled, the agent makes no config changes for that polling period and continues polling rather than terminating.
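For example, to start an agent that keeps polling for example.conf even if the file does not exist yet, the property from the table above can be passed on the command line. A sketch, following the flume-ng invocations used elsewhere in this guide:

    $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.called.from.service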
Appends Log4j events to a Flume agent's avro source. A client using this appender must have the flume-ng-sdk on the classpath (e.g., flume-ng-sdk-1.8.0.jar). Required properties are in bold.
Property Name | Default | Description |
---|---|---|
**Hostname** | – | The hostname on which a remote Flume agent is running with an avro source. |
**Port** | – | The port at which the remote Flume agent's avro source is listening. |
UnsafeMode | false | If true, the appender will not throw exceptions on failure to send the events. |
AvroReflectionEnabled | false | Use Avro Reflection to serialize Log4j events. (Do not use when users log strings) |
AvroSchemaUrl | – | A URL from which the Avro schema can be retrieved. |
Sample log4j.properties file:
    #...
    log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname = example.com
    log4j.appender.flume.Port = 41414
    log4j.appender.flume.UnsafeMode = true

    # configure a class's logger to output to the flume appender
    log4j.logger.org.example.MyClass = DEBUG,flume
    #...
By default, each event is converted to a string by calling toString(), or by using the Log4j layout, if specified.
If the event is an instance of org.apache.avro.generic.GenericRecord or org.apache.avro.specific.SpecificRecord, or if the property AvroReflectionEnabled is set to true, then the event is serialized using Avro serialization.
Serializing every event with its Avro schema is inefficient, so it is good practice to provide a schema URL from which the schema can be retrieved by the downstream sink, typically the HDFS sink. If AvroSchemaUrl is not specified, the schema is included as a Flume header.
Sample log4j.properties file configured to use Avro serialization:
    #...
    log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname = example.com
    log4j.appender.flume.Port = 41414
    log4j.appender.flume.AvroReflectionEnabled = true
    log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc

    # configure a class's logger to output to the flume appender
    log4j.logger.org.example.MyClass = DEBUG,flume
    #...
Appends Log4j events to a list of Flume agents' avro sources. A client using this appender must have the flume-ng-sdk on the classpath (e.g., flume-ng-sdk-1.8.0.jar). This appender supports round-robin and random schemes for performing the load balancing. It also supports a configurable backoff timeout so that down agents are removed temporarily from the set of hosts.
Property Name | Default | Description |
---|---|---|
Hosts | – | A space-separated list of host:port at which Flume (through an AvroSource) is listening for events |
Selector | ROUND_ROBIN | Selection mechanism. Must be either ROUND_ROBIN, RANDOM or the FQCN of a custom class that inherits from LoadBalancingSelector. |
MaxBackoff | – | A long value representing the maximum amount of time in milliseconds the Load balancing client will backoff from a node that has failed to consume an event. Defaults to no backoff |
UnsafeMode | false | If true, the appender will not throw exceptions on failure to send the events. |
AvroReflectionEnabled | false | Use Avro Reflection to serialize Log4j events. |
AvroSchemaUrl | – | A URL from which the Avro schema can be retrieved. |
Sample log4j.properties file configured using defaults:
    #...
    log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
    log4j.appender.out2.Hosts = localhost:25430 localhost:25431

    # configure a class's logger to output to the flume appender
    log4j.logger.org.example.MyClass = DEBUG,flume
    #...
Sample log4j.properties file configured using RANDOM load balancing:
    #...
    log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
    log4j.appender.out2.Hosts = localhost:25430 localhost:25431
    log4j.appender.out2.Selector = RANDOM

    # configure a class's logger to output to the flume appender
    log4j.logger.org.example.MyClass = DEBUG,flume
    #...
Sample log4j.properties file configured using backoff:
    #...
    log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
    log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
    log4j.appender.out2.Selector = ROUND_ROBIN
    log4j.appender.out2.MaxBackoff = 30000

    # configure a class's logger to output to the flume appender
    log4j.logger.org.example.MyClass = DEBUG,flume
    #...
The HDFS sink, HBase sink, Thrift source, Thrift sink and Kite Dataset sink all support Kerberos authentication. Please refer to the corresponding sections for configuring the Kerberos-related options.
A Flume agent authenticates to the Kerberos KDC as a single principal, which is used by the different components that require Kerberos authentication. The principal and keytab configured for the Thrift source, Thrift sink, HDFS sink, HBase sink and DataSet sink should be the same, otherwise the components will fail to start.
Monitoring in Flume is still a work in progress, and changes happen often. Several Flume components report metrics to the JMX platform MBean server. These metrics can be queried using JConsole.
JMX reporting can be enabled by specifying JMX parameters in the JAVA_OPTS environment variable using flume-env.sh, like so:
    export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445
      -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
Note: the sample above disables security. To enable security, please refer to http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
Flume can also report these metrics to Ganglia 3 or Ganglia 3.1 metanodes. To report metrics to Ganglia, a Flume agent must be started with this support. The agent has to be started with the following parameters passed as system properties prefixed by flume.monitoring., which can be specified in flume-env.sh:
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be ganglia |
hosts | – | Comma-separated list of hostname:port of Ganglia servers |
pollFrequency | 60 | Time, in seconds, between consecutive reporting to Ganglia server |
isGanglia3 | false | Ganglia server version is 3. By default, Flume sends in Ganglia 3.1 format |
We can start Flume with Ganglia support as follows:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
Flume can also report metrics in JSON format. To enable reporting in JSON format, Flume hosts a Web server on a configurable port. Flume reports metrics in the following JSON format:
{ "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"}, "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"} }
Here is an example:
{ "CHANNEL.fileChannel":{"EventPutSuccessCount":"468085", "Type":"CHANNEL", "StopTime":"0", "EventPutAttemptCount":"468086", "ChannelSize":"233428", "StartTime":"1344882233070", "EventTakeSuccessCount":"458200", "ChannelCapacity":"600000", "EventTakeAttemptCount":"458288"}, "CHANNEL.memChannel":{"EventPutSuccessCount":"22948908", "Type":"CHANNEL", "StopTime":"0", "EventPutAttemptCount":"22948908", "ChannelSize":"5", "StartTime":"1344882209413", "EventTakeSuccessCount":"22948900", "ChannelCapacity":"100", "EventTakeAttemptCount":"22948908"} }
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be http |
port | 41414 | The port to start the server on. |
We can start Flume with JSON reporting support as follows:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
Metrics will then be available at http://<hostname>:<port>/metrics. Custom components can report metrics as mentioned in the Ganglia section above.
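For instance, with the agent started as above, the metrics JSON can be fetched with any HTTP client; a sketch using curl, assuming the agent runs locally on the port 34545 chosen above:

    $ curl http://localhost:34545/metrics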
It is possible to report metrics to other systems by writing servers that do the reporting. Any reporting class has to implement the org.apache.flume.instrumentation.MonitorService interface. Such a class can be used in the same way the GangliaServer is used for reporting: it can poll the platform MBean server for the MBeans' metrics. For example, an HTTP monitoring service called HTTPReporting could be used as follows:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be FQCN |
Any custom Flume component should inherit from the org.apache.flume.instrumentation.MonitoredCounterGroup class. The class should then provide getter methods for each of the metrics it exposes; see the code below. The MonitoredCounterGroup expects a list of attributes whose metrics are exposed by this class. As of now, this class only supports exposing metrics as long values.
    public class SinkCounter extends MonitoredCounterGroup implements SinkCounterMBean {

      private static final String COUNTER_CONNECTION_CREATED = "sink.connection.creation.count";
      private static final String COUNTER_CONNECTION_CLOSED = "sink.connection.closed.count";
      private static final String COUNTER_CONNECTION_FAILED = "sink.connection.failed.count";
      private static final String COUNTER_BATCH_EMPTY = "sink.batch.empty";
      private static final String COUNTER_BATCH_UNDERFLOW = "sink.batch.underflow";
      private static final String COUNTER_BATCH_COMPLETE = "sink.batch.complete";
      private static final String COUNTER_EVENT_DRAIN_ATTEMPT = "sink.event.drain.attempt";
      // Spelling "sucess" is as in the Flume source.
      private static final String COUNTER_EVENT_DRAIN_SUCCESS = "sink.event.drain.sucess";

      // The list of attributes whose metrics this class exposes.
      private static final String[] ATTRIBUTES = {
          COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
          COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
          COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
          COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
      };

      public SinkCounter(String name) {
        super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
      }

      // Getter for the metric; get() and increment() are provided by MonitoredCounterGroup.
      @Override
      public long getConnectionCreatedCount() {
        return get(COUNTER_CONNECTION_CREATED);
      }

      public long incrementConnectionCreatedCount() {
        return increment(COUNTER_CONNECTION_CREATED);
      }
    }
The file channel integrity tool verifies the integrity of individual events in the file channel and removes corrupted events.
The tool can be run as follows:
$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir
where datadir is the comma-separated list of data directories to be verified.
The following options are available:
Option Name | Description |
---|---|
h/help | Displays help |
l/dataDirs | Comma-separated list of data directories which the tool must verify |
The event validator tool can be used to validate file channel events in an application-specific way. The tool applies user-provided validation logic to each event and drops events that do not conform to that logic.
The tool can be run as follows:
$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.MyEventValidator -DmaxSize 2000
where datadir is the comma-separated list of data directories to be verified.
The following options are available:
Option Name | Description |
---|---|
h/help | Displays help |
l/dataDirs | Comma-separated list of data directories which the tool must verify |
e/eventValidator | Fully Qualified Name of Event Validator Implementation. The jar must be on Flume classpath |
An event validator implementation must implement the EventValidator interface. It is recommended not to throw any exceptions from the implementation, as the affected events are treated as invalid. Additional parameters can be passed to the EventValidator implementation via -D options.
Let's look at a simple size-based event validator, which rejects events that are larger than a specified maximum size.
    public static class MyEventValidator implements EventValidator {

      private int value = 0;

      private MyEventValidator(int val) {
        value = val;
      }

      @Override
      public boolean validateEvent(Event event) {
        // Accept only events whose body is no larger than the configured maximum size.
        return event.getBody().length <= value;
      }

      public static class Builder implements EventValidator.Builder {

        private int sizeValidator = 0;

        @Override
        public EventValidator build() {
          return new MyEventValidator(sizeValidator);
        }

        @Override
        public void configure(Context context) {
          // Read the maximum allowed body size passed via -DmaxSize on the command line.
          sizeValidator = context.getInteger("maxSize");
        }
      }
    }
Flume is very flexible and allows a large range of possible deployment scenarios. If you plan to use Flume in a large production deployment, it is prudent to spend some time thinking about how to express your problem in terms of a Flume topology. This section covers a few considerations.
Is Flume a good fit for your problem? If you need to ingest textual log data into Hadoop/HDFS, then Flume is the right fit for your problem, full stop. For other use cases, here are some guidelines:
Flume is designed to transport and ingest regularly generated event data over relatively stable, potentially complex topologies. The notion of "event data" is very broadly defined: to Flume, an event is just a generic blob of bytes. There are some limitations on how large an event can be - for instance, it cannot be larger than what you can store in memory or on disk on a single machine - but in practice, Flume events can be everything from textual log entries to image files. The key property of an event is that events are generated in a continuous, streaming fashion. If your data is not regularly generated (e.g., you are trying to bulk-load a single batch of data into a Hadoop cluster), Flume will still work, but it is probably overkill for your situation. Flume also likes relatively stable topologies. Your topology does not need to be immutable, because Flume can deal with changes in topology without losing data and can tolerate periodic reconfiguration due to failover or provisioning. However, it probably won't work well if you try to change topologies every day, because reconfiguration takes some thought and overhead.
The reliability of a Flume flow depends on several factors. By adjusting the following factors, you can achieve a wide array of reliability options with Flume:
What type of channels you use. Flume offers both durable channels (those which persist data to disk) and non-durable channels (those which will lose data if a machine fails). Durable channels use disk-based storage, and data stored in such channels persists across machine restarts and non-disk-related failures.
Whether your channels are sufficiently provisioned for the workload. Channels in Flume act as buffers at the various hops. These buffers have a fixed capacity, and once that capacity is full, back pressure is created on earlier points in the flow. If this pressure propagates to the source of the flow, Flume becomes unavailable and may lose data.
Whether you use redundant topologies. Flume lets you replicate flows across redundant topologies. This provides a very easy source of fault tolerance that overcomes both disk and machine failures.
The best way to think about reliability in a Flume topology is to consider various failure scenarios and their outcomes. What happens if a disk fails? What happens if a machine fails? What happens if your terminal sink (e.g., HDFS) goes down for some time while you have back pressure? The space of possible designs is huge, but the underlying questions you need to ask are just a handful.
The first step in designing a Flume topology is to enumerate all the sources and destinations (terminal sinks) for your data. These will define the edge points of your topology. The next consideration is whether to introduce intermediate aggregation tiers or event routing. If you are collecting data from a large number of sources, it can be helpful to aggregate the data in order to simplify ingestion at the terminal sink. An aggregation tier can also act as a buffer, smoothing out burstiness from the sources and unavailability at the sinks. If you are routing data between different locations, you may also want to split flows at various points: this creates sub-topologies which may themselves include aggregation points.
Once you have an idea of what your topology will look like, the next question is how much hardware and networking capacity is needed. This starts by quantifying how much data you generate. That is not always a simple task! Most data streams are bursty (for instance, due to diurnal patterns) and potentially unpredictable. A good starting point is to think about the maximum throughput you will need in each tier of the topology, both in terms of events per second and bytes per second. Once you know the required throughput of a given tier, you can calculate a lower bound on how many nodes that tier requires. To determine attainable throughput, it is best to experiment with Flume on your own hardware, using synthetic or sampled event data. In general, disk-based channels should get tens of MB/s and memory-based channels should get hundreds of MB/s or more, but performance varies widely depending on hardware and operating environment.
Sizing for aggregate throughput gives you a lower bound on the number of nodes you will need in each tier. There are several reasons to add nodes beyond that, such as increased redundancy and a better ability to absorb bursts in load.
If a Flume agent goes down, all the flows hosted on that agent are aborted. Once the agent is restarted, the flows resume. A flow using a file channel or another stable channel will resume processing events where it left off. If the agent cannot be restarted on the same hardware, there is an option to migrate the database to other hardware and set up a new Flume agent that can resume processing the events saved in the db. Database HA features can be leveraged to move a Flume agent to another host.
Flume currently supports HDFS 0.20.2 and 0.23.
TBD
TBD
TBD
TBD
Component Interface | Type Alias | Implementation Class |
---|---|---|
org.apache.flume.Channel | memory | org.apache.flume.channel.MemoryChannel |
org.apache.flume.Channel | jdbc | org.apache.flume.channel.jdbc.JdbcChannel |
org.apache.flume.Channel | file | org.apache.flume.channel.file.FileChannel |
org.apache.flume.Channel | – | org.apache.flume.channel.PseudoTxnMemoryChannel |
org.apache.flume.Channel | – | org.example.MyChannel |
org.apache.flume.Source | avro | org.apache.flume.source.AvroSource |
org.apache.flume.Source | netcat | org.apache.flume.source.NetcatSource |
org.apache.flume.Source | seq | org.apache.flume.source.SequenceGeneratorSource |
org.apache.flume.Source | exec | org.apache.flume.source.ExecSource |
org.apache.flume.Source | syslogtcp | org.apache.flume.source.SyslogTcpSource |
org.apache.flume.Source | multiport_syslogtcp | org.apache.flume.source.MultiportSyslogTCPSource |
org.apache.flume.Source | syslogudp | org.apache.flume.source.SyslogUDPSource |
org.apache.flume.Source | spooldir | org.apache.flume.source.SpoolDirectorySource |
org.apache.flume.Source | http | org.apache.flume.source.http.HTTPSource |
org.apache.flume.Source | thrift | org.apache.flume.source.ThriftSource |
org.apache.flume.Source | jms | org.apache.flume.source.jms.JMSSource |
org.apache.flume.Source | – | org.apache.flume.source.avroLegacy.AvroLegacySource |
org.apache.flume.Source | – | org.apache.flume.source.thriftLegacy.ThriftLegacySource |
org.apache.flume.Source | – | org.example.MySource |
org.apache.flume.Sink | null | org.apache.flume.sink.NullSink |
org.apache.flume.Sink | logger | org.apache.flume.sink.LoggerSink |
org.apache.flume.Sink | avro | org.apache.flume.sink.AvroSink |
org.apache.flume.Sink | hdfs | org.apache.flume.sink.hdfs.HDFSEventSink |
org.apache.flume.Sink | hbase | org.apache.flume.sink.hbase.HBaseSink |
org.apache.flume.Sink | asynchbase | org.apache.flume.sink.hbase.AsyncHBaseSink |
org.apache.flume.Sink | elasticsearch | org.apache.flume.sink.elasticsearch.ElasticSearchSink |
org.apache.flume.Sink | file_roll | org.apache.flume.sink.RollingFileSink |
org.apache.flume.Sink | irc | org.apache.flume.sink.irc.IRCSink |
org.apache.flume.Sink | thrift | org.apache.flume.sink.ThriftSink |
org.apache.flume.Sink | – | org.example.MySink |
org.apache.flume.ChannelSelector | replicating | org.apache.flume.channel.ReplicatingChannelSelector |
org.apache.flume.ChannelSelector | multiplexing | org.apache.flume.channel.MultiplexingChannelSelector |
org.apache.flume.ChannelSelector | – | org.example.MyChannelSelector |
org.apache.flume.SinkProcessor | default | org.apache.flume.sink.DefaultSinkProcessor |
org.apache.flume.SinkProcessor | failover | org.apache.flume.sink.FailoverSinkProcessor |
org.apache.flume.SinkProcessor | load_balance | org.apache.flume.sink.LoadBalancingSinkProcessor |
org.apache.flume.SinkProcessor | – | |
org.apache.flume.interceptor.Interceptor | timestamp | org.apache.flume.interceptor.TimestampInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | host | org.apache.flume.interceptor.HostInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | static | org.apache.flume.interceptor.StaticInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | regex_filter | org.apache.flume.interceptor.RegexFilteringInterceptor$Builder |
org.apache.flume.interceptor.Interceptor | regex_extractor | org.apache.flume.interceptor.RegexExtractorInterceptor$Builder |
org.apache.flume.channel.file.encryption.KeyProvider$Builder | jceksfile | org.apache.flume.channel.file.encryption.JCEFileKeyProvider |
org.apache.flume.channel.file.encryption.KeyProvider$Builder | – | org.example.MyKeyProvider |
org.apache.flume.channel.file.encryption.CipherProvider | aesctrnopadding | org.apache.flume.channel.file.encryption.AESCTRNoPaddingProvider |
org.apache.flume.channel.file.encryption.CipherProvider | – | org.example.MyCipherProvider |
org.apache.flume.serialization.EventSerializer$Builder | text | org.apache.flume.serialization.BodyTextEventSerializer$Builder |
org.apache.flume.serialization.EventSerializer$Builder | avro_event | org.apache.flume.serialization.FlumeEventAvroEventSerializer$Builder |
org.apache.flume.serialization.EventSerializer$Builder | – | org.example.MyEventSerializer$Builder |
These conventions for alias names are used in the component-specific examples above, in order to keep the names short and consistent across all examples.
Alias Name | Alias Type |
---|---|
a | agent |
c | channel |
r | source |
k | sink |
g | sink group |
i | interceptor |
y | key |
h | host |
s | serializer |
The end.
Thanks to Youdao Translate for doing most of the work; I'm just the one laying the bricks.