ELK實時日誌分析平臺環境部署--完整記錄

 

在平常運維工做中,對於系統和業務日誌的處理尤其重要。今天,在這裏分享一下本身部署的ELK(+Redis)-開源實時日誌分析平臺的記錄過程(僅依據本人的實際操做爲例說明,若有誤述,敬請指出)~html

================概念介紹================
日誌主要包括系統日誌、應用程序日誌和安全日誌。系統運維和開發人員能夠經過日誌瞭解服務器軟硬件信息、檢查配置過程當中的錯誤及錯誤發生的緣由。常常分析日誌能夠了解服務器的負荷,性能安全性,從而及時採起措施糾正錯誤。

一般,日誌被分散在儲存不一樣的設備上。若是你管理數十上百臺服務器,你還在使用依次登陸每臺機器的傳統方法查閱日誌。這樣是否是感受很繁瑣和效率低下。當務之急咱們使用集中化的日誌管理,例如:開源的syslog,將全部服務器上的日誌收集彙總。前端

集中化管理日誌後,日誌的統計和檢索又成爲一件比較麻煩的事情,通常咱們使用grep、awk和wc等Linux命令能實現檢索和統計,可是對於要求更高的查詢、排序和統計等要求和龐大的機器數量依然使用這樣的方法不免有點力不從心。java

經過咱們須要對日誌進行集中化管理,將全部機器上的日誌信息收集、彙總到一塊兒。完整的日誌數據具備很是重要的做用:
1)信息查找。經過檢索日誌信息,定位相應的bug,找出解決方案。
2)服務診斷。經過對日誌信息進行統計、分析,瞭解服務器的負荷和服務運行狀態,找出耗時請求進行優化等等。
3)數據分析。若是是格式化的log,能夠作進一步的數據分析,統計、聚合出有意義的信息,好比根據請求中的商品id,找出TOP10用戶感興趣商品。node

開源實時日誌分析ELK平臺可以完美的解決咱們上述的問題,ELK由ElasticSearch、Logstash和Kiabana三個開源工具組成:
1)ElasticSearch是一個基於Lucene的開源分佈式搜索服務器。它的特色有:分佈式,零配置,自動發現,索引自動分片,索引副本機制,restful風格接口,多數據源,自動搜索負載等。它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口。Elasticsearch是用Java開發的,並做爲Apache許可條款下的開放源碼發佈,是第二流行的企業搜索引擎。設計用於雲計算中,可以達到實時搜索,穩定,可靠,快速,安裝使用方便。
在elasticsearch中,全部節點的數據是均等的。
2)Logstash是一個徹底開源的工具,它能夠對你的日誌進行收集、過濾、分析,支持大量的數據獲取方法,並將其存儲供之後使用(如搜索)。說到搜索,logstash帶有一個web界面,搜索和展現全部日誌。通常工做方式爲c/s架構,client端安裝在須要收集日誌的主機上,server端負責將收到的各節點日誌進行過濾、修改等操做在一併發往elasticsearch上去。
3)Kibana 是一個基於瀏覽器頁面的Elasticsearch前端展現工具,也是一個開源和免費的工具,Kibana能夠爲 Logstash 和 ElasticSearch 提供的日誌分析友好的 Web 界面,能夠幫助您彙總、分析和搜索重要數據日誌。mysql

爲何要用到ELK?
通常咱們須要進行日誌分析場景是:直接在日誌文件中 grep、awk 就能夠得到本身想要的信息。但在規模較大的場景中,此方法效率低下,面臨問題包括日誌量太大如何歸檔、文本搜索太慢怎麼辦、如何多維度查詢。須要集中化的日誌管理,全部服務器上的日誌收集彙總。常看法決思路是創建集中式日誌收集系統,將全部節點上的日誌統一收集,管理,訪問。
通常大型系統是一個分佈式部署的架構,不一樣的服務模塊部署在不一樣的服務器上,問題出現時,大部分狀況須要根據問題暴露的關鍵信息,定位到具體的服務器和服務模塊,構建一套集中式日誌系統,能夠提升定位問題的效率。linux

通常大型系統是一個分佈式部署的架構,不一樣的服務模塊部署在不一樣的服務器上,問題出現時,大部分狀況須要根據問題暴露的關鍵信息,定位到具體的服務器和服務模塊,構建一套集中式日誌系統,能夠提升定位問題的效率。
一個完整的集中式日誌系統,須要包含如下幾個主要特色:
1)收集-可以採集多種來源的日誌數據
2)傳輸-可以穩定的把日誌數據傳輸到中央系統
3)存儲-如何存儲日誌數據
4)分析-能夠支持 UI 分析
5)警告-可以提供錯誤報告,監控機制ios

ELK提供了一整套解決方案,而且都是開源軟件,之間互相配合使用,完美銜接,高效的知足了不少場合的應用。目前主流的一種日誌系統。nginx

ELK工做原理展現圖:git

如上圖:Logstash收集AppServer產生的Log,並存放到ElasticSearch集羣中,而Kibana則從ES集羣中查詢數據生成圖表,再返回給Browser。github

Logstash工做原理:
Logstash事件處理有三個階段:inputs → filters → outputs。是一個接收,處理,轉發日誌的工具。支持系統日誌,webserver日誌,錯誤日誌,應用日誌,總之包括全部能夠拋出來的日誌類型。

Input:輸入數據到logstash。

一些經常使用的輸入爲:
file:從文件系統的文件中讀取,相似於tial -f命令
syslog:在514端口上監聽系統日誌消息,並根據RFC3164標準進行解析
redis:從redis service中讀取
beats:從filebeat中讀取
Filters:數據中間處理,對數據進行操做。

一些經常使用的過濾器爲:
grok:解析任意文本數據,Grok 是 Logstash 最重要的插件。它的主要做用就是將文本格式的字符串,轉換成爲具體的結構化的數據,配合正則表達式使用。內置120多個解析語法。
mutate:對字段進行轉換。例如對字段進行刪除、替換、修改、重命名等。
drop:丟棄一部分events不進行處理。
clone:拷貝 event,這個過程當中也能夠添加或移除字段。
geoip:添加地理信息(爲前臺kibana圖形化展現使用)
Outputs:outputs是logstash處理管道的最末端組件。一個event能夠在處理過程當中通過多重輸出,可是一旦全部的outputs都執行結束,這個event也就完成生命週期。

一些常見的outputs爲:
elasticsearch:能夠高效的保存數據,而且可以方便和簡單的進行查詢。
file:將event數據保存到文件中。
graphite:將event數據發送到圖形化組件中,一個很流行的開源存儲圖形化展現的組件。
Codecs:codecs 是基於數據流的過濾器,它能夠做爲input,output的一部分配置。Codecs能夠幫助你輕鬆的分割發送過來已經被序列化的數據。

一些常見的codecs:
json:使用json格式對數據進行編碼/解碼。
multiline:將匯多個事件中數據彙總爲一個單一的行。好比:java異常信息和堆棧信息。

======================ELK總體方案=======================
ELK中的三個系統分別扮演不一樣的角色,組成了一個總體的解決方案。Logstash是一個ETL工具,負責從每臺機器抓取日誌數據,對數據進行格式轉換和處理後,輸出到Elasticsearch中存儲。Elasticsearch是一個分佈式搜索引擎和分析引擎,用於數據存儲,可提供實時的數據查詢。Kibana是一個數據可視化服務,根據用戶的操做從Elasticsearch中查詢數據,造成相應的分析結果,以圖表的形式展示給用戶。
ELK的安裝很簡單,能夠按照"下載->修改配置文件->啓動"方法分別部署三個系統,也可使用docker來快速部署。具體的安裝方法這裏不詳細介紹,下面來看一個常見的部署方案,以下圖所示,部署思路是:
1)在每臺生成日誌文件的機器上,部署Logstash,做爲Shipper的角色,負責從日誌文件中提取數據,可是不作任何處理,直接將數據輸出到Redis隊列(list)中;
2)須要一臺機器部署Logstash,做爲Indexer的角色,負責從Redis中取出數據,對數據進行格式化和相關處理後,輸出到Elasticsearch中存儲;
3)部署Elasticsearch集羣,固然取決於你的數據量了,數據量小的話可使用單臺服務,若是作集羣的話,最好是有3個以上節點,同時還須要部署相關的監控插件;
4)部署Kibana服務,提供Web服務。

在前期部署階段,主要工做是Logstash節點和Elasticsearch集羣的部署,而在後期使用階段,主要工做就是Elasticsearch集羣的監控和使用Kibana來檢索、分析日誌數據了,固然也能夠直接編寫程序來消費Elasticsearch中的數據。

在上面的部署方案中,咱們將Logstash分爲Shipper和Indexer兩種角色來完成不一樣的工做,中間經過Redis作數據管道,爲何要這樣作?爲何不是直接在每臺機器上使用Logstash提取數據、處理、存入Elasticsearch?

首先,採用這樣的架構部署,有三點優點:第一,下降對日誌所在機器的影響,這些機器上通常都部署着反向代理或應用服務,自己負載就很重了,因此儘量的在這些機器上少作事;第二,若是有不少臺機器須要作日誌收集,那麼讓每臺機器都向Elasticsearch持續寫入數據,必然會對Elasticsearch形成壓力,所以須要對數據進行緩衝,同時,這樣的緩衝也能夠必定程度的保護數據不丟失;第三,將日誌數據的格式化與處理放到Indexer中統一作,能夠在一處修改代碼、部署,避免須要到多臺機器上去修改配置。 

其次,咱們須要作的是將數據放入一個消息隊列中進行緩衝,因此Redis只是其中一個選擇,也能夠是RabbitMQ、Kafka等等,在實際生產中,Redis與Kafka用的比較多。因爲Redis集羣通常都是經過key來作分片,沒法對list類型作集羣,在數據量大的時候必然不合適了,而Kafka天生就是分佈式的消息隊列系統。

1)配置nginx日誌格式
首先須要將nginx日誌格式規範化,便於作解析處理。在nginx.conf文件中設置:

log_format main '$remote_addr "$time_iso8601" "$request" $status $body_bytes_sent "$http_user_agent" "$http_referer" "$http_x_forwarded_for" "$request_time" "$upstream_response_time" "$http_cookie" "$http_Authorization" "$http_token"';
access_log  /var/log/nginx/example.access.log  main;

2)nginx日誌–>>Logstash–>>消息隊列
這部分是Logstash Shipper的工做,涉及input和output兩種插件。input部分,因爲須要提取的是日誌文件,通常使用file插件,該插件經常使用的幾個參數是:
path:指定日誌文件路徑。
type:指定一個名稱,設置type後,能夠在後面的filter和output中對不一樣的type作不一樣的處理,適用於須要消費多個日誌文件的場景。
start_position:指定起始讀取位置,「beginning」表示從文件頭開始,「end」表示從文件尾開始(相似tail -f)。
sincedb_path:與Logstash的一個坑有關。一般Logstash會記錄每一個文件已經被讀取到的位置,保存在sincedb中,若是Logstash重啓,那麼對於同一個文件,會繼續從上次記錄的位置開始讀取。若是想從新從頭讀取文件,須要刪除sincedb文件,sincedb_path則是指定了該文件的路徑。爲了方便,咱們能夠根據須要將其設置爲「/dev/null」,即不保存位置信息。

input {
    file {
        type => "example_nginx_access"
        path => ["/var/log/nginx/example.access.log"]

        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

output部分,將數據輸出到消息隊列,以redis爲例,須要指定redis server和list key名稱。另外,在測試階段,可使用stdout來查看輸出信息。

# 輸出到redis
output {
    if [type] == "example_nginx_access" {
        redis {
            host => "127.0.0.1"
            port => "6379"
            data_type => "list"
            key => "logstash:example_nginx_access"
        }
      #  stdout {codec => rubydebug}
    }
}

3)消息隊列–>>Logstash–>>Elasticsearch
這部分是Logstash Indexer的工做,涉及input、filter和output三種插件。在input部分,咱們經過redis插件將數據從消息隊列中取出來。在output部分,咱們經過elasticsearch插件將數據寫入Elasticsearch。

# 從redis輸入數據
input {
    redis {
            host => "127.0.0.1"
            port => "6379"
            data_type => "list"
            key => "logstash:example_nginx_access"
    }
}

output {
    elasticsearch {
        index => "logstash-example-nginx-%{+YYYY.MM}"
        hosts => ["127.0.0.1:9200"]
    }
}

這裏,須要重點關注filter部分,下面列舉幾個經常使用的插件,實際使用中根據自身需求從官方文檔中查找適合本身業務的插件並使用便可,固然也能夠編寫本身的插件。
grok:是Logstash最重要的一個插件,用於將非結構化的文本數據轉化爲結構化的數據。grok內部使用正則語法對文本數據進行匹配,爲了下降使用複雜度,其提供了一組pattern,咱們能夠直接調用pattern而不須要本身寫正則表達式,參考源碼grok-patterns。grok解析文本的語法格式是%{SYNTAX:SEMANTIC},SYNTAX是pattern名稱,SEMANTIC是須要生成的字段名稱,使用工具Grok Debugger能夠對解析語法進行調試。例如,在下面的配置中,咱們先使用grok對輸入的原始nginx日誌信息(默認以message做爲字段名)進行解析,並添加新的字段request_path_with_verb(該字段的值是verb和request_path的組合),而後對request_path字段作進一步解析。
kv:用於將某個字段的值進行分解,相似於編程語言中的字符串Split。在下面的配置中,咱們將request_args字段值按照「&」進行分解,分解後的字段名稱以「request_args_」做爲前綴,而且丟棄重複的字段。
geoip:用於根據IP信息生成地理位置信息,默認使用自帶的一份GeoLiteCity database,也能夠本身更換爲最新的數據庫,可是須要數據格式須要遵循Maxmind的格式(參考GeoLite),彷佛目前只能支持legacy database,數據類型必須是.dat。下載GeoLiteCity.dat.gz後解壓, 並將文件路徑配置到source中便可。
translate:用於檢測某字段的值是否符合條件,若是符合條件則將其翻譯成新的值,寫入一個新的字段,匹配pattern能夠經過YAML文件來配置。例如,在下面的配置中,咱們對request_api字段翻譯成更加易懂的文字描述。

filter {
    grok {
        match => {"message" => "%{IPORHOST:client_ip} \"%{TIMESTAMP_ISO8601:timestamp}\" \"%{WORD:verb} %{NOTSPACE:request_path} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response_status:int} %{NUMBER:response_body_bytes:int} \"%{DATA:user_agent}\" \"%{DATA:http_referer}\" \"%{NOTSPACE:http_x_forwarder_for}\" \"%{NUMBER:request_time:float}\" \"%{DATA:upstream_resopnse_time}\" \"%{DATA:http_cookie}\" \"%{DATA:http_authorization}\" \"%{DATA:http_token}\""}
        add_field => {"request_path_with_verb" => "%{verb} %{request_path}"}
    }

    grok {
        match => {"request_path" => "%{URIPATH:request_api}(?:\?%{NOTSPACE:request_args}|)"}
        add_field => {"request_annotation" => "%{request_api}"}
    }

    kv {
        prefix => "request_args_"
        field_split => "&"
        source => "request_args"
        allow_duplicate_values => false
    }

    geoip {
        source => "client_ip"
        database => "/home/elktest/geoip_data/GeoLiteCity.dat"
    }

   translate {
        field => request_path
        destination => request_annotation
        regex => true
        exact => true
        dictionary_path => "/home/elktest/api_annotation.yaml"
        override => true
    }
}

Elasticsearch
Elasticsearch承載了數據存儲和查詢的功能,其基礎概念和使用方法能夠參考另外一篇博文Elasticsearch使用總結,這裏主要介紹些實際生產中的問題和方法:
1)關於集羣配置,重點關注三個參數:第一,discovery.zen.ping.unicast.hosts,Elasticsearch默認使用Zen Discovery來作節點發現機制,推薦使用unicast來作通訊方式,在該配置項中列舉出Master節點。第二,discovery.zen.minimum_master_nodes,該參數表示集羣中可工做的具備Master節點資格的最小數量,默認值是1。爲了提升集羣的可用性,避免腦裂現象(所謂腦裂,就是同一個集羣中的不一樣節點,對集羣的狀態有不一致的理解。),官方推薦設置爲(N/2)+1,其中N是具備Master資格的節點的數量。第三,discovery.zen.ping_timeout,表示節點在發現過程當中的等待時間,默認值是3秒,能夠根據自身網絡環境進行調整,必定程度上提供可用性。

discovery.zen.ping.unicast.hosts: ["master1", "master2", "master3"] 
discovery.zen.minimum_master_nodes: 2
discovery.zen.ping_timeout: 10

2)關於集羣節點,第一,節點類型包括:候選Master節點、數據節點和Client節點。經過設置兩個配置項node.master和node.data爲true或false,來決定將一個節點分配爲何類型的節點。第二,儘可能將候選Master節點和Data節點分離開,一般Data節點負載較重,須要考慮單獨部署。
3)關於內存,Elasticsearch默認設置的內存是1GB,對於任何一個業務部署來講,這個都過小了。經過指定ES_HEAP_SIZE環境變量,能夠修改其堆內存大小,服務進程在啓動時候會讀取這個變量,並相應的設置堆的大小。建議設置系統內存的一半給Elasticsearch,可是不要超過32GB。參考官方文檔。
4)關於硬盤空間,Elasticsearch默認將數據存儲在/var/lib/elasticsearch路徑下,隨着數據的增加,必定會出現硬盤空間不夠用的情形,此時就須要給機器掛載新的硬盤,並將Elasticsearch的路徑配置到新硬盤的路徑下。經過「path.data」配置項來進行設置,好比「path.data: /data1,/var/lib/elasticsearch,/data」。須要注意的是,同一分片下的數據只能寫入到一個路徑下,所以仍是須要合理的規劃和監控硬盤的使用。
5)關於Index的劃分和分片的個數,這個須要根據數據量來作權衡了,Index能夠按時間劃分,好比每個月一個或者天天一個,在Logstash輸出時進行配置,shard的數量也須要作好控制。
6)關於監控,筆者使用過head和marvel兩個監控插件,head免費,功能相對有限,marvel如今須要收費了。另外,不要在數據節點開啓監控插件。

Kibana
Kibana提供的是數據查詢和顯示的Web服務,有豐富的圖表樣板,能知足大部分的數據可視化需求,這也是不少人選擇ELK的主要緣由之一。UI的操做沒有什麼特別須要介紹的,常用就會熟練,這裏主要介紹常常遇到的三個問題。
a)查詢語法
在Kibana的Discover頁面中,能夠輸入一個查詢條件來查詢所需的數據。查詢條件的寫法使用的是Elasticsearch的Query String語法,而不是Query DSL,參考官方文檔query-string-syntax,這裏列舉其中部分經常使用的:
.單字段的全文檢索,好比搜索args字段中包含first的文檔,寫做 args:first;
.單字段的精確檢索,好比搜索args字段值爲first的文檔,寫做 args: 「first」;
.多個檢索條件的組合,使用 NOT, AND 和 OR 來組合,注意必須是大寫,好比 args:(「first」 OR 「second」) AND NOT agent: 「third」;
.字段是否存在,_exists_:agent表示要求agent字段存在,_missing_:agent表示要求agent字段不存在;
.通配符:用 ? 表示單字母,* 表示任意個字母。
b)錯誤「Discover: Request Timeout after 30000ms」
這個錯誤常常發生在要查詢的數據量比較大的狀況下,此時Elasticsearch須要較長時間才能返回,致使Kibana發生Timeout報錯。解決這個問題的方法,就是在Kibana的配置文件中修改elasticsearch.requestTimeout一項的值,而後重啓Kibana服務便可,注意單位是ms。
c)疑惑「字符串被分解了
常常碰到這樣一個問題:爲何查詢結果的字段值是正確的,但是作圖表時卻發現字段值被分解了,不是想要的結果?以下圖所示的client_agent_info字段。

獲得這樣一個不正確結果的緣由是使用了Analyzed字段來作圖表分析,默認狀況下Elasticsearch會對字符串數據進行分析,創建倒排索引,因此若是對這麼一個字段進行terms聚合,必然會獲得上面所示的錯誤結果了。那麼應該怎麼作纔對?默認狀況下,Elasticsearch還會建立一個相對應的沒有被Analyzed的字段,即帶「.raw」後綴的字段,在這樣的字段上作聚合分析便可。
又會有不少人問這樣的問題:爲何個人Elasticsearch沒有自動建立帶「.raw」後綴的字段?然而在Logstash中輸出數據時,設置index名稱前綴爲「logstash-」就有了這個字段。這個問題的根源是Elasticsearch的dynamic template在搗鬼,dynamic temlate用於指導Elasticsearch如何爲插入的數據自動創建Schema映射關係,默認狀況下,Logstash會在Elasticsearch中創建一個名爲「logstash」的模板,全部前綴爲「logstash-」的index都會參照這個模板來創建映射關係,在該模板中申明瞭要爲每一個字符串數據創建一個額外的帶「.raw」後綴的字段。能夠向Elasticsearch來查詢你的模板,使用API:GET http://localhost:9200/_template。

以上即是對ELK日誌系統的總結介紹,還有一個重要的功能沒有提到,就是如何將日誌數據與自身產品業務的數據融合起來。舉個例子,在nginx日誌中,一般會包含API請求訪問時攜帶的用戶Token信息,因爲Token是有時效性的,咱們須要及時將這些Token轉換成真實的用戶信息存儲下來。這樣的需求一般有兩種實現方式,一種是本身寫一個Logstash filter,而後在Logstash處理數據時調用;另外一種是將Logstash Indexer產生的數據再次輸出到消息隊列中,由咱們本身的腳本程序從消息隊列中取出數據,作相應的業務處理後,輸出到Elasticsearch中。

==================ELK環境部署==================

(0)基礎環境介紹

系統: Centos7.1
防火牆: 關閉
Sellinux: 關閉

機器環境: 兩臺
elk-node1: 192.168.1.160       #master機器
elk-node2:192.168.1.161      #slave機器

註明:
master-slave模式:
master收集到日誌後,會把一部分數據碎片到salve上(隨機的一部分數據);同時,master和slave又都會各自作副本,並把副本放到對方機器上,這樣就保證了數據不會丟失。
若是master宕機了,那麼客戶端在日誌採集配置中將elasticsearch主機指向改成slave,就能夠保證ELK日誌的正常採集和web展現。

==========================================================================
因爲elk-node1和elk-node2兩臺是虛擬機,沒有外網ip,因此訪問須要經過宿主機進行代理轉發實現。

有如下兩種轉發設置:(任選其一)

經過訪問宿主機的19200,19201端口分別轉發到elk-node1,elk-node2的9200端口
經過訪問宿主機的15601端口轉發到elk-node1的5601端口

宿主機:112.110.115.10(內網ip爲192.168.1.7)  (爲了避免讓線上的真實ip暴露,這裏任意給了一個ip作記錄)

a)經過宿主機的haproxy服務進行代理轉發,以下是宿主機上的代理配置:

[root@kvm-server conf]# pwd
/usr/local/haproxy/conf
[root@kvm-server conf]# cat haproxy.cfg
..........
..........
listen node1-9200 0.0.0.0:19200
mode tcp
option tcplog
balance roundrobin
server 192.168.1.160 192.168.1.160:9200 weight 1 check inter 1s rise 2 fall 2

listen node2-9200 0.0.0.0:19201
mode tcp
option tcplog
balance roundrobin
server 192.168.1.161 192.168.1.161:9200 weight 1 check inter 1s rise 2 fall 2

listen node1-5601 0.0.0.0:15601
mode tcp
option tcplog
balance roundrobin
server 192.168.1.160 192.168.1.160:5601 weight 1 check inter 1s rise 2 fall 2

重啓haproxy服務
[root@kvm-server conf]# /etc/init.d/haproxy restart

設置宿主機防火牆
[root@kvm-server conf]# cat /etc/sysconfig/iptables
.........
-A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT

[root@kvm-server conf]# /etc/init.d/iptables restart

b)經過宿主機的NAT端口轉發實現

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19200 -j DNAT --to-destination 192.168.1.160:9200
[root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp --sport 9200 -j SNAT --to-source 192.168.1.7
[root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19201 -j DNAT --to-destination 192.168.1.161:9200
[root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.161/32 -p tcp -m tcp --sport 9200 -j SNAT --to-source 192.168.1.7
[root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 15601 -j DNAT --to-destination 192.168.1.160:5601
[root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp --sport 5601 -j SNAT --to-source 192.168.1.7
[root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT

[root@kvm-server conf]# service iptables save
[root@kvm-server conf]# service iptables restart

提醒一點:
nat端口轉發設置成功後,/etc/sysconfig/iptables文件裏要註釋掉下面兩行!否則nat轉發會有問題!通常如上面在nat轉發規則設置好並save和restart防火牆以後就會自動在/etc/sysconfig/iptables文件裏刪除掉下面兩行內容了。
[root@kvm-server conf]# vim /etc/sysconfig/iptables
..........
#-A INPUT -j REJECT --reject-with icmp-host-prohibited
#-A FORWARD -j REJECT --reject-with icmp-host-prohibited
[root@linux-node1 ~]# service iptables restart

=============================================================

(1)Elasticsearch安裝配置

基礎環境安裝(elk-node1和elk-node2同時操做)

1)下載並安裝GPG Key
[root@elk-node1 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2)添加yum倉庫
[root@elk-node1 ~]# vim /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=http://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

3)安裝elasticsearch
[root@elk-node1 ~]# yum install -y elasticsearch

4)安裝相關測試軟件
#提早先下載安裝epel源:epel-release-latest-7.noarch.rpm,不然yum會報錯:No Package.....
[root@elk-node1 ~]# wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
[root@elk-node1 ~]# rpm -ivh epel-release-latest-7.noarch.rpm
#安裝Redis
[root@elk-node1 ~]# yum install -y redis
#安裝Nginx
[root@elk-node1 ~]# yum install -y nginx
#安裝java
[root@elk-node1 ~]# yum install -y java

安裝完java後,檢測
[root@elk-node1 ~]# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)

配置部署(下面先進行elk-node1的配置)

1)配置修改配置文件
[root@elk-node1 ~]# mkdir -p /data/es-data
[root@elk-node1 ~]# vim /etc/elasticsearch/elasticsearch.yml                               【將裏面內容狀況,配置下面內容】
cluster.name: huanqiu                            # 組名(同一個組,組名必須一致)
node.name: elk-node1                            # 節點名稱,建議和主機名一致
path.data: /data/es-data                         # 數據存放的路徑
path.logs: /var/log/elasticsearch/             # 日誌存放的路徑
bootstrap.mlockall: true                         # 鎖住內存,不被使用到交換分區去
network.host: 0.0.0.0                            # 網絡設置
http.port: 9200                                    # 端口

2)啓動並查看
[root@elk-node1 ~]# chown -R elasticsearch.elasticsearch /data/
[root@elk-node1 ~]# systemctl start elasticsearch
[root@elk-node1 ~]# systemctl status elasticsearch
CGroup: /system.slice/elasticsearch.service
└─3005 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSI...

注意:上面能夠看出elasticsearch設置的內存最小256m,最大1g

=====================舒適提示:  Elasticsearch啓動出現"could not find java"===================

yum方法安裝elasticsearch, 使用"systemctl start elasticsearch"啓動服務失敗.
"systemctl status elasticsearch"查看, 發現報錯說could not find java
可是"java -version" 查看發現java已經安裝了

這是由於elasticsearch在啓動過程當中, 引用的java路徑找不到

解決辦法: 在elasticsearch配置文件中定義java全路徑

[root@elk-node01 ~]# java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode

[root@elk-node01 ~]# find / -name java
/var/lib/alternatives/java
/usr/share/swig/2.0.10/java
/usr/java
/usr/java/jdk1.8.0_131/bin/java
/usr/java/jdk1.8.0_131/jre/bin/java
/usr/bin/java
/etc/pki/java
/etc/pki/ca-trust/extracted/java
/etc/alternatives/java

[root@elk-node01 ~]# vim /etc/sysconfig/elasticsearch
添加JAVA_HOME環境變量的配置
JAVA_HOME=/usr/java/jdk1.8.0_131

[root@linux-node1 src]# netstat -antlp |egrep "9200|9300"
tcp6 0 0 :::9200 :::* LISTEN 3005/java
tcp6 0 0 :::9300 :::* LISTEN 3005/java

而後經過web訪問(訪問的瀏覽器最好用google瀏覽器)

http://112.110.115.10:19200/

3)經過命令的方式查看數據(在112.110.115.10宿主機或其餘外網服務器上查看,以下)
[root@kvm-server src]# curl -i -XGET 'http://192.168.1.160:9200/_count?pretty' -d '{"query":{"match_all":{}}}'
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95

{
"count" : 0,
"_shards" : {
"total" : 0,
"successful" : 0,
"failed" : 0
}
}

這樣感受用命令來查看,特別的不爽。

4)接下來安裝插件,使用插件進行查看~  (下面兩個插件要在elk-node1和elk-node2上都要安裝)
4.1)安裝head插件
==================================================================
a)插件安裝方法一
[root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head

b)插件安裝方法二
首先下載head插件,下載到/usr/loca/src目錄下
下載地址:https://github.com/mobz/elasticsearch-head

======================================================
head插件包百度雲盤下載:https://pan.baidu.com/s/1boBE0qj
提取密碼:ifj7
======================================================

[root@elk-node1 src]# unzip elasticsearch-head-master.zip
[root@elk-node1 src]# ls
elasticsearch-head-master elasticsearch-head-master.zip

在/usr/share/elasticsearch/plugins目錄下建立head目錄
而後將上面下載的elasticsearch-head-master.zip解壓後的文件都移到/usr/share/elasticsearch/plugins/head下
接着重啓elasticsearch服務便可!
[root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/
[root@elk-node1 plugins]# mkdir head
[root@elk-node1 plugins]# ls
head
[root@elk-node1 plugins]# cd head
[root@elk-node1 head]# cp -r /usr/local/src/elasticsearch-head-master/* ./
[root@elk-node1 head]# pwd
/usr/share/elasticsearch/plugins/head

[root@elk-node1 head]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins
[root@elk-node1 head]# ll
total 40
-rw-r--r--. 1 elasticsearch elasticsearch 104 Sep 28 01:57 elasticsearch-head.sublime-project
-rw-r--r--. 1 elasticsearch elasticsearch 2171 Sep 28 01:57 Gruntfile.js
-rw-r--r--. 1 elasticsearch elasticsearch 3482 Sep 28 01:57 grunt_fileSets.js
-rw-r--r--. 1 elasticsearch elasticsearch 1085 Sep 28 01:57 index.html
-rw-r--r--. 1 elasticsearch elasticsearch 559 Sep 28 01:57 LICENCE
-rw-r--r--. 1 elasticsearch elasticsearch 795 Sep 28 01:57 package.json
-rw-r--r--. 1 elasticsearch elasticsearch 100 Sep 28 01:57 plugin-descriptor.properties
-rw-r--r--. 1 elasticsearch elasticsearch 5211 Sep 28 01:57 README.textile
drwxr-xr-x. 5 elasticsearch elasticsearch 4096 Sep 28 01:57 _site
drwxr-xr-x. 4 elasticsearch elasticsearch 29 Sep 28 01:57 src
drwxr-xr-x. 4 elasticsearch elasticsearch 66 Sep 28 01:57 test

[root@elk-node1 _site]# systemctl restart elasticsearch
=========================================================================

插件訪問(最好提早將elk-node2節點的配置和插件都安裝後,再來進行訪問和數據插入測試)
http://112.110.115.10:19200/_plugin/head/

先插入數據實例,測試下
以下:打開」複合查詢「,在POST選項下,任意輸入如/index-demo/test,而後在下面輸入數據(注意內容之間換行的逗號不要漏掉)
數據輸入好以後(以下輸入wangshibo;hello world內容),下面點擊」驗證JSON「->」提交請求「,提交成功後,觀察右欄裏出現的信息:有index,type,version等信息,failed:0(成功消息)

再查看測試實例,以下:
"複合查詢"下,選擇GET選項在/index-demo/test/後面輸入上面POST結果中的id號不輸入內容,即{}括號裏爲空
而後點擊」驗證JSON「->"提交請求",觀察右欄內就有了上面插入的數據了(即wangshibo,hello world)

打開"基本查詢",查看下數據,以下,便可查詢到上面插入的數據:

打開「數據瀏覽」,也能查看到插入的數據:

以下:必定要提早在elk-node2節點上也完成配置(配置內容在下面提到),不然上面插入數據後,集羣狀態會呈現黃色yellow狀態,elk-node2完成配置加入到集羣裏後就會恢復到正常的綠色狀態。

4.2)安裝kopf監控插件
==========================================================================

a)監控插件安裝方法一

[root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf

b)監控插件安裝方法二

首先下載監控插件kopf,下載到/usr/loca/src目錄下
下載地址:https://github.com/lmenezes/elasticsearch-kopf

====================================================
kopf插件包百度雲盤下載:https://pan.baidu.com/s/1qYixSL2
提取密碼:ya4t
===================================================

[root@elk-node1 src]# unzip elasticsearch-kopf-master.zip
[root@elk-node1 src]# ls
elasticsearch-kopf-master elasticsearch-kopf-master.zip

在/usr/share/elasticsearch/plugins目錄下建立kopf目錄
而後將上面下載的elasticsearch-kopf-master.zip解壓後的文件都移到/usr/share/elasticsearch/plugins/kopf下
接着重啓elasticsearch服務便可!
[root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/
[root@elk-node1 plugins]# mkdir kopf
[root@elk-node1 plugins]# cd kopf
[root@elk-node1 kopf]# cp -r /usr/local/src/elasticsearch-kopf-master/* ./
[root@elk-node1 kopf]# pwd
/usr/share/elasticsearch/plugins/kopf

[root@elk-node1 kopf]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins
[root@elk-node1 kopf]# ll
total 40
-rw-r--r--. 1 elasticsearch elasticsearch 237 Sep 28 16:28 CHANGELOG.md
drwxr-xr-x. 2 elasticsearch elasticsearch 22 Sep 28 16:28 dataset
drwxr-xr-x. 2 elasticsearch elasticsearch 73 Sep 28 16:28 docker
-rw-r--r--. 1 elasticsearch elasticsearch 4315 Sep 28 16:28 Gruntfile.js
drwxr-xr-x. 2 elasticsearch elasticsearch 4096 Sep 28 16:28 imgs
-rw-r--r--. 1 elasticsearch elasticsearch 1083 Sep 28 16:28 LICENSE
-rw-r--r--. 1 elasticsearch elasticsearch 1276 Sep 28 16:28 package.json
-rw-r--r--. 1 elasticsearch elasticsearch 102 Sep 28 16:28 plugin-descriptor.properties
-rw-r--r--. 1 elasticsearch elasticsearch 3165 Sep 28 16:28 README.md
drwxr-xr-x. 6 elasticsearch elasticsearch 4096 Sep 28 16:28 _site
drwxr-xr-x. 4 elasticsearch elasticsearch 27 Sep 28 16:28 src
drwxr-xr-x. 4 elasticsearch elasticsearch 4096 Sep 28 16:28 tests

[root@elk-node1 _site]# systemctl restart elasticsearch

============================================================================

訪問插件:(以下,一樣要提早安裝好elk-node2節點上的插件,不然訪問時會出現集羣節點爲黃色的yellow告警狀態)

http://112.110.115.10:19200/_plugin/kopf/#!/cluster

*************************************************************************
下面進行節點elk-node2的配置  (如上的兩個插件也在elk-node2上一樣安裝)

註釋:其實兩個的安裝配置基本上是同樣的。

[root@elk-node2 src]# mkdir -p /data/es-data
[root@elk-node2 ~]# cat /etc/elasticsearch/elasticsearch.yml
cluster.name: huanqiu
node.name: elk-node2
path.data: /data/es-data
path.logs: /var/log/elasticsearch/
bootstrap.mlockall: true
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["192.168.1.160", "192.168.1.161"]

# 修改權限配置
[root@elk-node2 src]# chown -R elasticsearch.elasticsearch /data/

# 啓動服務
[root@elk-node2 src]# systemctl start elasticsearch
[root@elk-node2 src]# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; enabled; vendor preset: disabled)
Active: active (running) since Wed 2016-09-28 16:49:41 CST; 1 weeks 3 days ago
Docs: http://www.elastic.co
Process: 17798 ExecStartPre=/usr/share/elasticsearch/bin/elasticsearch-systemd-pre-exec (code=exited, status=0/SUCCESS)
Main PID: 17800 (java)
CGroup: /system.slice/elasticsearch.service
└─17800 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFra...

Oct 09 13:42:22 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:22,295][WARN ][transport ] [elk-node2] Transport res...943817]
Oct 09 13:42:23 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:23,111][WARN ][transport ] [elk-node2] Transport res...943846]
................
................

# 查看端口
[root@elk-node2 src]# netstat -antlp|egrep "9200|9300"
tcp6 0 0 :::9200 :::* LISTEN 2928/java
tcp6 0 0 :::9300 :::* LISTEN 2928/java
tcp6 0 0 127.0.0.1:48200 127.0.0.1:9300 TIME_WAIT -
tcp6 0 0 ::1:41892 ::1:9300 TIME_WAIT -
*************************************************************************

經過命令的方式查看elk-node2數據(在112.110.115.10宿主機或其餘外網服務器上查看,以下)
[root@kvm-server ~]# curl -i -XGET 'http://192.168.1.161:9200/_count?pretty' -d '{"query":{"match_all":{}}}'
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95

{
"count" : 1,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}

而後經過web訪問elk-node2
http://112.110.115.10:19201/

訪問兩個插件:
http://112.110.115.10:19201/_plugin/head/
http://112.110.115.10:19201/_plugin/kopf/#!/cluster

 

 (2)Logstash安裝配置(這個在客戶機上是要安裝的。elk-node1和elk-node2都安裝)

基礎環境安裝(客戶端安裝logstash,收集到的數據寫入到elasticsearch裏,就能夠登錄logstash界面查看到了

1)下載並安裝GPG Key
[root@elk-node1 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2)添加yum倉庫
[root@hadoop-node1 ~]# vim /etc/yum.repos.d/logstash.repo
[logstash-2.1]
name=Logstash repository for 2.1.x packages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

3)安裝logstash
[root@elk-node1 ~]# yum install -y logstash

4)logstash啓動
[root@elk-node1 ~]# systemctl start elasticsearch
[root@elk-node1 ~]# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: disabled)
Active: active (running) since Mon 2016-11-07 18:33:28 CST; 3 days ago
Docs: http://www.elastic.co
Main PID: 8275 (java)
CGroup: /system.slice/elasticsearch.service
└─8275 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFrac...
..........
..........

數據的測試

1)基本的輸入輸出
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }'
Settings: Default filter workers: 1
Logstash startup completed
hello                                                                                     #輸入這個
2016-11-11T06:41:07.690Z elk-node1 hello                        #輸出這個
wangshibo                                                                            #輸入這個
2016-11-11T06:41:10.608Z elk-node1 wangshibo               #輸出這個

2)使用rubydebug詳細輸出
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }'
Settings: Default filter workers: 1
Logstash startup completed
hello                                                                                    #輸入這個
{                                                                                         #輸出下面信息
           "message" => "hello",
           "@version" => "1",
      "@timestamp" => "2016-11-11T06:44:06.711Z",
                  "host" => "elk-node1"
}
wangshibo                                                                         #輸入這個
{                                                                                       #輸出下面信息
         "message" => "wangshibo",
        "@version" => "1",
   "@timestamp" => "2016-11-11T06:44:11.270Z",
               "host" => "elk-node1"
}

3) 把內容寫到elasticsearch中
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.160:9200"]} }'
Settings: Default filter workers: 1
Logstash startup completed                       #輸入下面的測試數據
123456
wangshibo
huanqiu
hahaha

使用rubydebug和寫到elasticsearch中的區別:其實就在於後面標準輸出的區別,前者使用codec;後者使用elasticsearch

寫到elasticsearch中在logstash中查看,以下圖:
注意:
master收集到日誌後,會把一部分數據碎片到salve上(隨機的一部分數據),master和slave又都會各自作副本,並把副本放到對方機器上,這樣就保證了數據不會丟失。
以下,master收集到的數據放到了本身的第1,3分片上,其餘的放到了slave的第0,2,4分片上。

4)即寫到elasticsearch中又寫在文件中一份
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.160:9200"]} stdout{ codec => rubydebug}}'
Settings: Default filter workers: 1
Logstash startup completed
huanqiupc
{
           "message" => "huanqiupc",
          "@version" => "1",
     "@timestamp" => "2016-11-11T07:27:42.012Z",
                 "host" => "elk-node1"
}
wangshiboqun
{
         "message" => "wangshiboqun",
        "@version" => "1",
   "@timestamp" => "2016-11-11T07:27:55.396Z",
               "host" => "elk-node1"
}

以上文本能夠長期保留、操做簡單、壓縮比大。下面登錄elasticsearch界面中查看;

 logstash的配置和文件的編寫

1)logstash的配置
簡單的配置方式:
[root@elk-node1 ~]# vim /etc/logstash/conf.d/01-logstash.conf
input { stdin { } }
output {
        elasticsearch { hosts => ["192.168.1.160:9200"]}
        stdout { codec => rubydebug }
}

它的執行:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/01-logstash.conf
Settings: Default filter workers: 1
Logstash startup completed
beijing                                                #輸入內容
{                                                       #輸出下面信息
             "message" => "beijing",
            "@version" => "1",
       "@timestamp" => "2016-11-11T07:41:48.401Z",
                   "host" => "elk-node1"
}

===============================================================
參考內容:
https://www.elastic.co/guide/en/logstash/current/configuration.html
https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
===============================================================

2)收集系統日誌

[root@elk-node1 ~]# vim  file.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }
}

output {
    elasticsearch {
       hosts => ["192.168.1.160:9200"]
       index => "system-%{+YYYY.MM.dd}"
    }
}

執行上面日誌信息的收集,以下,這個命令會一直在執行中,表示日誌在監控收集中;若是中斷,就表示日誌不在收集!因此須要放在後臺執行~
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登錄elasticsearch界面,查看本機系統日誌的信息:

 

 

================================================================
參考內容:
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
================================================================

3)收集java日誌,其中包含上面講到的日誌收集

[root@elk-node1 ~]# vim  file.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }
}

input {
    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error" 
       start_position => "beginning"
    }
}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
}

注意:
若是你的日誌中有type字段 那你就不能在conf文件中使用type

執行以下命令收集:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登錄elasticsearch界面,查看數據:

====================================================================
參考內容:
https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html
====================================================================

有個問題:
每一個報錯都給收集成一行了,不是按照一個報錯,一個事件模塊收集的。

下面將行換成事件的方式展現:

[root@elk-node1 ~]# vim multiline.conf
input {
    stdin {
       codec => multiline {
          pattern => "^\["
          negate => true
          what => "previous"
        }
    }
}
output {
    stdout {
      codec => "rubydebug"
     }  
}

執行命令:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f multiline.conf 
Settings: Default filter workers: 1
Logstash startup completed
123
456
[123
{
    "@timestamp" => "2016-11-11T09:28:56.824Z",
       "message" => "123\n456",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "elk-node1"
}
123]
[456]
{
    "@timestamp" => "2016-11-11T09:29:09.043Z",
       "message" => "[123\n123]",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "elk-node1"
}

在沒有遇到[的時候,系統不會收集,只有碰見[的時候,纔算是一個事件,才收集起來。
======================================================================
參考內容
https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html
======================================================================

(3)Kibana安裝配置

1)kibana的安裝:
[root@elk-node1 ~]# cd /usr/local/src
[root@elk-node1 src]# wget https://download.elastic.co/kibana/kibana/kibana-4.3.1-linux-x64.tar.gz
[root@elk-node1 src]# tar zxf kibana-4.3.1-linux-x64.tar.gz
[root@elk-node1 src]# mv kibana-4.3.1-linux-x64 /usr/local/
[root@elk-node1 src]# ln -s /usr/local/kibana-4.3.1-linux-x64/ /usr/local/kibana

2)修改配置文件:
[root@elk-node1 config]# pwd
/usr/local/kibana/config
[root@elk-node1 config]# cp kibana.yml kibana.yml.bak
[root@elk-node1 config]# vim kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://192.168.1.160:9200"
kibana.index: ".kibana"        #注意這個.Kibana索引用來存儲數據,千萬不要刪除了它。它是將es數據經過kibana進行web展現的關鍵。這個配置後,在es的web界面裏就會看到這個.kibana索引。

由於他一直運行在前臺,要麼選擇開一個窗口,要麼選擇使用screen。
安裝並使用screen啓動kibana:
[root@elk-node1 ~]# yum -y install screen
[root@elk-node1 ~]# screen                          #這樣就另開啓了一個終端窗口
[root@elk-node1 ~]# /usr/local/kibana/bin/kibana
log [18:23:19.867] [info][status][plugin:kibana] Status changed from uninitialized to green - Ready
log [18:23:19.911] [info][status][plugin:elasticsearch] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [18:23:19.941] [info][status][plugin:kbn_vislib_vis_types] Status changed from uninitialized to green - Ready
log [18:23:19.953] [info][status][plugin:markdown_vis] Status changed from uninitialized to green - Ready
log [18:23:19.963] [info][status][plugin:metric_vis] Status changed from uninitialized to green - Ready
log [18:23:19.995] [info][status][plugin:spyModes] Status changed from uninitialized to green - Ready
log [18:23:20.004] [info][status][plugin:statusPage] Status changed from uninitialized to green - Ready
log [18:23:20.010] [info][status][plugin:table_vis] Status changed from uninitialized to green - Ready

而後按ctrl+a+d組合鍵,這樣在上面另啓的screen屏裏啓動的kibana服務就一直運行在前臺了....
[root@elk-node1 ~]# screen -ls
There is a screen on:
15041.pts-0.elk-node1 (Detached)
1 Socket in /var/run/screen/S-root.

(3)訪問kibana:http://112.110.115.10:15601/
以下,若是是添加上面設置的java日誌收集信息,則在下面填寫es-error*;若是是添加上面設置的系統日誌信息system*,以此類型(能夠從logstash界面看到日誌收集項)

 而後點擊上面的Discover,在Discover中查看:

查看日誌登錄,須要點擊「Discover」-->"message",點擊它後面的「add」
注意:
須要右邊查看日誌內容時帶什麼屬性,就在左邊點擊相應屬性後面的「add」
以下圖,添加了message和path的屬性:

這樣,右邊顯示的日誌內容的屬性就帶了message和path

點擊右邊日誌內容屬性後面隱藏的<<,就可將內容向前縮進

添加新的日誌採集項,點擊Settings->+Add New,好比添加system系統日誌。注意後面的*不要忘了。

 

 

刪除kibana裏的日誌採集項,以下,點擊刪除圖標便可。

 

若是打開kibana查看日誌,發現沒有日誌內容,出現「No results found」,以下圖所示,這說明要查看的日誌在當前時間沒有日誌信息輸出,能夠點擊右上角的時間鍾來調試日誌信息的查看。

 

4)收集nginx的訪問日誌

修改nginx的配置文件,分別在nginx.conf的http和server配置區域添加下面內容:

##### http 標籤中
          log_format json '{"@timestamp":"$time_iso8601",'
                           '"@version":"1",'
                           '"client":"$remote_addr",'
                           '"url":"$uri",'
                           '"status":"$status",'
                           '"domain":"$host",'
                           '"host":"$server_addr",'
                           '"size":$body_bytes_sent,'
                           '"responsetime":$request_time,'
                           '"referer": "$http_referer",'
                           '"ua": "$http_user_agent"'
'}';
##### server標籤中
            access_log /var/log/nginx/access_json.log json;

 

截圖以下:

啓動nginx服務:

[root@elk-node1 ~]# systemctl start nginx
[root@elk-node1 ~]# systemctl status nginx
● nginx.service - The nginx HTTP and reverse proxy server
   Loaded: loaded (/usr/lib/systemd/system/nginx.service; disabled; vendor preset: disabled)
   Active: active (running) since Fri 2016-11-11 19:06:55 CST; 3s ago
  Process: 15119 ExecStart=/usr/sbin/nginx (code=exited, status=0/SUCCESS)
  Process: 15116 ExecStartPre=/usr/sbin/nginx -t (code=exited, status=0/SUCCESS)
  Process: 15114 ExecStartPre=/usr/bin/rm -f /run/nginx.pid (code=exited, status=0/SUCCESS)
 Main PID: 15122 (nginx)
   CGroup: /system.slice/nginx.service
           ├─15122 nginx: master process /usr/sbin/nginx
           ├─15123 nginx: worker process
           └─15124 nginx: worker process

Nov 11 19:06:54 elk-node1 systemd[1]: Starting The nginx HTTP and reverse proxy server...
Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: configuration file /etc/nginx/nginx.conf test is successful
Nov 11 19:06:55 elk-node1 systemd[1]: Started The nginx HTTP and reverse proxy server.

編寫收集文件
此次使用json的方式收集:

[root@elk-node1 ~]# vim json.conf 
input {
   file {
      path => "/var/log/nginx/access_json.log"
      codec => "json"
   }
}

output {
   stdout {
      codec => "rubydebug"
   }
}

啓動日誌收集程序:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf        #或加個&放在後臺執行

訪問nginx頁面(在elk-node1的宿主機上執行訪問頁面的命令:curl http://192.168.1.160)就會出現如下內容:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf
Settings: Default filter workers: 1
Logstash startup completed
{
      "@timestamp" => "2016-11-11T11:10:53.000Z",
        "@version" => "1",
          "client" => "192.168.1.7",
             "url" => "/index.html",
          "status" => "200",
          "domain" => "192.168.1.160",
            "host" => "192.168.1.160",
            "size" => 3700,
    "responsetime" => 0.0,
         "referer" => "-",
              "ua" => "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2",
            "path" => "/var/log/nginx/access_json.log"
}

注意:
上面的json.conf配置只是將nginx日誌輸出,尚未輸入到elasticsearch裏,因此這個時候在elasticsearch界面裏是採集不到nginx日誌的。

須要配置一下,將nginx日誌輸入到elasticsearch中,將其彙總到總文件file.conf裏,以下也將nginx-log日誌輸入到elasticserach裏:(後續就能夠只用這個彙總文件,把要追加的日誌彙總到這個總文件裏便可)

[root@elk-node1 ~]# cat file.conf 
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }

    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error" 
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["
           negate => true
           what => "previous"
       }
    }
    file {
       path => "/var/log/nginx/access_json.log"
       codec => json
       start_position => "beginning"
       type => "nginx-log"
    }
}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "nginx-log"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "nignx-log-%{+YYYY.MM.dd}"
        }
    }
}

能夠加上--configtest參數,測試下配置文件是否有語法錯誤或配置不當的地方,這個很重要!!
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK

而後接着執行logstash命令(因爲上面已經將這個執行命令放到了後臺,因此這裏其實不用執行,也能夠先kill以前的,再放後臺執行),而後能夠再訪問nginx界面測試下
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登錄elasticsearch界面查看:

 將nginx日誌整合到kibana界面裏,以下:

5)收集系統日誌

編寫收集文件並執行。

[root@elk-node1 ~]# cat syslog.conf
input {
    syslog {
        type => "system-syslog"
        host => "192.168.1.160"
        port => "514"
    }
}

output {
    stdout {
        codec => "rubydebug"
    }
}

對上面的採集文件進行執行:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf

從新開啓一個窗口,查看服務是否啓動:
[root@elk-node1 ~]# netstat -ntlp|grep 514
tcp6 0 0 192.168.1.160:514 :::* LISTEN 17842/java
[root@elk-node1 ~]# vim /etc/rsyslog.conf
#*.* @@remote-host:514                                                           【在此行下面添加以下內容】
*.* @@192.168.1.160:514

[root@elk-node1 ~]# systemctl restart rsyslog

回到原來的窗口(即上面採集文件的執行終端),就會出現數據:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf
Settings: Default filter workers: 1
Logstash startup completed
{
           "message" => "Stopping System Logging Service...\n",
          "@version" => "1",
        "@timestamp" => "2016-11-13T10:35:30.000Z",
              "type" => "system-syslog",
              "host" => "192.168.1.160",
          "priority" => 30,
         "timestamp" => "Nov 13 18:35:30",
         "logsource" => "elk-node1",
           "program" => "systemd",
          "severity" => 6,
          "facility" => 3,
    "facility_label" => "system",
    "severity_label" => "Informational"
}
........
........

再次添加到總文件file.conf中:

[root@elk-node1 ~]# cat file.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }

    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error" 
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["
           negate => true
           what => "previous"
       }
    }
    file {
       path => "/var/log/nginx/access_json.log"
       codec => json
       start_position => "beginning"
       type => "nginx-log"
    }
    syslog {
        type => "system-syslog"
        host => "192.168.1.160"
        port => "514"
    }
}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "nginx-log"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "nignx-log-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "system-syslog"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-syslog-%{+YYYY.MM.dd}"
        }
    }
}

執行總文件(先測試下總文件配置是否有誤,而後先kill以前在後臺啓動的file.conf文件,再次執行):
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

測試:
向日志中添加數據,看elasticsearch和kibana的變化:
[root@elk-node1 ~]# logger "hehehehehehe1"
[root@elk-node1 ~]# logger "hehehehehehe2"
[root@elk-node1 ~]# logger "hehehehehehe3"
[root@elk-node1 ~]# logger "hehehehehehe4"
[root@elk-node1 ~]# logger "hehehehehehe5"

添加到kibana界面中:

 

6)TCP日誌的收集

編寫日誌收集文件,並執行:(有須要的話,能夠將下面收集文件的配置彙總到上面的總文件file.conf裏,進而輸入到elasticsearch界面裏和kibana裏查看)
[root@elk-node1 ~]# cat tcp.conf
input {
tcp {
host => "192.168.1.160"
port => "6666"
}
}
output {
stdout {
codec => "rubydebug"
}
}

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf

開啓另一個窗口,測試一(安裝nc命令:yum install -y nc):
[root@elk-node1 ~]# nc 192.168.1.160 6666 </etc/resolv.conf

回到原來的窗口(即上面採集文件的執行終端),就會出現數據:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf
Settings: Default filter workers: 1
Logstash startup completed
{
        "message" => "",
       "@version" => "1",
   "@timestamp" => "2016-11-13T11:01:15.280Z",
              "host" => "192.168.1.160",
              "port" => 49743
}

測試二:
[root@elk-node1 ~]# echo "hehe" | nc 192.168.1.160 6666
[root@elk-node1 ~]# echo "hehe" > /dev/tcp/192.168.1.160/6666

回到以前的執行端口,在去查看,就會顯示出來:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf 
Settings: Default filter workers: 1
Logstash startup completed
....... { "message" => "hehe", "@version" => "1", "@timestamp" => "2016-11-13T11:39:58.263Z", "host" => "192.168.1.160", "port" => 53432 } { "message" => "hehe", "@version" => "1", "@timestamp" => "2016-11-13T11:40:13.458Z", "host" => "192.168.1.160", "port" => 53457 }

7)使用filter
編寫文件:

[root@elk-node1 ~]# cat grok.conf
input {
    stdin{}
}
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
output {
    stdout{
        codec => "rubydebug"
    }
}

執行檢測:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f grok.conf 
Settings: Default filter workers: 1
Logstash startup completed
55.3.244.1 GET /index.html 15824 0.043                    #輸入這個,下面就會自動造成字典的形式
{
       "message" => "55.3.244.1 GET /index.html 15824 0.043",
      "@version" => "1",
    "@timestamp" => "2016-11-13T11:45:47.882Z",
          "host" => "elk-node1",
        "client" => "55.3.244.1",
        "method" => "GET",
       "request" => "/index.html",
         "bytes" => "15824",
      "duration" => "0.043"
}

其實上面使用的那些變量在程序中都有定義:

[root@elk-node1 ~]# cd /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns/
[root@elk-node1 patterns]# ls
aws     bro   firewalls      haproxy  junos         mcollective           mongodb  postgresql  redis
bacula  exim  grok-patterns  java     linux-syslog  mcollective-patterns  nagios   rails       ruby
[root@elk-node1 patterns]# cat grok-patterns
filter {
      # drop sleep events
    grok {
        match => { "message" =>"SELECT SLEEP" }
        add_tag => [ "sleep_drop" ]
        tag_on_failure => [] # prevent default _grokparsefailure tag on real records
      }
     if "sleep_drop" in [tags] {
        drop {}
     }
     grok {
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]
      }
      date {
        match => [ "timestamp", "UNIX" ]
        remove_field => [ "timestamp" ]
      }
}

8)mysql慢查詢

收集文件:

[root@elk-node1 ~]# cat mysql-slow.conf
input {
    file {
        path => "/root/slow.log"
        type => "mysql-slowlog"
        codec => multiline {
            pattern => "^# User@Host"
            negate => true
            what => "previous"
        }
    }
}

filter {
      # drop sleep events
    grok {
        match => { "message" =>"SELECT SLEEP" }
        add_tag => [ "sleep_drop" ]
        tag_on_failure => [] # prevent default _grokparsefailure tag on real records
      }
     if "sleep_drop" in [tags] {
        drop {}
     }
     grok {
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]
      }
      date {
        match => [ "timestamp", "UNIX" ]
        remove_field => [ "timestamp" ]
      }
}


output {
    stdout {
       codec =>"rubydebug"
    }
}

執行檢測:
上面須要的/root/slow.log是本身上傳的,而後本身插入數據保存後,會顯示:

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f mysql-slow.conf
Settings: Default filter workers: 1
Logstash startup completed
{
    "@timestamp" => "2016-11-14T06:53:54.100Z",
       "message" => "# Time: 161114 11:05:18",
      "@version" => "1",
          "path" => "/root/slow.log",
          "host" => "elk-node1",
          "type" => "mysql-slowlog",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}
{
    "@timestamp" => "2016-11-14T06:53:54.105Z",
       "message" => "# User@Host: test[test] @  [124.65.197.154]\n# Query_time: 1.725889  Lock_time: 0.000430 Rows_sent: 0  Rows_examined: 0\nuse test_zh_o2o_db;\nSET timestamp=1479092718;\nSELECT trigger_name, event_manipulation, event_object_table, action_statement, action_timing, DEFINER FROM information_schema.triggers WHERE BINARY event_object_schema='test_zh_o2o_db' AND BINARY event_object_table='customer';\n# Time: 161114 12:10:30",
      "@version" => "1",
          "tags" => [
        [0] "multiline",
        [1] "_grokparsefailure"
    ],
          "path" => "/root/slow.log",
          "host" => "elk-node1",
          "type" => "mysql-slowlog"
}
.........
.........

======================================================================
接下來描述會碰見到的一個問題:
一旦咱們的elasticsearch出現問題,就不能進行日誌採集處理了!
這種狀況下該怎麼辦呢?

解決方案;
能夠在client和elasticsearch之間添加一箇中間件做爲緩存,先將採集到的日誌內容寫到中間件上,而後再從中間件輸入到elasticsearch中。
這就完美的解決了上述的問題了。

(4)ELK中使用redis做爲中間件,緩存日誌採集內容

1)redis的配置和啓動

[root@elk-node1 ~]# vim /etc/redis.conf               #修改下面兩行內容
daemonize yes
bind 192.168.1.160
[root@elk-node1 ~]# systemctl start redis
[root@elk-node1 ~]# lsof -i:6379
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
redis-ser 19474 redis 4u IPv4 1344465 0t0 TCP elk-node1:6379 (LISTEN)
[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
.......

2)編寫從Client端收集數據的文件

[root@elk-node1 ~]# vim redis-out.conf
input {
   stdin {}
}

output {
   redis {
      host => "192.168.1.160"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "demo"
   }
}

3)執行收集數據的文件,並輸入數據hello redis 

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf
Settings: Default filter workers: 1
Logstash startup completed             #下面輸入數據hello redis
hello redis

4)在redis中查看數據

[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
.......
.......
# Keyspace
db6:keys=1,expires=0,avg_ttl=0                   #在最下面一行,顯示是db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379[6]> keys *
1) "demo"
192.168.1.160:6379[6]> LINDEX demo -1
"{\"message\":\"hello redis\",\"@version\":\"1\",\"@timestamp\":\"2016-11-14T08:04:25.981Z\",\"host\":\"elk-node1\"}"

5)繼續隨便寫點數據

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf 
Settings: Default filter workers: 1
Logstash startup completed
hello redis
123456
asdf
ert
wang
shi
bo
guohuihui
as
we
r
g

asdfjkdfsak
5423wer
34rt3
6y
7uj
u
io9
sdjfhsdk890
huanqiu
huanqiuchain
hqsb
asda    

6)在redis中查看

在redis中查看長度:
[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
.......
.......
# Keyspace
db6:keys=1,expires=0,avg_ttl=0      #顯示是db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379[6]> keys *
1) "demo"
192.168.1.160:6379[6]> LLEN demo
(integer) 24

7)將redis中的內容寫到ES中

[root@elk-node1 ~]# vim redis-in.conf
input { 
    redis {
      host => "192.168.1.160"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "demo"
   }
}

output {
    elasticsearch {
      hosts => ["192.168.1.160:9200"]
      index => "redis-in-%{+YYYY.MM.dd}"
    }
}

執行:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf --configtest
Configuration OK
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf &

在redis中查看,發現數據已被讀出:
192.168.1.160:6379[6]> LLEN demo
(integer) 0

=============================舒適提示===========================

redis默認只有16個數據庫, 也就是說最多隻能有16個db, 即db01-db15
可是key值能夠設置不一樣, 也就是針對不一樣日誌的key前綴能夠設置不一樣.
好比: 
key => "nginx.log"的值最多能夠設置16個db, 即db01-db15
key => "mysql.log"的值最多能夠設置16個db, 即db01-db15
key => "tomcat.log"的值最多能夠設置16個db, 即db01-db15

登錄elasticsearch界面查看:

 

8)接着,將收集到的全部日誌寫入到redis中。這了從新定義一個添加redis緩存後的總文件shipper.conf。(能夠將以前執行的總文件file.conf停掉)

[root@elk-node1 ~]# vim shipper.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }
 
    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error"
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["
           negate => true
           what => "previous"
       }
    }
    file {
       path => "/var/log/nginx/access_json.log"
       codec => json
       start_position => "beginning"
       type => "nginx-log"
    }
    syslog {
        type => "system-syslog"
        host => "192.168.1.160"
        port => "514"
    }
 
}
 
 
output {
   if [type] == "system"{
     redis {
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "system"
     }
   }
 
    if [type] == "es-error"{
      redis {
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "demo"
        }
     }
    if [type] == "nginx-log"{    
       redis {
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "nginx-log"
       }
    }
    if [type] == "system-syslog"{
       redis {
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "system-syslog"
       }    
     }
}

執行上面的文件(提早將上面以前啓動的file.conf文件的執行給結束掉!)
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf --configtest
Configuration OK
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf
Settings: Default filter workers: 1
Logstash startup completed

在redis中查看:
[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
.......
.......
# Keyspace
db6:keys=1,expires=0,avg_ttl=0                      #顯示是db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379[6]> keys *
1) "demo"
2) "system"
192.168.1.160:6379[6]> keys *
1) "nginx-log"
2) "demo"
3) "system"

另開一個窗口,添加點日誌:
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"
[root@elk-node1 ~]# logger "12325423"

又會增長日誌:
192.168.1.160:6379[6]> keys *
1) "system-syslog"
2) "nginx-log"
3) "demo"
4) "system"

其實能夠在任意的一臺ES中將數據從redis讀取到ES中。
下面我們在elk-node2節點,將數據從redis讀取到ES中:

編寫文件:

[root@elk-node2 ~]# cat file.conf
input {
     redis {
        type => "system"
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "system"
     }

      redis {
        type => "es-error"
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "es-error"
        }
       redis {
          type => "nginx-log"
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "nginx-log"
       }
       redis {
          type => "system-syslog"
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "system-syslog"
       }    

}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "nginx-log"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "nignx-log-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "system-syslog"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-syslog-%{+YYYY.MM.dd}"
        }
    }
}

執行:
[root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK
[root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf &

去redis中檢查,發現數據已經被讀出到elasticsearch中了。
192.168.1.160:6379[6]> keys *
(empty list or set)

同時登錄logstash和kibana看,發現能夠正常收集到日誌了

能夠執行這個 去查看nginx日誌
[root@elk-node1 ~]# ab -n10000 -c1 http://192.168.1.160/

也能夠啓動多個redis寫到ES中,具體根據本身的實際狀況而定。

==============logstash配置java環境===============
因爲新版的ELK環境要求java1.8,可是有些服務器因爲業務代碼自身限制只能用java6或java7。
這種狀況下,要安裝Logstash,就只能單獨配置Logstas本身使用的java環境了。

操做以下:
0) 使用rpm包安裝logstash

1)安裝java8,參考:http://www.cnblogs.com/kevingrace/p/7607442.html

2)在/etc/sysconfig/logstash文件結尾添加下面兩行內容:
[root@cx-app01 ~]# vim /etc/sysconfig/logstash
.......
JAVA_CMD=/usr/local/jdk1.8.0_172/bin 
JAVA_HOME=/usr/local/jdk1.8.0_172

3)在/opt/logstash/bin/logstash.lib.sh文件添加下面一行內容:
[root@cx-app02 ~]# vim /opt/logstash/bin/logstash.lib.sh
.......
export JAVA_HOME=/usr/local/jdk1.8.0_172

4) 而後使用logstash收集日誌,就不會報java環境錯誤了。

==================配置範例===================

以下的配置範例:
192.168.10.44爲elk的master節點,同時也是redis節點
  
[root@client-node01 opt]# pwd
/opt
[root@client-node01 opt]# cat redis-in.conf
input {
    file {
       path => "/usr/local/tomcat8/logs/catalina.out"
       type => "tomcat8-logs"
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["           //表示收集以"["開頭的日誌信息
           negate => true
           what => "previous"
       }
    }
}
  
output {
    if [type] == "tomcat8-logs"{
       redis {
          host => "192.168.10.44"
          port => "6379"
          db => "1"
          data_type => "list"
          key => "tomcat8-logs"
       } 
     }
}
  
[root@client-node01 opt]# cat redis-input.conf
input {
  file {
        path => "/var/log/messages"
        type => "systemlog"
        start_position => "beginning"
        stat_interval => "2"
  }
}
  
output {
  if [type] == "systemlog" {
        redis {
                data_type => "list"
                host => "192.168.10.44"
                db => "2"
                port => "6379"
                key => "systemlog"
        }
  }
  
}
  
[root@client-node01 opt]# cat file.conf
input {
     redis {
        type => "tomcat8-logs"
        host => "192.168.10.44"
        port => "6379"
        db => "1"
        data_type => "list"
        key => "tomcat8-logs"
     }
  
       redis {
          type => "systemlog"
          host => "192.168.10.44"
          port => "6379"
          db => "2"
          data_type => "list"
          key => "systemlog"
       } 
   
}
   
   
output {
   
    if [type] == "tomcat8-logs"{
        elasticsearch {
           hosts => ["192.168.10.44:9200"]
           index => "elk-node2-tomcat8-logs-%{+YYYY.MM.dd}"
        }
    }
  
    if [type] == "systemlog"{
        elasticsearch {
           hosts => ["192.168.10.44:9200"]
           index => "elk-node2-systemlog-%{+YYYY.MM.dd}"
        }
    }
}
  
  
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/redis-in.conf --configtest
Configuration OK
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/redis-input.conf --configtest
Configuration OK
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/file.conf --configtest
Configuration OK
  
啓動logstash
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/redis-in.conf &
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/redis-input.conf &
[root@client-node01 opt]# /opt/logstash/bin/logstash -f /opt/file.conf &
  
這時候,當/usr/local/tomcat8/logs/catalina.out和/var/log/messages文件裏有新日誌信息寫入時,就會觸發動做,
在redis裏就能查看到相關信息,並查看寫入到es裏。

=========================================================================================================
舒適提示:
當客戶機的日誌信息收集後,通過redis剛讀到es數據庫裏後,若是沒有新數據寫入,則默認在es的訪問界面裏是看不到
數據的,只有當日志文件裏有新的日誌寫入後纔會觸發數據展現的動做,即es的訪問界面(http://192.168.10.44:9200/_plugin/head/)
裏才能看到日誌數據的展現效果。
==========================================================================================================
  
假設想上面兩個文件裏寫入測試數據
[root@client-node01 opt]# echo "hellohellohellohello" >> /var/log/messages
[root@client-node01 opt]# echo "[hahahahahahhahahahahahahahahahahahah]" >> /usr/local/tomcat8/logs/catalina.out
  
到redis裏發現有相關的key,很快就會讀到es裏。能夠配置到kibana裏觀察。
  
能夠先測試下日誌信息是否寫到redis裏?而後再測試下數據是否從redis讀到es裏?一步步肯定數據去向。
 
注意上面redis-in.conf文件中的下面設置,使用正則匹配,收集以哪些字符開頭的日誌信息:
pattern => "^\["                    表示收集以"["開頭的日誌信息
pattern => "^2018"                  表示收集以"2018"開頭的日誌信息
pattern => "^[a-zA-Z0-9]"           表示收集以字母(大小寫)或數字開頭的日誌信息
pattern => "^[a-zA-Z0-9]|[^ ]+"     表示收集以字母(大小寫)或數字或空格的日誌信息
相關文章
相關標籤/搜索