收集、分析線上日誌數據實戰——ELK


本文來自網易雲社區html

做者:田躲躲node


用戶行爲統計(User Behavior Statistics, UBS)一直是互聯網產品中必不可少的環節,也俗稱埋點。對於產品經理,運營人員來講,埋點固然是越多,覆蓋範圍越廣越好。經過用戶行爲分析系統可洞悉用戶基本操做習慣、探析用戶心理。經過行爲數據的補充,構建出精細、完整的用戶畫像,對不一樣特徵用戶作個性化營銷,提高用戶體驗。讓產品設計人員準確評估用戶行爲路徑轉化、產品改版優良、某一新功能對產品的影響幾何,讓運營人員作精準營銷而且評估營銷結果等。git

目前所負責項目前期採用了先後端約定字段,埋點統計用戶操做行爲。數據存放在DDB中。若是用戶行爲日誌很是大的話,這種方式確定是不可行的。故採用了目前比較成熟的ELK代替以前的統計流程。本篇文章主要介紹ELK集羣搭建,基本API封裝,以及遇到的一些坑。github

Elasticsearchweb

Elasticsearch是一個基於Lucene構建的開源、分佈式、RESTful風格的搜索引擎。它被設計用於雲計算中,具備實時搜索負載、穩定、快速、安裝使用方便等優勢。(以前用過SolrCloud,ES對用戶的侵入性簡直能夠忽略)正則表達式

集羣安裝:sql

   每臺機器先配置elasticsearch.yml,主要配置信息以下:json

#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: es-commenta-event #其餘機器集羣名稱應該保持一致
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: es-node-c1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /opt/elk/elasticsearch-5.1.1/data
#
# Path to log files:
#
path.logs: /opt/elk/elasticsearch-5.1.1/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 192.168.140.133 #本機器host
#
# Set a custom port for HTTP:
#
#http.port: 9200
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html>
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["192.168.140.133",  "192.168.140.134", "192.168.140.135"] #集羣host列表

# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
#
discovery.zen.minimum_master_nodes: 2

集羣啓動:bootstrap

Q1:can not run elasticsearch as root後端

由於是本地虛擬機root安裝的,啓動的時候會報這個錯。解決方案是:

group esgroup 
useradd esuser -g esgroup -p espassword
chown -R esuser:esgroup /etc/
chown -R esuser:esgroup /opt/

切換到esuser用戶便可執行啓動命令。

Q2:Unsupported major.minor version 52.0

目前安裝的ES版本爲5.1.1,須要Jdk1.8的版本,故安裝下Jdk1.8,配置下環境變量,便可執行啓動命令。

Q3:max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]

ES啓動佔用更大的內存。修改以下:

sysctl -w vm.max_map_count=262144

每一個ES服務設置好後,就能夠真正啓動了。依次啓動機器的時候,能夠看下機器日誌是否有node加入到集羣。如:

curl '192.168.140.133:9200'{    "name": "es-node-c1", 
    "cluster_name": "es-commenta-event", 
    "cluster_uuid": "wi_1VOWoRqecjIht3Ra3mg", 
    "version": {        "number": "5.1.1", 
        "build_hash": "5395e21", 
        "build_date": "2016-12-06T12:36:15.409Z", 
        "build_snapshot": false, 
        "lucene_version": "6.3.0"
    }, 
    "tagline": "You Know, for Search"}

目前有3臺虛擬機,默認ES有5個節點,能夠經過命令建立3個節點的index,每一個主節點有一個複製節點。

curl -XPUT 'http://192.168.140.133:9200/commenta' -d '{"settings" : {"number_of_shards" : 3,"number_of_replicas" : 1}}'

集羣狀態:

curl 'http://192.168.140.133:9200/_cluster/health?pretty'{  "cluster_name" : "es-commenta-event",  "status" : "green",  "timed_out" : false,  "number_of_nodes" : 3,  "number_of_data_nodes" : 3,  "active_primary_shards" : 3,  "active_shards" : 6,  "relocating_shards" : 0,  "initializing_shards" : 0,  "unassigned_shards" : 0,  "delayed_unassigned_shards" : 0,  "number_of_pending_tasks" : 0,  "number_of_in_flight_fetch" : 0,  "task_max_waiting_in_queue_millis" : 0,  "active_shards_percent_as_number" : 50.0}

安裝插件:

經過類SQL轉化成DSL
bin/elasticsearch-plugin install install https://github.com/NLPchina/elasticsearch-sql/releases/download/5.1.1.0/elasticsearch-sql-5.1.1.0.zip
X-Pack集成了權限、監控等功能,是一款很是有用的插件。可是商用的,收費。
bin/elasticsearch-plugin install x-pack

Logstash

Logstash是一款輕量級的日誌蒐集處理框架,能夠方便的把分散的、多樣化的日誌蒐集起來,並進行自定義的處理,而後傳輸到指定的位置。

安裝:

  到官網下載logstash5.1.1版本便可。

啓動:

一、無配置文件啓動

bin/logstash -e 'input{ stdin{} } output{ stdout{} }'Sending Logstash's logs to /home/webedit/logstash/logstash-5.1.1/logs which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
[2017-04-27T15:47:38,023][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-04-27T15:47:38,039][INFO ][logstash.pipeline        ] Pipeline main started
[2017-04-27T15:47:38,115][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
hello elastic
2017-04-27T07:49:00.966Z localhost.localdomain hello elastic

logstash會採集命令行輸入的命令

二、配置文件啓動

假設咱們須要採集的日誌記錄是這種格式的:

INFO  [17.04.27 16:12:12][com.netease.mail.vip.commenta.filter.EventLogFilter]: |44171|1|1|1|1493280732227|0.0|123.58.160.131|133001|COMMENTA-B54C43F5-4FCB-4D10-B9EC-67862FBF0055|1493280732440|huiping_mp|0.7.0|null|1|

如何採集這種格式的日誌呢?這裏採用正則表達式去匹配,具體配置文件以下:

input {

file {
    type => "commenta"
    path => ["/home/logs/commenta/stdout.log"]
    start_position => "beginning"
    codec => plain { charset => "Windows-1252" }
}

}

filter {if [type] == "commenta" {
    grok {
      match => { "message" => "%{DATA:className}\|%{BASE16FLOAT:id}\|%{DATA:eventType:int}\|%{DATA:page:int}\|%{DATA:eventFrom:int}\|%{DATA:eventTime}\|%{BASE16FLOAT:eventWeight}\|%{DATA:ip}\|%{BASE16FLOAT:userId}\|%{DATA:uniqueCode}\|%{DATA:createTime}\|%{DATA:clientFrom}\|%{DATA:appVersion}\|%{DATA:data}\|%{DATA:eventStep:int}\|"}
          remove_field => ["message"]
        }
}if '_grokparsefailure' in [tags] { #過濾掉不匹配的事件
	drop{}
}

	mutate  { #數據類型轉換  
                        convert => [ "eventWeight", "float"]
                        convert => [ "id", "float"]
                        convert => [ "userId", "float"]
                }


}
output{
       
       stdout { codec => rubydebug } #打印出行爲日誌記錄在控制檯

       elasticsearch{
             hosts => ["192.168.140.133:9200","192.168.140.134:9200","192.168.140.135:9200"]
             index => "commenta"
        }
}

下面咱們能夠啓動logstash看下效果:

./bin/logstash -f ./config/logstash.conf{     "appVersion" => "0.7.0",           "data" => "null",             "ip" => "XXXXXXXXX",      "className" => "INFO  [17.04.27 16:12:12][com.netease.mail.vip.commenta.filter.EventLogFilter]: ",      "eventType" => 1,           "type" => "commenta",    "eventWeight" => 0.0,         "userId" => 133001.0,           "tags" => [],           "path" => "/home/logs/commenta/stdout.log",     "@timestamp" => 2017-04-27T08:18:58.245Z,     "uniqueCode" => "COMMENTA-B54C43F5-4FCB-4D10-B9EC-67862FBF0055",     "createTime" => "1493280732440",       "@version" => "1",           "host" => "testfb-m126-161",      "eventTime" => "1493280732227",      "eventStep" => 1,     "clientFrom" => "huiping_mp",             "id" => 44171.0,           "page" => 1,      "eventFrom" => 1}

經過打印在控制檯的日誌能夠看到咱們已經經過logstash收集到了行爲日誌記錄(部分數據已脫敏)。固然咱們也能夠經過Kibana看到這些數據,下部分將會講到。

三、啓動問題

Q1:Unsupported major.minor version 52.0

使用的是Logstash版本爲5.1.1,須要Jdk1.8的環境,故安裝下Jdk1.8,配置下環境變量,便可執行啓動命令。

Q2:unknown setting host for elasticsearch

配置Logstash的啓動文件時,注意版本的問題,如host-->hosts

Kibana

  Kibana是一個開源的分析與可視化平臺,設計出來用於和Elasticsearch一塊兒使用的。你能夠用kibana搜索、查看、交互存放在Elasticsearch索引裏的數據,使用各類不一樣的圖表、表格、地圖等kibana可以很輕易地展現高級數據分析與可視化。

安裝:

  到官網下載Kibana5.1.1版本便可。

啓動:

主要配置以下:

# Kibana is served by a back end server. This setting specifies the port to use.
#server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "192.168.140.133"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy. This only affects
# the URLs generated by Kibana, your proxy is expected to remove the basePath value before forwarding requests
# to Kibana. This setting cannot end in a slash.
#server.basePath: ""

# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576

# The Kibana server's name.  This is used for display purposes.
#server.name: "your-hostname"

# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://192.168.140.133:9200"
.......

啓動成功後,咱們能夠監控commenta*的索引(安裝ES的時候,建立了)

bin/kibana

這時候就能夠看到Logstash收集到的數據日誌了

固然咱們也能夠配置一些統計:

爲了更直觀的展現,咱們能夠把統計「拖拽」到Dashboard中。


至此,ELK已經搭建完成,並提供一些簡單的功能。 可是有一些統計Kibana是作不了的。這時候咱們程序須要處理一下。

Java API

 HandleEsClientServer

/* ES服務器列表 */
    private String serverList;    /* 設置client.transport.sniff爲true來使客戶端去嗅探整個集羣的狀態,把集羣中其它機器的ip地址加到客戶端中,它會自動幫你添加,而且自動發現新加入集羣的機器 */
    private Boolean sniff = false;    /* 集羣名稱 */
    private String clusterName;    /* 鏈接客戶端 */
    private Client client;    /* 搜索基本工具類 */
    private SearchDao searchDao;    public HandleEsClientServer() {
    }    public HandleEsClientServer(String serverList, Boolean sniff, String clusterName) {        this.serverList = serverList;        this.sniff = sniff;        this.clusterName = clusterName;
    }    @Override
    public void afterPropertiesSet() throws Exception {

        logger.info("es server start at time={}, serverList={}, clusterName={}, sniff={}", DateUtil.toStr(new Date(),DateUtil.YYYY_MM_DD_HH_MM_SS),
                serverList, clusterName, sniff);        if (this.getServerList() == null || this.getServerList().length() == 0) {
            logger.error("es serverList is null...");            return;
        }

        List clusterList = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(this.getServerList());

        List transportAddresses = new ArrayList<>();        for (String cluster : clusterList) {
            List host = Splitter.on(":").trimResults().omitEmptyStrings().splitToList(cluster);
            String ip = host.get(0);
            Integer port = Integer.valueOf(host.get(1));            try {
                transportAddresses.add(new InetSocketTransportAddress(InetAddress.getByAddress(getIpByte(ip)), port == null ? 9300 : port));
            } catch (UnknownHostException e) {
                logger.error("init es client error={} at time={} ", e, DateUtil.toStr(new Date(),DateUtil.YYYY_MM_DD_HH_MM_SS));                return;
            }
        }        //配置啓動參數
        Settings settings = Settings.builder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", sniff)
                .build();        //初始化Client
        this.client = new PreBuiltTransportClient(settings)
                .addTransportAddresses(transportAddresses.toArray(new TransportAddress[transportAddresses.size()]));        this.searchDao = new SearchDao(this.client);

        logger.info("es server start success at time={}", DateUtil.toStr(new Date(),DateUtil.YYYY_MM_DD_HH_MM_SS));

    }

HandleEsData

 /**
     * 根據elasticsearch-sql插件的sql語句查詢結果。
     * @param query
     * @return
     * @throws SqlParseException
     * @throws SQLFeatureNotSupportedException
     */
    public SqlResponse selectBySQL(String query) throws SqlParseException, SQLFeatureNotSupportedException {

        logger.info("selectBySQL, query={}",query);        try{
            SqlElasticSearchRequestBuilder select = (SqlElasticSearchRequestBuilder) searchDao.explain(query).explain();            return new SqlResponse((SearchResponse)select.get());
        }catch (Exception e){
            logger.error(e.getMessage(),e);
        }        return null;

    }/**
     * 批量插入數據,使用Obj的id字段。
     * @param _index
     * @param _type
     * @param data
     * @param generate_id
     * @param 
     * @return
     */
    public  BulkResponse batchObjIndex(String _index, String _type, List data, boolean generate_id){

        logger.info("batchObjIndex, index={}, type={}, data={}, generate_id={}", _index, _type, data, generate_id);

        Assert.notEmpty(data, "data is not allowed empty");

        BulkRequestBuilder bulkRequest = client.prepareBulk();        for (T tObj : data) {
            Class clazz = tObj.getClass();
            String json = JSONObject.toJSONString(tObj, SerializerFeature.WriteMapNullValue);            if(generate_id){
                bulkRequest.add(client.prepareIndex(_index.toLowerCase(), _type.toLowerCase()).setSource(json));
            } else {                try {
                    Object value = clazz.getDeclaredMethod("getId").invoke(tObj);
                    String _id = String.valueOf(value);
                    bulkRequest.add(client.prepareIndex(_index.toLowerCase(), _type.toLowerCase(), _id).setSource(json));
                } catch (Exception e) {
                    logger.error(e.getMessage(),e);
                }
            }
        }        return bulkRequest.execute().actionGet();
    }

參考資料:

      http://www.learnes.net/

        http://udn.yyuap.com/doc/logstash-best-practice-cn/  

        https://www.gitbook.com/book/chenryn/elk-stack-guide-cn/details

        https://www.elastic.co/guide/en/elasticsearch/reference/5.1/getting-started.html

        https://www.elastic.co/guide/en/logstash/5.1/getting-started-with-logstash.html

        https://www.elastic.co/guide/en/kibana/5.1/getting-started.html

        http://elasticsearch.cn/


網易雲免費體驗館,0成本體驗20+款雲產品!

更多網易研發、產品、運營經驗分享請訪問網易雲社區


相關文章:
【推薦】 HTTP/2部署使用
【推薦】 相似gitlab代碼提交的熱力圖怎麼作?

相關文章
相關標籤/搜索