日誌的可視化---基本實現以及擴展

現狀:原先公司服務器的日誌是用來過後調試。日誌是根據時間按照鍵值對存儲的本地文件。業務繁忙或日子一長,便會產生大量無效的文件。除此以外,別無他用。javascript

目標:從日誌中提取有用的信息。從軟件上能夠獲取必定時間內的IO性能;客戶端請求的統計。從業務上則能夠獲取更多。css

這是大體的工做流程。html

image

技能預備:java

python 3.4 文本分析和數據處理 快速開發,快速實現業務
mongodb 鍵值對的存儲 數據因爲業務不一樣,相關的鍵值對也不一致,因此採用nosql
mysql 最終記錄的管理 具體業務的最終結果是相對固定的,同時也爲了後續客戶端的快速訪問
workbench 數據庫table的維護 數據庫維護工具
c# && wcf 服務端,給瀏覽器提供數據支持 構建一個簡單的服務端,支持瀏覽器的post方式獲取數據
Javascript && HTML 客戶端界面和操做 html,javascript分離界面和業務,手工綁定數據
Fiddler 測試wcf請求 測試客戶端的請求和返回值,調試封裝後的API

如何安裝這些軟件,就不在這裏詳述了。python

測試環境:mysql

windows 2008 r2 c#,workbench
ubuntu14.04 mongodb,mysql

實現過程:jquery

1. 原始日誌文件的輸入git

日誌基本說明:github

文件按照目錄存放,日誌內容是有時序。正則表達式

每條日誌佔用一行。

它的格式以下:2015-11-09 07:37:09 Request sn=0 topic=\RealTime\SEHKFeed\6881 fields=<F>P1<D>6881<F>P2<D>CGS<F>P3<D>中國銀河<F>P4<D>中國銀河<F>P6<D>7.730<F>P15<D>500

時間 2015-11-09 07:37:09
關鍵字 \RealTime\SEHKFeed\6881
數據 <F>P1<D>6881<F>P2<D>CGS<F>P3<D>中國銀河<F>P4<D>中國銀河<F>P6<D>7.730<F>P15<D>500

實現文件列表:

pyCollectMore.py 啓動腳本,設置參數,捕獲錯誤
pyScanner.py 枚舉目錄的文件,併發pyResolve(因爲開發時間有限,暫時作成單線程的方法調用了)
pyStoreDefine.py 常量定義
mongoAccesslib.py mongodb api封裝
pyResolve.py 利用正則表達式分析日誌行,寫入mongo數據庫

開發細節:

1. 因爲日誌之間有時間順序,因此同一個目錄的文件須要依次遍歷,但不一樣目錄能夠並行執行。

2. 主要是日誌topic對象的信息分解。

原始數據以下:

2015-11-09 07:37:09 Request sn=0 topic=\RealTime\SEHKFeed\6881 fields=<F>P1<D>6881<F>P2<D>CGS<F>P3<D>中國銀河<F>P4<D>中國銀河<F>P6<D>7.730<F>P15<D>500

分解1:獲取時間,topic名稱,原始鍵值對。

正則表達式:(\d\d\d\d-\d\d-\d\d\s\d\d:\d\d:\d\d)\s(?:Request|Update)\ssn=0\stopic=\\([\S]+)\\([\S]+)\sfields=(.+)

分解2:鍵值對。

正則表達式:<F>([^<]+)<D>([^<]*)

整理和合並日志記錄:

對於一個topic而言,包含2部分:命名空間和topic對象名稱。

在以前的示例中「\RealTime\SEHKFeed\6881」---命名空間:「\RealTime\SEHKFeed」;對象是:「6881」。

因此處理完成所有的日誌行後,須要合併相同命名空間下的對象名稱。

2. 中間結果的存儲

在以前任務中,咱們能夠獲取2部分的信息:topic的明細以及topic的清單。

這時候須要在mongodb中建立2個表。

tb_topic_list 記錄已收集的topic對象名稱
tb_命名空間 根據日誌收集後的命名空間,建立對應的表,存放每一個topic的所有明細

這時候使用nosql的好處了,事先不用定義表以及相關字段。

tb_topic_list字段定義:

_id 記錄id
tps 新建表名稱,後續調度程序使用
tpn 對象名稱
tpu 是否已經處理過

tb_命名空間(根據上下文應該是tb_RealTime_SEHKFeed)字段定義:

_id 記錄id
nss 記錄序列號(python維護)
nstp topic名稱
nsd 時間
nsn 命名空間
業務的字段P1,P2 ..

 

3. 任務調度和數據分析

實現文件列表:

pyTaskMng.py 啓動腳本,定時查詢待處理的topic,併發執行pyReduce(topic數量不少,因此這裏是併發進程)
pyReduce.py 從mongodb獲取已經分類的topic,根據時間順序,計算總數,最大值,最小值以及記錄明細
pyStoreDefine.py 常量定義
mongoAccesslib.py mongodb api封裝
mysql.connector mysql官方引用,在reduce以後寫入計算的結果

開發細節:

1. pyTaskMng運行在不一樣的機器上,定時檢查tb_topic_list。

若是有記錄上, 併發交給進程池,由pyReduce負責獲取topic對應的所有記錄,數量大的時候,批量獲取。

2. pyReduce分批獲取topic的明細。

根據業務規則:

P1 code
P2 名稱
P3 big5名稱
P4 gb名稱
P5 價格
P6 數量

因爲這個項目屬於demo,因此咱們假定如下的規則:

開盤價 每一個股票第一筆價格
收盤價 每一個股票最後一筆價格
最高價 初值等於開盤價,比較當前價後判斷是否更改
最低價 初值等於開盤價,比較當前價後判斷是否更改
變化次數 累計記錄數
成交量 每一個記錄報價數

pyReduce併發執行,執行完成後更新tb_topic_list。

計算的結果存入mysql表。

4. 最終結果的存儲

鑑於nosql在統計查詢上有些限制,因此使用mysql做爲統計信息的存儲設備。

secDaily表記錄統計信息

字段 類型 說明
tID int 主鍵
code varchar(45) 股票code
secName varchar(32) 股票名稱
ns varchar(64) 命名空間
secTime datetime 日期
dayNum int 邏輯天
openPrice double 開盤價
closePrice double 收盤價
highPrice double 最高價
lowPrice double 最低價
changeTotal int 變化次數
exQty double 成交量

secDetail表記錄時間戳

字段 類型 說明
sdID int 主鍵
secDailyID int 主表id
seq int 序列號
detailTime datetime 時間戳
price double 價格
qty double 數量

5. 提供數據的服務端

爲了快速顯示,此次使用wcf做爲服務端。

提供5個接口:

獲取指定sec列表總數  
  select count(tID) from secDaily where secCode like '%{0}%' or secName like '%{0}%';
  http://localhost:8000/DEMOService/secSum
 

R:  {"filter" :"1"}   
S: {"cookie":0,"total":279}

獲取批量sec列表  
 

select secCode,secName,ns,DATE_FORMAT(secTime,'%Y-%m-%%d %H:%i:%S'),openPrice,closePrice

,highPrice,lowPrice,changeTotal,exQty 

from secDaily

where secCode like '%{0}%' or secName like '%{0}%' 

order by  secCode limit {1},{2};

  http://localhost:8000/DEMOService/secbatch
 

R: {"filter" :"1", "offset": 0,"count":10}                  
S:{"cookie": 0,
         "items": [
             {
                 "changeTotal": 21728,
                 "closePrice": 104.1,
                 "exQty": 10864000,
                 "highPrice": 104.1,
                 "lowPrice": 104.1,
                 "ns": "RealTime-SEHKFeed",
                 "openPrice": 104.1,
                 "secCode": "1",
                 "secName": "CKH HOLDINGS",
                 "secTime": "2015-11-09 09:51:11"
             }
         ]
     }

獲取指定sec明細總數  
 

select count(sdID) from secDetail 

inner join secDaily on secDetail.secDailyID = secDaily.tID

where secDaily.secCode = '{0}';

  http://localhost:8000/DEMOService/detailSum
 

R:{"code" :"1"}
S:{"total":21728}

獲取指定sec批量列表(關鍵是item的時間)    
  select seq,DATE_FORMAT(detailTime,'%Y-%m-%%d %H:%i:%S')
from secDetail 
inner join secDaily on secDetail.secDailyID = secDaily.tID
where secDaily.secCode = '{0}'  order by seq  limit {1},{2};  
  獲取指定sec批量列表
 

R:  {"code" :"1", "offset": 0,"count":2}
S:{ "cookie": 0,
        "items": [
            {
                "detailTime": "2015-11-09 09:51:11",
                "seq": 45
            },
            {
                "detailTime": "2015-11-09 09:51:11",
                "seq": 46
            }
        ]
    }

獲取指定時間內統計  
  create procedure getRange(codeRange varchar(45), beginTime varchar(20), endTime varchar(20),rangval int)
須要構建臨時表,填充空記錄
  http://localhost:8000/DEMOService/rangeTotal

客戶端經過瀏覽器使用POSt方式查詢數據.因此服務器請求須要特別處理。

[ServiceContract(Name = "RESTDemoServices")]
public interface IRESTDemoServices
{
    [OperationContract]
    [WebInvoke(UriTemplate = "secSum", Method = "*", BodyStyle = WebMessageBodyStyle.Bare, RequestFormat = WebMessageFormat.Json, ResponseFormat = WebMessageFormat.Json)]
     SecSumResponse GetSecSum(SecSumRequest req);
}

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single, IncludeExceptionDetailInFaults = true)]
[AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
[JavascriptCallbackBehavior(UrlParameterName = "callback")]
public class RestDemoServices : BaseEntry.IRESTDemoServices
{
    public SecSumResponse GetSecSum(SecSumRequest req)
    {
           
            if (WebOperationContext.Current.IncomingRequest.Method == "OPTIONS")
            {
                WebOperationContext.Current.OutgoingResponse.Headers.Add("Access-Control-Allow-Origin", "*");
                WebOperationContext.Current.OutgoingResponse.Headers.Add("Access-Control-Allow-Methods", "POST");
                WebOperationContext.Current.OutgoingResponse.Headers.Add("Access-Control-Allow-Headers", "Content-Type, Accept");

                return null;
            }
            WebOperationContext.Current.OutgoingResponse.Headers.Add("Access-Control-Allow-Origin", "*");
            SecSumResponse resp = new SecSumResponse();
    }
}

6. 客戶端實現 Javascript, chart.JS

實現文件列表:

main.htm 界面佈局
bigDataStyle.css css
jquery-1.11.2.js jquery
mainImpl.js 界面實現腳本
SecDailyApi.js api封裝腳本
table.js 表格腳本
Chart.JS Chart.JS引用

效果圖

image

測試文件:ch1.fxl

7. 接下來能夠作的任務

這個項目屬於demo性質。下一步預備使用java改寫python的實現,scan和reduce部分的存儲可能考慮hadoop實現存儲和分發。

源代碼:https://github.com/febwave/pybig

相關文章
相關標籤/搜索