9. Installing a Flume Cluster and Collecting Data

Introduction to Flume


Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting massive volumes of log data. It supports pluggable data senders in the logging pipeline for gathering data, and it can also apply simple processing to the data before writing it out to various (customizable) data receivers.

Cluster Plan


                        hadoop151    hadoop152    hadoop153
Flume (collect data)        ✓            ✓
Flume (consume data)                                  ✓

Installing Flume


  1. Extract to the target location and rename the directory.

    [hadoop@hadoop151 software]$ tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
    [hadoop@hadoop151 module]$ mv apache-flume-1.7.0-bin/ flume
  2. In the "flume/conf" directory, rename "flume-env.sh.template" and set JAVA_HOME.

    [hadoop@hadoop151 conf]$ mv flume-env.sh.template flume-env.sh
    [hadoop@hadoop151 conf]$ vim flume-env.sh
    export JAVA_HOME=/opt/module/jdk
  3. Following the cluster plan, repeat the steps above on the remaining nodes. (You can also use a script; these notes include an xsync cluster-distribution script. See the sketch below.)
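
     As a sketch of step 3 (assuming Flume was installed under /opt/module and the xsync helper from these notes is on the PATH; plain scp works just as well):

    [hadoop@hadoop151 module]$ xsync /opt/module/flume
    # without xsync, copy the directory to each node by hand:
    [hadoop@hadoop151 module]$ scp -r /opt/module/flume hadoop@hadoop152:/opt/module/
    [hadoop@hadoop151 module]$ scp -r /opt/module/flume hadoop@hadoop153:/opt/module/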

Configuring Flume to Collect Data


  1. Create a file named file-flume-kafka.conf in the "flume/conf" directory.

    a1.sources=r1
    a1.channels=c1 c2
    
    # configure source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile=/opt/module/flume/test/log_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
    a1.sources.r1.fileHeader = true
    a1.sources.r1.channels = c1 c2
    
    #interceptor
    a1.sources.r1.interceptors =  i1 i2
    a1.sources.r1.interceptors.i1.type = com.bbxy.flume.interceptor.LogETLInterceptor$Builder
    a1.sources.r1.interceptors.i2.type = com.bbxy.flume.interceptor.LogTypeInterceptor$Builder
    
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = topic
    a1.sources.r1.selector.mapping.topic_start = c1
    a1.sources.r1.selector.mapping.topic_event = c2
    
    # configure channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = hadoop151:9092,hadoop152:9092,hadoop153:9092
    a1.channels.c1.kafka.topic = topic_start
    a1.channels.c1.parseAsFlumeEvent = false
    a1.channels.c1.kafka.consumer.group.id = flume-consumer
    
    a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c2.kafka.bootstrap.servers = hadoop151:9092,hadoop152:9092,hadoop153:9092
    a1.channels.c2.kafka.topic = topic_event
    a1.channels.c2.parseAsFlumeEvent = false
    a1.channels.c2.kafka.consumer.group.id = flume-consumer

    Distribute this file to hadoop152.
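
    Note that the configuration defines no sinks: with the Kafka Channel, events flow from the TAILDIR source through the interceptors and the multiplexing selector straight into the topic_start and topic_event topics. If topic auto-creation is disabled on your Kafka cluster, create the two topics first. A sketch, assuming a ZooKeeper-managed Kafka installed under /opt/module/kafka (flag names vary with the Kafka version):

    [hadoop@hadoop151 kafka]$ bin/kafka-topics.sh --zookeeper hadoop151:2181 --create --topic topic_start --replication-factor 1 --partitions 1
    [hadoop@hadoop151 kafka]$ bin/kafka-topics.sh --zookeeper hadoop151:2181 --create --topic topic_event --replication-factor 1 --partitions 1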


  2. Custom ETL interceptor and log-type interceptor

    • The ETL interceptor filters out logs whose server timestamp is invalid or whose JSON payload is incomplete;
    • The log-type interceptor separates startup logs from event logs, so that they can be sent to different Kafka topics.
    1. Create a Maven project named flume-interceptor and add the dependency to pom.xml.

      <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
      </dependencies>
      
      <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
      </build>
    2. Create the LogETLInterceptor class (the ETL interceptor).

      package com.bbxy.flume.interceptor;
      
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.interceptor.Interceptor;
      
      import java.nio.charset.Charset;
      import java.util.ArrayList;
      import java.util.List;
      
      public class LogETLInterceptor implements Interceptor {
      
        @Override
        public void initialize() {
      
        }
      
        @Override
        public Event intercept(Event event) {
      
            // 1. Get the event body
            byte[] body = event.getBody();
            String log = new String(body, Charset.forName("UTF-8"));
      
            // 2. Determine the log type and validate accordingly
            if (log.contains("start")) {
                if (LogUtils.validateStart(log)){
                    return event;
                }
            }else {
                if (LogUtils.validateEvent(log)){
                    return event;
                }
            }
      
            // 3. Validation failed: drop the event by returning null
            return null;
        }
      
        @Override
        public List<Event> intercept(List<Event> events) {
      
            ArrayList<Event> interceptors = new ArrayList<>();
      
            for (Event event : events) {
                Event intercept1 = intercept(event);
      
                if (intercept1 != null){
                    interceptors.add(intercept1);
                }
            }
      
            return interceptors;
        }
      
        @Override
        public void close() {
      
        }
      
        public static class Builder implements Interceptor.Builder{
      
            @Override
            public Interceptor build() {
                return new LogETLInterceptor();
            }
      
            @Override
            public void configure(Context context) {
      
            }
        }
      }
    3. Create the LogTypeInterceptor class (the log-type interceptor).

      package com.bbxy.flume.interceptor;
      
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.interceptor.Interceptor;
      
      import java.nio.charset.Charset;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Map;
      
      public class LogTypeInterceptor implements Interceptor {
        @Override
        public void initialize() {
      
        }
      
        @Override
        public Event intercept(Event event) {
      
            // Distinguish the log type: inspect the body, tag the header
            // 1. Get the body data
            byte[] body = event.getBody();
            String log = new String(body, Charset.forName("UTF-8"));
      
            // 2. Get the headers
            Map<String, String> headers = event.getHeaders();
      
            // 3. Determine the log type and store it in the header
            if (log.contains("start")) {
                headers.put("topic","topic_start");
            }else {
                headers.put("topic","topic_event");
            }
      
            return event;
        }
      
        @Override
        public List<Event> intercept(List<Event> events) {
      
            ArrayList<Event> interceptors = new ArrayList<>();
      
            for (Event event : events) {
                Event intercept1 = intercept(event);
      
                interceptors.add(intercept1);
            }
      
            return interceptors;
        }
      
        @Override
        public void close() {
      
        }
      
        public static class Builder implements  Interceptor.Builder{
      
            @Override
            public Interceptor build() {
                return new LogTypeInterceptor();
            }
      
            @Override
            public void configure(Context context) {
      
            }
        }
      }
    4. The LogUtils log-filtering utility class (a quick sanity check follows the listing).

      package com.bbxy.flume.interceptor;

      import org.apache.commons.lang.math.NumberUtils;
      
      public class LogUtils {
      
        public static boolean validateEvent(String log) {
            // Format: server timestamp | JSON payload, e.g.:
            // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}
      
            // 1. Split on the pipe
            String[] logContents = log.split("\\|");
      
            // 2. Validate the overall structure
            if(logContents.length != 2){
                return false;
            }
      
            // 3. Validate the server timestamp (13 digits)
            if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){
                return false;
            }
      
            // 4. Validate the JSON payload
            if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){
                return false;
            }
      
            return true;
        }
      
        public static boolean validateStart(String log) {
      
            if (log == null){
                return false;
            }
      
            // Validate the JSON
            if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){
                return false;
            }
      
            return true;
        }
      }
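
       To sanity-check the validators, a throwaway main method can help (hypothetical, placed in the same package, not part of the build):

      package com.bbxy.flume.interceptor;

      public class LogUtilsTest {
          public static void main(String[] args) {
              // well-formed event log: 13-digit timestamp | JSON
              System.out.println(LogUtils.validateEvent("1549696569054|{\"ap\":\"weather\"}")); // true
              // no pipe-separated timestamp, so the split yields a single field
              System.out.println(LogUtils.validateEvent("{\"ap\":\"weather\"}"));               // false
              // well-formed start log: plain JSON
              System.out.println(LogUtils.validateStart("{\"en\":\"start\"}"));                 // true
          }
      }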
  3. Package the project, then put the jar without dependencies into the "flume/lib" directory (a build sketch follows).
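
     A minimal build sketch (the exact jar names depend on the project's groupId/artifactId/version, which are not shown above):

    [hadoop@hadoop151 flume-interceptor]$ mvn clean package
    # target/ now holds two jars; the one WITHOUT the "jar-with-dependencies"
    # suffix is the one to copy into flume/lib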

  4. Send the packaged jar to hadoop152.

  5. Start Flume to ingest the tracking-log data.

    [hadoop@hadoop151 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
    [hadoop@hadoop152 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

    Alternatively, use a script (see the sketch below).
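
     A minimal start/stop helper in that spirit (a sketch: the name f1.sh, the install path /opt/module/flume, and the log redirection are all assumptions to adapt):

    #!/bin/bash
    # f1.sh -- start/stop the collection agents on hadoop151 and hadoop152
    case $1 in
    "start"){
        for host in hadoop151 hadoop152
        do
            echo "--------- starting flume on $host ---------"
            ssh $host "nohup /opt/module/flume/bin/flume-ng agent --name a1 --conf-file /opt/module/flume/conf/file-flume-kafka.conf >/opt/module/flume/flume.log 2>&1 &"
        done
    };;
    "stop"){
        for host in hadoop151 hadoop152
        do
            echo "--------- stopping flume on $host ---------"
            ssh $host "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill"
        done
    };;
    esac

     Once the agents are up, a console consumer can confirm that events are reaching Kafka (again assuming Kafka under /opt/module/kafka):

    [hadoop@hadoop153 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop151:9092 --topic topic_start --from-beginning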
