Recently I have been responsible for product log instrumentation and collection at my company, and built a log collection system based on Flume + HDFS + Hive.
1. Log collection system architecture
Here is a simple architecture diagram of the log collection system. As it shows, Flume plays the agent and collector roles, while HDFS provides persistent data storage.
The setup described here is a demo: it uses only a single flume_collector and stores data only in HDFS. A highly available log collection architecture would, of course, require multiple Flume collectors for load balancing and fault tolerance.
2. Log generation
1. Log4j 2 configuration: roll a new file every minute; if a file grows beyond 5 MB within that minute, an additional file is created.
<!-- Product analytics log, rolled by minute -->
<RollingRandomAccessFile name="RollingFile_product_minute"
    fileName="${STAT_LOG_HOME}/${SERVER_NAME}_product.log"
    filePattern="${STAT_LOG_HOME}/${SERVER_NAME}_product.log.%d{yyyy-MM-dd-HH-mm}-%i">
    <PatternLayout charset="UTF-8" pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %level - %msg%xEx%n" />
    <Policies>
        <TimeBasedTriggeringPolicy interval="1" modulate="true" />
        <SizeBasedTriggeringPolicy size="${EVERY_FILE_SIZE}" />
    </Policies>
    <Filters>
        <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="NEUTRAL" />
    </Filters>
</RollingRandomAccessFile>
Rolled files are named according to the filePattern above, for example:
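A hedged illustration of the resulting file names (derived from the filePattern; the actual ${SERVER_NAME}, dates, and %i index will differ):

ls ${STAT_LOG_HOME}
# myserver_product.log                        <- file currently being written
# myserver_product.log.2016-11-30-09-17-1     <- rolled at the end of minute 09:17
# myserver_product.log.2016-11-30-09-18-1     <- rolled at the end of minute 09:18
# myserver_product.log.2016-11-30-09-18-2     <- second file in the same minute (size exceeded 5 MB)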
2. Log content
Each record is JSON; the top-level fields are, in order: tableName, logRequest, timestamp, statBody, logResponse, resultCode, resultFailMsg.
2016-11-30 09:18:21.916 INFO - { "tableName": "ReportView", "logRequest": { *** }, "timestamp": 1480468701432, "statBody": { *** }, "logResponse": { *** }, "resultCode": 1, "resultFailMsg": "" }
3. Flume configuration
For the virtual machine environment, see my blog post: http://www.cnblogs.com/xckk/p/6000881.html
For the Hadoop environment, see my other post: http://www.cnblogs.com/xckk/p/6124553.html
The Flume deployment here is:
centos1: flume-agent
centos2: flume-collector
1. Flume agent configuration (conf file)
a1.sources = skydataSource
a1.channels = skydataChannel
a1.sinks = skydataSink

a1.sources.skydataSource.type = spooldir
a1.sources.skydataSource.channels = skydataChannel
# directory watched for log files
a1.sources.skydataSource.spoolDir = /opt/flumeSpool
a1.sources.skydataSource.fileHeader = true
# Processed files get a .COMPLETED suffix, and .log files are rolled every minute,
# so both .log and .COMPLETED files are ignored here
a1.sources.skydataSource.ignorePattern=([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
a1.sources.skydataSource.basenameHeader=true
a1.sources.skydataSource.deserializer.maxLineLength=102400
# Custom interceptor: splits the JSON log into delimited fields and adds a timestamp header
# for the downstream hdfsSink; the interceptor code is shown later
a1.sources.skydataSource.interceptors=i1
a1.sources.skydataSource.interceptors.i1.type=com.skydata.flume_interceptor.HiveLogInterceptor2$Builder

a1.sinks.skydataSink.type = avro
a1.sinks.skydataSink.channel = skydataChannel
a1.sinks.skydataSink.hostname = centos2
a1.sinks.skydataSink.port = 4545
# deflate compression here must be matched by the corresponding setting on the collector side
a1.sinks.skydataSink.compression-type=deflate

a1.channels.skydataChannel.type=memory
a1.channels.skydataChannel.capacity=10000
a1.channels.skydataChannel.transactionCapacity=1000
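Because the source is a spooling directory, a quick hedged way to exercise the agent is to drop a rolled log file into /opt/flumeSpool (the file name below is illustrative; files matching the ignorePattern, i.e. *.log and *.COMPLETED, are skipped):

# copy a rolled log file into the spooling directory; Flume renames it to *.COMPLETED once processed
cp ${STAT_LOG_HOME}/myserver_product.log.2016-11-30-09-18-1 /opt/flumeSpool/
ls /opt/flumeSpool
# myserver_product.log.2016-11-30-09-18-1.COMPLETED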
2. Flume collector configuration
a1.sources = avroSource
a1.channels = memChannel
a1.sinks = hdfsSink

a1.sources.avroSource.type = avro
a1.sources.avroSource.channels = memChannel
a1.sources.avroSource.bind=centos2
a1.sources.avroSource.port=4545
# must match the compression-type configured on the Flume agent
a1.sources.avroSource.compression-type=deflate

a1.sinks.hdfsSink.type = hdfs
a1.sinks.hdfsSink.channel = memChannel
# skydata_hive_log is the Hive table, partitioned by year-month-day
a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d
a1.sinks.hdfsSink.hdfs.batchSize=10000
a1.sinks.hdfsSink.hdfs.fileType=DataStream
a1.sinks.hdfsSink.hdfs.writeFormat=Text
a1.sinks.hdfsSink.hdfs.rollSize=10240000
a1.sinks.hdfsSink.hdfs.rollCount=0
a1.sinks.hdfsSink.hdfs.rollInterval=300

a1.channels.memChannel.type=memory
a1.channels.memChannel.capacity=100000
a1.channels.memChannel.transactionCapacity=10000
4. Hive table creation and partitioning
1. Hive table creation
After running the CREATE TABLE statement in Hive, a new skydata_hive_log directory appears under hdfs://centos1:9000/flume/ (the statement includes a LOCATION clause).
\u0001 is the delimiter Hive uses to split fields; when a data file is opened with vim on Linux, this character shows up as ^A.
Since the logs are JSON, they need to be converted into \u0001-delimited fields and partitioned by dt. This step is done by a custom Flume interceptor.
CREATE TABLE `skydata_hive_log`(
  `tableName` string,
  `logRequest` string,
  `timestamp` bigint,
  `statBody` string,
  `logResponse` string,
  `resultCode` int,
  `resultFailMsg` string
)
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs://centos1:9000/flume/skydata_hive_log';
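Once data has landed on HDFS, the \u0001 separator can be checked quickly from the shell (a hedged sketch; the partition path and file name are examples from this setup):

# print the first record with non-printing characters made visible; \u0001 shows up as ^A
hadoop fs -cat /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480610453116 | head -n 1 | cat -A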
2. Hive table partitioning
for ((i=-1;i<=365;i++))
do
    dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
    echo date=$dt
    hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>>logs/init_skydata_hive_log.err
done
5. Custom Flume interceptor
Create a new Maven project; the interceptor HiveLogInterceptor2 code is as follows.
package com.skydata.flume_interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.TimestampInterceptor.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;

public class HiveLogInterceptor2 implements Interceptor {

    private static Logger logger = LoggerFactory.getLogger(HiveLogInterceptor2.class);

    public static final String HIVE_SEPARATOR = "\001";

    public void close() {
    }

    public void initialize() {
    }

    public Event intercept(Event event) {
        String originalLog = new String(event.getBody(), Charsets.UTF_8);
        try {
            String log = this.parseLog(originalLog);
            // set the timestamp header used by the hdfsSink for %Y-%m-%d path escaping
            long now = System.currentTimeMillis();
            Map<String, String> headers = event.getHeaders();
            headers.put(Constants.TIMESTAMP, Long.toString(now));
            event.setBody(log.getBytes());
        } catch (Throwable throwable) {
            logger.error(("error when intercepting log [ " + originalLog + " ] "), throwable);
            return null;
        }
        return event;
    }

    public List<Event> intercept(List<Event> list) {
        List<Event> events = new ArrayList<Event>();
        for (Event event : list) {
            Event interceptedEvent = this.intercept(event);
            if (interceptedEvent != null) {
                events.add(interceptedEvent);
            }
        }
        return events;
    }

    private static String parseLog(String log) {
        List<String> logFields = new ArrayList<String>();
        String dt = log.substring(0, 10);
        String keyStr = "INFO - ";
        int index = log.indexOf(keyStr);
        String content = "";
        if (index != -1) {
            content = log.substring(index + keyStr.length(), log.length());
        }
        // normalize line endings across operating systems
        content = content.replaceAll("\r", "");
        content = content.replaceAll("\n", "\\\\" + System.getProperty("line.separator"));

        JSONObject jsonObj = JSONObject.parseObject(content);
        String tableName = jsonObj.getString("tableName");
        String logRequest = jsonObj.getString("logRequest");
        String timestamp = jsonObj.getString("timestamp");
        String statBody = jsonObj.getString("statBody");
        String logResponse = jsonObj.getString("logResponse");
        String resultCode = jsonObj.getString("resultCode");
        String resultFailMsg = jsonObj.getString("resultFailMsg");

        // split into fields, appending dt for the partition column
        logFields.add(tableName);
        logFields.add(logRequest);
        logFields.add(timestamp);
        logFields.add(statBody);
        logFields.add(logResponse);
        logFields.add(resultCode);
        logFields.add(resultFailMsg);
        logFields.add(dt);

        return Joiner.on(HIVE_SEPARATOR).join(logFields);
    }

    public static class Builder implements Interceptor.Builder {

        public Interceptor build() {
            return new HiveLogInterceptor2();
        }

        public void configure(Context arg0) {
        }
    }
}
Add the following to pom.xml, build the interceptor project with Maven, and copy both the project jar and its dependency jars into the Flume agent's lib directory (a packaging sketch follows the pom snippet below).
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <configuration>
                <outputDirectory>${project.build.directory}</outputDirectory>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <id>copy-dependencies</id>
                    <phase>prepare-package</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        <overWriteReleases>true</overWriteReleases>
                        <overWriteSnapshots>true</overWriteSnapshots>
                        <overWriteIfNewer>true</overWriteIfNewer>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
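A hedged sketch of the packaging and deployment steps (the project directory name and the Flume install path are assumptions; adjust them to your layout):

# build the interceptor project; the project jar lands in target/ and, via the
# copy-dependencies execution bound to prepare-package, its dependencies land in target/lib/
cd flume_interceptor        # hypothetical project directory
mvn clean package

# copy the interceptor jar and its dependency jars into the Flume agent's lib directory
FLUME_HOME=/opt/apache-flume-1.6.0-bin   # assumption: the Flume agent install directory
cp target/*.jar "$FLUME_HOME"/lib/
cp target/lib/*.jar "$FLUME_HOME"/lib/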
The interceptor joins the log fields with the "\001" delimiter. A processed log record looks like the following, where ^A is "\001":
ReportView^A{"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":7092,"pageSize":0,"searchs":[],"orders":[],"pageNum":1}}^A1480468701432^A{"sourceId":22745,"reportId":7092,"projectId":29355,"userId":2532}^A{"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"請求成功","httpCode":200,"timestamp":1480468701849},"statusCode":"OK"},"response":{}}^A1^A^A2016-11-30
At this point the Flume + HDFS + Hive configuration is complete.
The data can then be analyzed with MapReduce or HQL, for example as sketched below.
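As a hedged example of the kind of HQL analysis that becomes possible (the query is illustrative; it simply counts events per business table in one partition, using the column names from the table definition above):

# daily event counts per business table for the 2016-12-01 partition
hive -e "
  SELECT tableName, COUNT(*) AS cnt
  FROM skydata_hive_log
  WHERE dt = '2016-12-01'
  GROUP BY tableName
  ORDER BY cnt DESC;
"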
6. Startup and results
1. Start Hadoop HDFS
See my earlier post, "Hadoop 1.2 cluster setup and environment configuration": http://www.cnblogs.com/xckk/p/6124553.html
2. Start flume_collector and flume_agent. Since the flume-ng startup command takes quite a few parameters, I wrote a small startup script:
start-Flume.sh
#!/bin/bash
jps -l | grep org.apache.flume.node.Application | awk '{print $1}' | xargs kill -9 2>&1 >/dev/null
cd "$(dirname "$0")"
cd ..
nohup bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 2>&1 > /dev/null &
3. Check the data on HDFS
The collected logs have been uploaded to HDFS:
[root@centos1 bin]# hadoop fs -ls /flume/skydata_hive_log/dt=2016-12-01/
Found 3 items
-rw-r--r--   3 root supergroup       5517 2016-12-01 08:12 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480608753042.tmp
-rw-r--r--   3 root supergroup       5517 2016-12-01 08:40 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480610453116
-rw-r--r--   3 root supergroup       5517 2016-12-01 08:44 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480610453117
[root@centos1 bin]#
4. Start Hive and query the data; Hive can now read the data from HDFS:
[root@centos1 lib]# hive
Logging initialized using configuration in file:/root/apache-hive-1.2.1-bin/conf/hive-log4j.properties
hive> select * from skydata_hive_log limit 2;
OK
ReportView {"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":7092,"pageSize":0,"searchs":[],"orders":[],"pageNum":1}} 1480468701432 {"sourceId":22745,"reportId":7092,"projectId":29355,"userId":2532} {"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"請求成功","httpCode":200,"timestamp":1480468701849},"statusCode":"OK"},"response":{}} 1  2016-12-01
ReportDesignResult {"request":{},"requestBody":{"sourceId":22745,"detailInfos":[{"colName":"月份","flag":"0","reportId":7092,"colCode":"col_2_22745","pageSize":20,"type":"1","pageNum":1,"rcolCode":"col_25538","colType":"string","formula":"","id":25538,"position":"row","colId":181664,"dorder":1,"pColName":"月份","pRcolCode":"col_25538"},{"colName":"綜合利率(合計)","flag":"1","reportId":7092,"colCode":"col_11_22745","pageSize":20,"type":"1","pageNum":1,"rcolCode":"sum_col_25539","colType":"number","formula":"sum","id":25539,"position":"group","colId":181673,"dorder":1,"pColName":"綜合利率","pRcolCode":"col_25539"}],"flag":"bar1","reportId":7092,"reportName":"iiiissszzzV","pageSize":100,"searchs":[],"orders":[],"pageNum":1,"projectId":29355}} 1480468703586 {"reportType":"bar1","sourceId":22745,"reportId":7092,"num":5,"usedFields":"月份$$綜合利率(合計)$$","projectId":29355,"userId":2532} {"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"請求成功","reportId":7092,"httpCode":200,"timestamp":1480468703774},"statusCode":"OK"},"response":{}} 1  2016-12-01
Time taken: 2.212 seconds, Fetched: 2 row(s)
hive>
7. Common problems and solutions
1. FATAL: Spool Directory source skydataSource: { spoolDir: /opt/flumeSpool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
Possible causes:
1. Character encoding: the log files in the spoolDir directory must be UTF-8 (a quick encoding check is sketched after this list).
2. With the Spooling Directory Source, never read and write the same file at the same time; add the following to the conf file:
a1.sources.skydataSource.ignorePattern=([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
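A hedged way to check and fix the encoding from the shell (the source encoding GBK is only an assumption; the file name is illustrative):

# check the encoding of a file sitting in the spooling directory
file -i /opt/flumeSpool/myserver_product.log.2016-11-30-09-18-1

# convert to UTF-8 if needed (assumes the file is currently GBK-encoded)
iconv -f GBK -t UTF-8 /opt/flumeSpool/myserver_product.log.2016-11-30-09-18-1 -o /tmp/converted.utf8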
2. Logs are imported into the Hadoop directory, but queries against the Hive table return no data. For example, hdfs://centos1:9000/flume/skydata_hive_log/dt=2016-12-01/ contains data,
yet "select * from skydata_hive_log" in Hive returns nothing.
Possible causes:
1. The table partitions were never created. Even though Flume is configured with a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d, the partition metadata for the table does not exist, so Hive cannot read the files once they land on HDFS.
Solution: create the partitions first. Put the following into an executable shell script to pre-create the partitions for the table:
for ((i=-10;i<=365;i++))
do
    dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
    echo date=$dt
    hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>>logs/init_skydata_hive_log.err
done
2. The files on HDFS may still be .tmp files that the HDFS sink is writing to. Hive cannot read .tmp files; it only reads the finalized (non-.tmp) files.
Solution: wait for the HDFS sink's configured roll interval to pass (or for the roll size to be reached) so that the .tmp file is renamed to a final log file, then query Hive again and the data will appear. A quick check for lingering .tmp files is sketched below.
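A hedged check for in-flight .tmp files in today's partition (the path comes from this setup; adjust the date as needed):

# list any files the HDFS sink is still writing for today's partition
hadoop fs -ls /flume/skydata_hive_log/dt=$(date +%Y-%m-%d)/ | grep '\.tmp$'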
By 秀才坤坤.
Please credit the source when reposting.
Original post: http://www.cnblogs.com/xckk/p/6125838.html