flume Interceptor和selector

攔截器做用:攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的事件,在寫入channel以前,攔截器均可以進行轉換或者刪除這些事件。每一個攔截器只處理同一個source接收到的事件。能夠自定義攔截器。java

經常使用的攔截器:web

  1. 時間戳攔截器express

    flume中一個最常用的攔截器 ,該攔截器的做用是將時間戳插入到flume的事件報頭中。若是不使用任何攔截器,flume接受到的只有message。時間戳攔截器的配置。apache

參數 默認值 描述
type 類型名稱timestamp,也可使用類名的全路徑
preserveExisting false 若是設置爲true,若事件中報頭已經存在,不會替換時間戳報頭的值

source鏈接到時間戳攔截器的配置:服務器

?app

1
2
3
a1.sources.r1.interceptors = timestamp
a1.sources.r1.interceptors.timestamp.type=timestamp
a1.sources.r1.interceptors.timestamp.preserveExisting= false

 2.  主機攔截器
less

        主機攔截器插入服務器的ip地址或者主機名,agent將這些內容插入到事件的報頭中。時間報頭中的key使用hostHeader配置,默認是host。主機攔截器的配置
ide

參數 默認值 描述
type 類型名稱host
hostHeader host 事件投的key
useIP true 若是設置爲false,host鍵插入主機名
preserveExisting false 若是設置爲true,若事件中報頭已經存在,不會替換host報頭的值

source鏈接到主機攔截器的配置:函數

?oop

1
2
3
4
a1.sources.r1.interceptors = host
a1.sources.r1.interceptors.host.type=host
a1.sources.r1.interceptors.host.useIP= false
a1.sources.r1.interceptors.timestamp.preserveExisting= true

3.  靜態攔截器

    靜態攔截器的做用是將k/v插入到事件的報頭中。配置以下

參數 默認值 描述
type
類型名稱static
key key 事件頭的key
value value key對應的value值
preserveExisting true 若是設置爲true,若事件中報頭已經存在該key,不會替換value的值

source鏈接到靜態攔截器的配置:

?

1
2
3
4
5
a1.sources.r1.interceptors =  static
a1.sources.r1.interceptors. static .type= static
a1.sources.r1.interceptors. static .key=logs
a1.sources.r1.interceptors. static .value=logFlume
a1.sources.r1.interceptors. static .preserveExisting= false

4.    正則過濾攔截器

在日誌採集的時候,可能有一些數據是咱們不須要的,這樣添加過濾攔截器,能夠過濾掉不須要的日誌,也能夠根據須要收集知足正則條件的日誌。

參數 默認值 描述
type
類型名稱REGEX_FILTER
regex .* 匹配除「\n」以外的任何個字符
excludeEvents false
默認收集匹配到的事件。若是爲true,則會刪除匹配到的event,收集未匹配到的。

source鏈接到正則過濾攔截器的配置:

?

1
2
3
4
a1.sources.r1.interceptors = regex
a1.sources.r1.interceptors.regex.type=REGEX_FILTER
a1.sources.r1.interceptors.regex.regex=(rm)|(kill)
a1.sources.r1.interceptors.regex.excludeEvents= false

這樣配置的攔截器就只會接收日誌消息中帶有rm 或者kill的日誌。

selector做用:同一個數據源分發到不一樣的目的

官網中selector共有兩種類型:

Replicating Channel Selector (default)

Multiplexing Channel Selector

這兩種selector的區別是:Replicating 會將source過來的events發往全部channel,而Multiplexing 能夠選擇該發往哪些channel。對於上面的例子來講,若是採用Replicating ,那麼demo和demo2的日誌會同時發往channel1和channel2,這顯然是和需求不符的,需求只是讓demo的日誌發往channel1,而demo2的日誌發往channel2。

綜上所述,咱們選擇Multiplexing Channel Selector。這裏咱們有遇到一個棘手的問題,Multiplexing 須要判斷header裏指定key的值來決定分發到某個具體的channel,咱們如今demo和demo2同時運行在同一個服務器上,若是在不一樣的服務器上運行,咱們能夠在 source1上加上一個 host 攔截器(上面有介紹),這樣能夠經過header中的host來判斷event該分發給哪一個channel,而這裏是在同一個服務器上,由host是區分不出來日誌的來源的,咱們必須想辦法在header中添加一個key來區分日誌的來源。

設想一下,若是header中有一個key:flume.client.log4j.logger.source,咱們經過設置這個key的值,demo設爲app1,demo2設爲app2,這樣咱們就能經過設置:

tier1.sources.source1.channels=channel1 channel2
tier1.sources.source1.selector.type=multiplexing
tier1.sources.source1.selector.header=flume.client.log4j.logger.source
tier1.sources.source1.selector.mapping.app1=channel1
tier1.sources.source1.selector.mapping.app2=channel2

來將不一樣項目的的日誌輸出到不一樣的channel了。

可是這個header變量從哪裏來呢?

解決方法:

一、修改用到的那個source的源碼,應用到client端,不一樣的數據類型添加不一樣的header值

Event類設計  

在Flume中Event是個接口類  

public interface Event {  

  public Map<String, String> getHeaders();  

  public void setHeaders(Map<String, String> headers);  

  public byte[] getBody();  

  public void setBody(byte[] body);  

}

在org.apache.flume.event下, 有兩個Event的具體實現類: SimpleEvent, JSonEvent.
EventBuilder類顧名思義, 採用Builder的方式來組裝對象的成員, 併產生最終的對象.

public class EventBuilder {     

  public static Event withBody(byte[] body, Map<String, String> headers) {  

    Event event = new SimpleEvent();  

    if(body == null) {  

      body = new byte[0];  

    }  

    event.setBody(body);  

    if (headers != null) {  

      event.setHeaders(new HashMap<String, String>(headers));  

    }  

    return event;  

  }  

   

  public static Event withBody(byte[] body) {  

    return withBody(body,null);  

  }  

   

  public static Event withBody(String body, Charset charset,  

      Map<String, String> headers) {  

    return withBody(body.getBytes(charset), headers);  

  }  

   

  public static Event withBody(String body, Charset charset) {  

    return withBody(body, charset, null);  

  }  

   

}  

  

二、在source端配置interceptor,經過interceptor在header上設置變量header值

好比:

使用regex_extractor,對傳過來的數據進行處理,提取出type值(若是能夠的話,能夠在client端的數據格式添加type值,方便使用regex_extractor提取出來)。

三、在source端自定義interceptor,在interceptor裏對處理變量header

Interceptor用於過濾Event,即傳入一個Event而後進行過濾加工,而後返回一個新的Event,接口以下:

 public interface Interceptor {

    public void initialize();

    public Event intercept(Event event);

    public List<Event> intercept(List<Event> events); 

    public void close();

}

 

一、public void initialize()運行前的初始化,通常不須要實現(上面的幾個都沒實現這個方法);

二、public Event intercept(Event event)處理單個event;

三、public List<Event> intercept(List<Event> events)批量處理event,實際上市循環調用上面的2;

四、public void close()能夠作一些清理工做,上面幾個也都沒有實現這個方法;

五、 public interface Builder extends Configurable 構建Interceptor對象,外部使用這個Builder來獲取Interceptor對象。

若是要本身定製,必需要完成上面的2,3,5。

 

下面,咱們來看看org.apache.flume.interceptor.HostInterceptor,其所有代碼以下:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License. */package org.apache.flume.interceptor;import java.net.InetAddress;import java.net.UnknownHostException;import java.util.List;import java.util.Map;import org.apache.flume.Context;import org.apache.flume.Event;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import static org.apache.flume.interceptor.HostInterceptor.Constants.*;/**
 * Simple Interceptor class that sets the host name or IP on all events
 * that are intercepted.<p>
 * The host header is named <code>host</code> and its format is either the FQDN
 * or IP of the host on which this interceptor is run.
 *
 *
 * Properties:<p>
 *
 *   preserveExisting: Whether to preserve an existing value for 'host'
 *                     (default is false)<p>
 *
 *   useIP: Whether to use IP address or fully-qualified hostname for 'host'
 *          header value (default is true)<p>
 *
 *  hostHeader: Specify the key to be used in the event header map for the
 *          host name. (default is "host") <p>
 *
 * Sample config:<p>
 *
 * <code>
 *   agent.sources.r1.channels = c1<p>
 *   agent.sources.r1.type = SEQ<p>
 *   agent.sources.r1.interceptors = i1<p>
 *   agent.sources.r1.interceptors.i1.type = host<p>
 *   agent.sources.r1.interceptors.i1.preserveExisting = true<p>
 *   agent.sources.r1.interceptors.i1.useIP = false<p>
 *   agent.sources.r1.interceptors.i1.hostHeader = hostname<p>
 * </code>
 * */public class HostInterceptor implements Interceptor {

  private static final Logger logger = LoggerFactory
          .getLogger(HostInterceptor.class);  private final boolean preserveExisting;  private final String header;  private String host = null;  /**
   * Only {@link HostInterceptor.Builder} can build me   */
  private HostInterceptor(boolean preserveExisting,      boolean useIP, String header) {    this.preserveExisting = preserveExisting;    this.header = header;
    InetAddress addr;    try {
      addr = InetAddress.getLocalHost();      if (useIP) {
        host = addr.getHostAddress();
      } else {
        host = addr.getCanonicalHostName();
      }
    } catch (UnknownHostException e) {
      logger.warn("Could not get local host address. Exception follows.", e);
    }


  }  @Override
  public void initialize() {    // no-op  }  /**
   * Modifies events in-place.   */
  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();    if (preserveExisting && headers.containsKey(header)) {      return event;
    }    if(host != null) {
      headers.put(header, host);
    }    return event;
  }  /**
   * Delegates to {@link #intercept(Event)} in a loop.
   * @param events
   * @return
   */
  @Override
  public List<Event> intercept(List<Event> events) {    for (Event event : events) {
      intercept(event);
    }    return events;
  }  @Override
  public void close() {    // no-op  }  /**
   * Builder which builds new instances of the HostInterceptor.   */
  public static class Builder implements Interceptor.Builder {

    private boolean preserveExisting = PRESERVE_DFLT;    private boolean useIP = USE_IP_DFLT;    private String header = HOST;    @Override
    public Interceptor build() {      return new HostInterceptor(preserveExisting, useIP, header);
    }    @Override
    public void configure(Context context) {
      preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
      useIP = context.getBoolean(USE_IP, USE_IP_DFLT);
      header = context.getString(HOST_HEADER, HOST);
    }

  }  public static class Constants {
    public static String HOST = "host";    public static String PRESERVE = "preserveExisting";    public static boolean PRESERVE_DFLT = false;    public static String USE_IP = "useIP";    public static boolean USE_IP_DFLT = true;    public static String HOST_HEADER = "hostHeader";
  }

}

Constants類是參數類及默認的一些參數:

Builder類是構造HostInterceptor對象的,它會首先經過configure(Context context)方法獲取配置文件中interceptor的參數,而後方法build()用來返回一個HostInterceptor對象:

一、preserveExisting表示若是event的header中包含有本interceptor指定的header,是否要保留這個header,true則保留;

二、useIP表示是否使用本機IP地址做爲header的value,true則使用IP,默認是true;

三、header是event的headers的key,默認是host。

HostInterceptor:

一、構造函數除了賦值外,還有就是根據useIP獲取IP或者hostname;

二、intercept(Event event)方法是設置event的header的地方,首先是獲取headers對象,而後若是同時知足preserveExisting==true而且headers.containsKey(header)就直接返回event,不然設置headers:headers.put(header, host)。

三、intercept(List<Event> events)方法是循環調用上述2的方法。

顯然其餘幾個Interceptor也就相似這樣。在配置文件中配置source的interceptor時,若是是本身定製的interceptor,則須要對type參數賦值:完整類名+¥Builder,好比com.MyInterceptor$Builder便可。

這樣設置好headers後,就能夠在後續的流轉中經過selector實現細分存儲。

相關文章
相關標籤/搜索