用ELK分析支付寶帳單(一)

在微信公衆號看到一篇文章叫作《用ELK分析你的支付寶帳單》,看了之後躍躍欲試,而且也很想學習一下ELK技術,因此就將此次的小實驗看成ELK的入門吧。json

一. 什麼是ELK?數組

ELK指的是elasticsearch, logstash和kibana,Elasticsearch是基於Lucene的分佈式搜索引擎,它對Lucene進行了封裝,提供了RestApi。Logstash負責採集數據,能夠從多個數據源中獲取數據、轉換數據。kibana則是提供了對ES中的數據的可視化展現和搜索。瀏覽器

二.數據準備微信

首先在支付寶官網下載本身的支付寶帳單(csv格式),先備份一份,另一份刪除表頭表尾,只保留數據部分。jvm

以下圖,支付寶提供的csv有顯示每一列的列名,意味着咱們能夠經過這些列名來識別每一列分別是幹什麼的,而且經過Logstash進行解析。elasticsearch

3、ELK的安裝分佈式

此次我打算先將ELK都安裝個人電腦(Windows系統)學習

ElasticSearch的安裝:解壓zip文件,而後直接運行安裝目錄下bin裏面的ElasticSearch.bat便可,要驗證是否啓動成功,用瀏覽器訪問localhost:9200,若是返回json格式的數據,說明ES啓動成功。搜索引擎

注意:由於數據中包含中文,爲了不亂碼,應在conf目錄下的jvm.options文件中,把-Dfile.encoding=UTF-8改爲-Dfile.encoding=GBK.url

啓動ElasticSearch後訪問localhost:9200

 

Logstash的安裝:解壓zip文件,要配置好conf文件,運行bin目錄下logstash.sh,並用-f指定conf文件的位置。

Kibana的安裝,解壓zip文件後,打開conf下的kibana.yml,配置好elasticsearch.url="http://localhost:9200"(其實只是被註釋了,只須要解開註釋便可),而後再運行bin目錄下的kibana.bat

而後訪問localhost:5601,看到如下界面,說明kibana啓動成功了。

四.Logstash的配置

下面這幅圖展現了Logstash的pipeline以及在Logstash中數據的流向。

 

Logstash的配置文件包括三部分,input,filter和output,Logstash的配置其實是這三部分plugin的配置。

input部分

input{
    file{
        type => "zhifubao"
        path => ["D:\qingfei\ELK\data\jenny_alipay.csv"]
        start_position=>"beginning"
        codec=> plain {
            charset=> "GBK"
        }
    }
}

 

這裏的file就是input的其中一個插件,file plugin的做用是從文件中獲取數據,通常是經過tail -0f的形式來讀取文件,但我用的是本身的支付寶帳單,並不會實時的更新,因此我指定start_position爲beginning。type能夠用來區分數據的類型,在後面的filter和output中都須要用到。codec用的是GBK.

filter部分:

filter {
    if[type] == "zhifubao" {
        csv {
            separator=>","
            columns => ["TranId","OrderId","CreateTime","PayTime","LastModified","TranSource","Type","CounterParty","ProductName","Amount","inOut","status","serviceCost","IssuccessRefund","Remark","Status"]
            convert => {
                "Amount" => "float"
            }

        }
        date {
            match => ["CreateTime","M/dd/yyyy HH:mm:ss","M/dd/yyyy HH:mm"]
        }
    }

}

Filter部分咱們使用的是csv plugin和date plugin。因爲咱們輸入的數據文件是csv文件,csv plugin能夠解析csv文件並把它們保存成指定的column.

separator屬性是咱們要告訴csv plugin,column與column之間的分隔符是什麼,默認是逗號。

columns屬性的數據類型是數組,這裏指定的是將數據文件中的數據解析成那些column,這裏必須按照csv中的順序來寫。

默認全部的column都會被解析成Text類型,但若是咱們要進行分析,咱們須要一些數值類型的數據,convert屬性能夠對column進行數據類型的轉換,例如這裏是將amount轉換成float類型的數據。

date plugin是用於將某些column轉換爲日期用於數據分析。match屬性的數據類型是數組,格式是[「field name」,formats....],第一個元素是要轉換的column名,後面能夠爲任意多個值,用於定義日期的格式。爲何要定義格式呢?由於date plugin並不知道輸入的數據是對應的日期是怎樣的,年月日怎麼獲取,因此咱們要告訴date plugin,如何獲取年月日小時分鐘。因此此時咱們必需要觀察咱們的數據,來肯定格式。

如下是部分的日期數據,經過觀察能夠發現,CreateTime的值是按照「月份/日期/年份 小時:分鐘」的格式來表示時間的。

經過查閱date plugin的文檔,肯定了格式。

output部分:

output {
    if[type]=="zhifubao" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "jenny"
        }
    }
}

由於咱們要將數據導入到ES,因此就應該使用elasticsearch plugin。hosts指定es的host,index指定es中的索引名。

5、在配置Logstash中遇到的問題

啓動Logstash後,我發現個人date plugin配置有錯誤,因此修改了配置,一直重啓logstash和es,卻發現個人配置一直沒有生效。後來我在想是否是要從新建立索引,因此我把output的index的名字改了,重啓logstash和es,發現仍是不生效,甚至新的index都沒有被建立。

後來查找了stackoverflow(https://stackoverflow.com/questions/32742379/logstash-file-input-glob-not-working)以及官方文檔,發現了一個叫sincedb的配置。官網是這麼解釋sincedb的。

The plugin keeps track of the current position in each file by recording it in a separate file named sincedb. This makes it possible to stop and restart Logstash and have it pick up where it left off without missing the lines that were added to the file while Logstash was stopped.

By default, the sincedb file is placed in the home directory of the user running Logstash with a filename based on the filename patterns being watched (i.e. the path option). 

個人理解是由於logstash對於每個文件都會記錄上一次讀到/解析到的位置,保存在一個叫sincedb的文件,由於文件一直沒有更新,因此即便我不停地從新啓動logstash和從新指定一個索引,logstash都會從上次讀到的位置開始讀。因此新的索引一直沒法建立成功。後來在conf文件裏將sincedb的路徑指向null,就能夠建立新索引了。

因此新的logstash的配置文件內容以下:

input{
    file{
        type => "zhifubao"
        path => ["D:\qingfei\ELK\data\jenny_alipay.csv"]
        start_position=>"beginning"
        codec=> plain {
            charset=> "GBK"
        }
        sincedb_path => "null"

    }
}
filter {
    if[type] == "zhifubao" {
        csv {
            separator=>","
            columns => ["TranId","OrderId","CreateTime","PayTime","LastModified","TranSource","Type","CounterParty","ProductName","Amount","inOut","status","serviceCost","IssuccessRefund","Remark","Status"]
            convert => {
                "Amount" => "float"
            }

        }
        date {
            match => ["CreateTime","M/dd/yyyy HH:mm:ss","M/dd/yyyy HH:mm"]
        }
    }

}
output {
    if[type]=="zhifubao" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "jenny"
        }
    }
}

 重啓logstash之後,新的index就被建立了。

接下來的任務就是用kibana展現和分析數據了,這部分放在下一篇文章。

相關文章
相關標籤/搜索