Flume: Setting Up Ingestion into a Hive Data Warehouse

Real-time stream ingestion into the data warehouse exists at virtually every large company. Since Flume 1.7, the taildir source has been available, and it is widely used for the following characteristics:

1. Matches file names in a directory with regular expressions.
2. As soon as data is written to a monitored file, Flume ships it to the configured sink.
3. It is highly reliable and does not lose data.
4. It leaves the tracked files untouched: they are neither renamed nor deleted.
5. Windows is not supported and binary files cannot be read; text files are read line by line.

This article uses open-source Flume as the example, walking through ingesting the stream into HDFS; an ODS-layer external table is then created on top of the landing directory.
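For orientation, that ODS external table sits directly on the HDFS directory written by the sink in section 1.2. A minimal sketch in HiveQL, assuming hypothetical database, table, and column names:

-- Hypothetical ODS external table over the Flume landing directory
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_start_log (
  line string                        -- one raw log line per record
)
PARTITIONED BY (dt string)
STORED AS TEXTFILE
LOCATION '/user/data/logs/start';

-- Register one day's directory (named %Y-%m-%d by the sink) as a partition
ALTER TABLE ods.ods_start_log ADD IF NOT EXISTS
  PARTITION (dt = '2020-07-21') LOCATION '/user/data/logs/start/2020-07-21';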

1.1 Taildir source configuration

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
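The positionFile is what makes taildir restart-safe: Flume periodically records the inode and the byte offset it has consumed for every tracked file, and resumes from those offsets after a restart. The file holds a JSON array along these lines (inode and offset values are illustrative):

[{"inode": 2496272, "pos": 166, "file": "/opt/hoult/servers/logs/start/start.log"}]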

1.2 HDFS sink configuration

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# Roll policy: by file size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 100
# Use the local time for the time escapes in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
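With rollCount, rollInterval, and idleTimeout all set to 0, only the 32 MB size threshold closes a file, and minBlockReplicas = 1 keeps HDFS block replication from triggering premature rolls. While a file is open, the sink writes it under an in-progress suffix and renames it when it rolls, so a day's directory looks roughly like this (names are illustrative, combining the filePrefix above with Flume's default timestamp counter and .tmp suffix):

/user/data/logs/start/2020-07-21/startlog.1595307600001      <- rolled at 32 MB
/user/data/logs/start/2020-07-21/startlog.1595311200000.tmp  <- currently being written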

1.3 Agent configuration

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# Roll policy: by file size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# Use the local time for the time escapes in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Save the agent configuration above as /opt/hoult/servers/conf/flume-log2hdfs.conf.

1.4 Startup

flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"
# For flume-env.sh to take effect, the conf directory must also be passed on the command line
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console

Add the JAVA_OPTS line above to $FLUME_HOME/conf/flume-env.sh; without it the agent fails with a memory error, because the default heap the Flume launcher uses is very small.
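That is, flume-env.sh ends up containing the same export (heap sizes as above; tune them to your host):

# /opt/hoult/servers/flume-1.9.0/conf/flume-env.sh
export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"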

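Once the agent is up, a quick end-to-end check is to append a line to a file matching the taildir pattern and list today's HDFS directory (a minimal sketch; the file name is arbitrary as long as it matches .*log):

echo "smoke test $(date)" >> /opt/hoult/servers/logs/start/test.log
hdfs dfs -ls /user/data/logs/start/$(date +%Y-%m-%d)/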

1.5 Using a custom interceptor to replace the Flume agent's local time with the timestamp in the log

Test it with a netcat source → logger sink pipeline:

# a1 is the agent name; the source, channel, and sink are r1, c1, and k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux121
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The core interceptor code is shown below:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // Get the event body as a UTF-8 string
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // Get the event headers
        Map<String, String> headerMap = event.getHeaders();
        final String[] bodyArr = eventBody.split("\\s+");
        try {
            // The seventh whitespace-separated field is expected to hold the JSON payload
            String jsonStr = bodyArr[6];
            if (Strings.isNullOrEmpty(jsonStr)) {
                return null;
            }
            // Parse the string into a JSON object
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getString("time");
            // Convert the epoch-millisecond timestamp to a date string (format: yyyyMMdd)
            long timeStamp = Long.valueOf(timestampStr);
            String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            // Tag malformed lines instead of dropping them
            headerMap.put("logtime", "unknown");
            event.setHeaders(headerMap);
        }
        return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

Start the agent:

flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf --name a1 -Dflume.root.logger=INFO,console
## test it
telnet linux121 9999
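After connecting, type a line shaped like a real log record; per the interceptor code, the JSON payload carrying the time field must be the seventh whitespace-separated token (the line below, including the class name, is illustrative):

2020-07-30 20:00:00.000 [main] INFO com.hoult.Test - {"time":1596110400000}

The logger sink should then print the event with a logtime=20200730 header (in most timezones); malformed lines come through with logtime=unknown.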


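With the interceptor verified, the production agent from section 1.3 can partition by the log's own time instead of the local clock. A sketch of the changed lines (the header-based path layout is an assumption):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder
# Partition by the logtime header set in the interceptor
a1.sinks.k1.hdfs.path = /user/data/logs/start/%{logtime}/
# useLocalTimeStamp is no longer needed: the path contains no time escapes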