現狀:原先公司服務器的日誌是用來過後調試。日誌是根據時間按照鍵值對存儲的本地文件。業務繁忙或日子一長,便會產生大量無效的文件。除此以外,別無他用。javascript
目標:從日誌中提取有用的信息。從軟件上能夠獲取必定時間內的IO性能;客戶端請求的統計。從業務上則能夠獲取更多。css
這是大體的工做流程。html
技能預備:java
python 3.4 | 文本分析和數據處理 | 快速開發,快速實現業務 |
mongodb | 鍵值對的存儲 | 數據因爲業務不一樣,相關的鍵值對也不一致,因此採用nosql |
mysql | 最終記錄的管理 | 具體業務的最終結果是相對固定的,同時也爲了後續客戶端的快速訪問 |
workbench | 數據庫table的維護 | 數據庫維護工具 |
c# && wcf | 服務端,給瀏覽器提供數據支持 | 構建一個簡單的服務端,支持瀏覽器的post方式獲取數據 |
Javascript && HTML | 客戶端界面和操做 | html,javascript分離界面和業務,手工綁定數據 |
Fiddler | 測試wcf請求 | 測試客戶端的請求和返回值,調試封裝後的API |
如何安裝這些軟件,就不在這裏詳述了。python
測試環境:mysql
windows 2008 r2 | c#,workbench |
ubuntu14.04 | mongodb,mysql |
實現過程:jquery
1. 原始日誌文件的輸入git
日誌基本說明:github
文件按照目錄存放,日誌內容是有時序。正則表達式
每條日誌佔用一行。
它的格式以下:2015-11-09 07:37:09 Request sn=0 topic=\RealTime\SEHKFeed\6881 fields=<F>P1<D>6881<F>P2<D>CGS<F>P3<D>中國銀河<F>P4<D>中國銀河<F>P6<D>7.730<F>P15<D>500
時間 | 2015-11-09 07:37:09 |
關鍵字 | \RealTime\SEHKFeed\6881 |
數據 | <F>P1<D>6881<F>P2<D>CGS<F>P3<D>中國銀河<F>P4<D>中國銀河<F>P6<D>7.730<F>P15<D>500 |
實現文件列表:
pyCollectMore.py | 啓動腳本,設置參數,捕獲錯誤 |
pyScanner.py | 枚舉目錄的文件,併發pyResolve(因爲開發時間有限,暫時作成單線程的方法調用了) |
pyStoreDefine.py | 常量定義 |
mongoAccesslib.py | mongodb api封裝 |
pyResolve.py | 利用正則表達式分析日誌行,寫入mongo數據庫 |
開發細節:
1. 因爲日誌之間有時間順序,因此同一個目錄的文件須要依次遍歷,但不一樣目錄能夠並行執行。
2. 主要是日誌topic對象的信息分解。
原始數據以下:
2015-11-09 07:37:09 Request sn=0 topic=\RealTime\SEHKFeed\6881 fields=<F>P1<D>6881<F>P2<D>CGS<F>P3<D>中國銀河<F>P4<D>中國銀河<F>P6<D>7.730<F>P15<D>500
分解1:獲取時間,topic名稱,原始鍵值對。
正則表達式:(\d\d\d\d-\d\d-\d\d\s\d\d:\d\d:\d\d)\s(?:Request|Update)\ssn=0\stopic=\\([\S]+)\\([\S]+)\sfields=(.+)
分解2:鍵值對。
正則表達式:<F>([^<]+)<D>([^<]*)
整理和合並日志記錄:
對於一個topic而言,包含2部分:命名空間和topic對象名稱。
在以前的示例中「\RealTime\SEHKFeed\6881」---命名空間:「\RealTime\SEHKFeed」;對象是:「6881」。
因此處理完成所有的日誌行後,須要合併相同命名空間下的對象名稱。
2. 中間結果的存儲
在以前任務中,咱們能夠獲取2部分的信息:topic的明細以及topic的清單。
這時候須要在mongodb中建立2個表。
tb_topic_list | 記錄已收集的topic對象名稱 |
tb_命名空間 | 根據日誌收集後的命名空間,建立對應的表,存放每一個topic的所有明細 |
這時候使用nosql的好處了,事先不用定義表以及相關字段。
tb_topic_list字段定義:
_id | 記錄id |
tps | 新建表名稱,後續調度程序使用 |
tpn | 對象名稱 |
tpu | 是否已經處理過 |
tb_命名空間(根據上下文應該是tb_RealTime_SEHKFeed)字段定義:
_id | 記錄id |
nss | 記錄序列號(python維護) |
nstp | topic名稱 |
nsd | 時間 |
nsn | 命名空間 |
業務的字段P1,P2 | .. |
3. 任務調度和數據分析
實現文件列表:
pyTaskMng.py | 啓動腳本,定時查詢待處理的topic,併發執行pyReduce(topic數量不少,因此這裏是併發進程) |
pyReduce.py | 從mongodb獲取已經分類的topic,根據時間順序,計算總數,最大值,最小值以及記錄明細 |
pyStoreDefine.py | 常量定義 |
mongoAccesslib.py | mongodb api封裝 |
mysql.connector | mysql官方引用,在reduce以後寫入計算的結果 |
開發細節:
1. pyTaskMng運行在不一樣的機器上,定時檢查tb_topic_list。
若是有記錄上, 併發交給進程池,由pyReduce負責獲取topic對應的所有記錄,數量大的時候,批量獲取。
2. pyReduce分批獲取topic的明細。
根據業務規則:
P1 | code |
P2 | 名稱 |
P3 | big5名稱 |
P4 | gb名稱 |
P5 | 價格 |
P6 | 數量 |
因爲這個項目屬於demo,因此咱們假定如下的規則:
開盤價 | 每一個股票第一筆價格 |
收盤價 | 每一個股票最後一筆價格 |
最高價 | 初值等於開盤價,比較當前價後判斷是否更改 |
最低價 | 初值等於開盤價,比較當前價後判斷是否更改 |
變化次數 | 累計記錄數 |
成交量 | 每一個記錄報價數 |
pyReduce併發執行,執行完成後更新tb_topic_list。
計算的結果存入mysql表。
4. 最終結果的存儲
鑑於nosql在統計查詢上有些限制,因此使用mysql做爲統計信息的存儲設備。
secDaily表記錄統計信息
字段 | 類型 | 說明 |
tID | int | 主鍵 |
code | varchar(45) | 股票code |
secName | varchar(32) | 股票名稱 |
ns | varchar(64) | 命名空間 |
secTime | datetime | 日期 |
dayNum | int | 邏輯天 |
openPrice | double | 開盤價 |
closePrice | double | 收盤價 |
highPrice | double | 最高價 |
lowPrice | double | 最低價 |
changeTotal | int | 變化次數 |
exQty | double | 成交量 |
secDetail表記錄時間戳
字段 | 類型 | 說明 |
sdID | int | 主鍵 |
secDailyID | int | 主表id |
seq | int | 序列號 |
detailTime | datetime | 時間戳 |
price | double | 價格 |
qty | double | 數量 |
5. 提供數據的服務端
爲了快速顯示,此次使用wcf做爲服務端。
提供5個接口:
獲取指定sec列表總數 | |
select count(tID) from secDaily where secCode like '%{0}%' or secName like '%{0}%'; | |
http://localhost:8000/DEMOService/secSum | |
R: {"filter" :"1"} |
|
獲取批量sec列表 | |
select secCode,secName,ns,DATE_FORMAT(secTime,'%Y-%m-%%d %H:%i:%S'),openPrice,closePrice ,highPrice,lowPrice,changeTotal,exQty from secDaily where secCode like '%{0}%' or secName like '%{0}%' order by secCode limit {1},{2}; |
|
http://localhost:8000/DEMOService/secbatch | |
R: {"filter" :"1", "offset": 0,"count":10} |
|
獲取指定sec明細總數 | |
select count(sdID) from secDetail inner join secDaily on secDetail.secDailyID = secDaily.tID where secDaily.secCode = '{0}'; |
|
http://localhost:8000/DEMOService/detailSum | |
R:{"code" :"1"} |
|
獲取指定sec批量列表(關鍵是item的時間) | |
select seq,DATE_FORMAT(detailTime,'%Y-%m-%%d %H:%i:%S') from secDetail inner join secDaily on secDetail.secDailyID = secDaily.tID where secDaily.secCode = '{0}' order by seq limit {1},{2}; |
|
獲取指定sec批量列表 | |
R: {"code" :"1", "offset": 0,"count":2} |
|
獲取指定時間內統計 | |
create procedure getRange(codeRange varchar(45), beginTime varchar(20), endTime varchar(20),rangval int) 須要構建臨時表,填充空記錄 |
|
http://localhost:8000/DEMOService/rangeTotal |
客戶端經過瀏覽器使用POSt方式查詢數據.因此服務器請求須要特別處理。
[ServiceContract(Name = "RESTDemoServices")]
public interface IRESTDemoServices
{
[OperationContract]
[WebInvoke(UriTemplate = "secSum", Method = "*", BodyStyle = WebMessageBodyStyle.Bare, RequestFormat = WebMessageFormat.Json, ResponseFormat = WebMessageFormat.Json)]
SecSumResponse GetSecSum(SecSumRequest req);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single, IncludeExceptionDetailInFaults = true)]
[AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
[JavascriptCallbackBehavior(UrlParameterName = "callback")]
public class RestDemoServices : BaseEntry.IRESTDemoServices
{
public SecSumResponse GetSecSum(SecSumRequest req)
{
if (WebOperationContext.Current.IncomingRequest.Method == "OPTIONS")
{
WebOperationContext.Current.OutgoingResponse.Headers.Add("Access-Control-Allow-Origin", "*");
WebOperationContext.Current.OutgoingResponse.Headers.Add("Access-Control-Allow-Methods", "POST");
WebOperationContext.Current.OutgoingResponse.Headers.Add("Access-Control-Allow-Headers", "Content-Type, Accept");
return null;
}
WebOperationContext.Current.OutgoingResponse.Headers.Add("Access-Control-Allow-Origin", "*");
SecSumResponse resp = new SecSumResponse();
}
}
6. 客戶端實現 Javascript, chart.JS
實現文件列表:
main.htm | 界面佈局 |
bigDataStyle.css | css |
jquery-1.11.2.js | jquery |
mainImpl.js | 界面實現腳本 |
SecDailyApi.js | api封裝腳本 |
table.js | 表格腳本 |
Chart.JS | Chart.JS引用 |
效果圖
測試文件:ch1.fxl
7. 接下來能夠作的任務
這個項目屬於demo性質。下一步預備使用java改寫python的實現,scan和reduce部分的存儲可能考慮hadoop實現存儲和分發。