Real-time stream ingestion into the data warehouse exists at almost every large company. Since Flume 1.8, the taildir source has been supported, and it is widely used because of the following features:
1. It matches file names in a directory with regular expressions.
2. As soon as data is written to a monitored file, Flume delivers it to the configured sink.
3. It is highly reliable and does not lose data.
4. It does not touch the tracked files: it neither renames nor deletes them.
5. It is not supported on Windows and cannot read binary files; it reads text files line by line.
This article uses open-source Flume as an example to show how to ingest a log stream into HDFS; ODS-layer external tables can then be built on top of the ingested data.
Taildir source settings:

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
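The taildir source records how far it has read in each tracked file in the positionFile, which is how it can resume after a restart without losing data. The file is a JSON array maintained by Flume itself; the values below are purely illustrative:

cat /opt/hoult/servers/conf/startlog_position.json
# [{"inode":2496989,"pos":30547,"file":"/opt/hoult/servers/logs/start/start.log"}]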
HDFS sink settings:

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# Roll files by size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 100
# Use the local timestamp for the %Y-%m-%d escape in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
The complete agent configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# Roll files by size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# Use the local timestamp for the %Y-%m-%d escape in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Save the configuration above as /opt/hoult/servers/conf/flume-log2hdfs.conf.
Start the agent:

flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.root.logger=INFO,console

You also need to add the following parameter to $FLUME_HOME/conf/flume-env.sh, otherwise the agent reports an error:

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"

# For flume-env.sh to take effect, the conf directory must also be specified on the command line
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.root.logger=INFO,console
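The original article does not include a verification step; assuming the paths above, a quick way to check that the pipeline works is to append a line to a file in the monitored directory and then list the HDFS output directory:

echo 'test line' >> /opt/hoult/servers/logs/start/test.log
# The in-progress file carries a .tmp suffix until it is rolled (rollSize = 32 MB)
hdfs dfs -ls /user/data/logs/start/logs/start/$(date +%Y-%m-%d)/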
Next, a custom interceptor extracts the timestamp from each log event and writes it into the event header as logtime, so that events can later be partitioned by the log's own time. The following configuration is used to test it with a netcat source and a logger sink:

# a1 is the agent name; the source, channel and sink are named r1, c1 and k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux121
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The main code of the interceptor is as follows:
// JSON and JSONObject are assumed to come from fastjson; Charsets and Strings from Guava
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the event body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // Get the event headers
        Map<String, String> headerMap = event.getHeaders();
        final String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            if (Strings.isNullOrEmpty(jsonStr)) {
                return null;
            }
            // Parse the string into a JSON object
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getString("time");
            // Convert the millisecond timestamp into a date string (format: yyyyMMdd)
            long timeStamp = Long.valueOf(timestampStr);
            String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
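The interceptor expects the seventh whitespace-separated field of the event body to be a JSON string containing a millisecond "time" field. As a quick local sanity check (the class name CustomerInterceptorDemo and the sample log line are hypothetical, made up only to match that layout), something like this can be run:

import com.google.common.base.Charsets;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class CustomerInterceptorDemo {
    public static void main(String[] args) {
        // Hypothetical log line: fields 1-6 are arbitrary, field 7 is the JSON payload
        String line = "192.168.1.101 - - u s start {\"time\":\"1596687840000\",\"app\":\"start\"}";
        Event event = EventBuilder.withBody(line, Charsets.UTF_8);

        CustomerInterceptor interceptor = new CustomerInterceptor();
        interceptor.initialize();
        Event out = interceptor.intercept(event);

        // Prints a yyyyMMdd date derived from the "time" field, e.g. 20200806
        System.out.println(out.getHeaders().get("logtime"));
    }
}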
Save the test configuration as /opt/hoult/servers/conf/flume-test.conf, then start the agent and send it a few lines with telnet:

flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf -name a1 -Dflume.root.logger=INFO,console

# Test
telnet linux121 9999
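The article does not show the production sink that consumes the logtime header, but the reason for writing it is that the HDFS sink can substitute event headers into its output path, so data can be partitioned by the log's own time instead of the local clock. A minimal sketch, assuming the same agent and a hypothetical dt= directory layout:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder
# %{logtime} is replaced with the value the interceptor put into the event header
a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/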
Wu Xie (Xiao San Ye), a rookie knocking around the back-end, big data and AI fields.
For more, follow my big data posts.