企業日誌大數據分析系統ELK+KAFKA實現【轉】

背景:

最近線上上了ELK,可是隻用了一臺Redis在中間做爲消息隊列,以減輕前端es集羣的壓力,Redis的集羣解決方案暫時沒有接觸過,而且Redis做爲消息隊列並非它的強項;因此最近將Redis換成了專業的消息信息發佈訂閱系統Kafka, Kafka的更多介紹你們能夠看這裏:http://blog.csdn.net/lizhitao/article/details/39499283  ,關於ELK的知識網上有不少的哦, 此篇博客主要是總結一下目前線上這個平臺的實施步驟,ELK是怎麼跟Kafka結合起來的。好吧,動手!前端

ELK架構拓撲:

然而我這裏的整個日誌收集平臺就是這樣的拓撲:java

1,使用一臺Nginx代理訪問kibana的請求;node

2,兩臺es組成es集羣,而且在兩臺es上面都安裝kibana;(如下對elasticsearch簡稱es)linux

3,中間三臺服務器就是個人kafka(zookeeper)集羣啦; 上面寫的消費者/生產者這是kafka(zookeeper)中的概念;nginx

4,最後面的就是一大堆的生產服務器啦,上面使用的是logstash,固然除了logstash也可使用其餘的工具來收集你的應用程序的日誌,例如:Flume,Scribe,Rsyslog,Scripts……git

角色:github

軟件選用:web

elasticsearch-1.7.3.tar.gz #這裏須要說明一下,前幾天使用了最新的elasticsearch2.0,java-1.8.0報錯,目前未找到緣由,故這裏使用1.7.3版本redis

Logstash-2.0.0.tar.gzapache

kibana-4.1.2-linux-x64.tar.gz

以上軟件均可以從官網下載:https://www.elastic.co/downloads

java-1.8.0,nginx採用yum安裝

部署步驟:

1.ES集羣安裝配置;

2.Logstash客戶端配置(直接寫入數據到ES集羣,寫入系統messages日誌);

3.Kafka(zookeeper)集羣配置;(Logstash寫入數據到Kafka消息系統);

4.Kibana部署;

5.Nginx負載均衡Kibana請求;

6.案例:nginx日誌收集以及MySQL慢日誌收集;

7.Kibana報表基本使用;

ES集羣安裝配置;

es1.example.com:

1.安裝java-1.8.0以及依賴包

yum install -y epel-release

yum install -y java-1.8.0 git wget lrzsz

2.獲取es軟件包

wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz

tar -xf elasticsearch-1.7.3.tar.gz -C /usr/local

ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch

3.修改配置文件

[root@es1 ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml

32 cluster.name: es-cluster #組播的名稱地址

40 node.name: "es-node1 " #節點名稱,不能和其餘節點重複

47 node.master: true #節點可否被選舉爲master

51 node.data: true #節點是否存儲數據

107 index.number_of_shards: 5 #索引分片的個數

111 index.number_of_replicas: 1 #分片的副本個數

145 path.conf: /usr/local/elasticsearch/config/ #配置文件的路徑

149 path.data: /data/es/data #數據目錄路徑

159 path.work: /data/es/worker #工做目錄路徑

163 path.logs: /usr/local/elasticsearch/logs/ #日誌文件路徑

167 path.plugins: /data/es/plugins #插件路徑

184 bootstrap.mlockall: true #內存不向swap交換

232 http.enabled: true #啓用http

4.建立相關目錄

mkdir /data/es/{data,worker,plugins} -p

5.獲取es服務管理腳本

[root@es1 ~]# git clone https://github.com/elastic/elasticsearch-servicewrapper.git

[root@es1 ~]# mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/

[root@es1 ~]# /usr/local/elasticsearch/bin/service/elasticsearch install

Detected RHEL or Fedora:

Installing the Elasticsearch daemon..

[root@es1 ~]#

#這時就會在/etc/init.d/目錄下安裝上es的管理腳本啦

#修改其配置:

[root@es1 ~]#

set.default.ES_HOME=/usr/local/elasticsearch #安裝路徑

set.default.ES_HEAP_SIZE=1024 #jvm內存大小,根據實際環境調整便可

6.啓動es ,並檢查其服務是否正常

[root@es1 ~]# netstat -nlpt | grep -E "9200|"9300

tcp 0 0 0.0.0.0:9200 0.0.0.0:* LISTEN 1684/java

tcp 0 0 0.0.0.0:9300 0.0.0.0:* LISTEN 1684/java

訪問http://192.168.2.18:9200/ 若是出現如下提示信息說明安裝配置完成啦,

7.es1節點好啦,咱們直接把目錄複製到es2

[root@es1 local]# scp -r elasticsearch-1.7.3 192.168.12.19:/usr/local/

[root@es2 local]# ln -sv elasticsearch-1.7.3 elasticsearch

[root@es2 local]# elasticsearch/bin/service/elasticsearch install

#es2只須要修改node.name便可,其餘都與es1相同配置

8.安裝es的管理插件

es官方提供一個用於管理es的插件,可清晰直觀看到es集羣的狀態,以及對集羣的操做管理,安裝方法以下:

[root@es1 local]# /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head

安裝好以後,訪問方式爲: http://192.168.2.18:9200/_plugin/head,因爲集羣中如今暫時沒有數據,因此顯示爲空,

此時,es集羣的部署完成。

Logstash客戶端安裝配置;

在webserve1上面安裝Logstassh

1.downloads  軟件包 ,這裏注意,Logstash是須要依賴java環境的,因此這裏仍是須要yum install -y java-1.8.0.

[root@webserver1 ~]# wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz

[root@webserver1 ~]# tar -xf logstash-2.0.0.tar.gz -C /usr/local

[root@webserver1 ~]# cd /usr/local/

[root@webserver1 local]# ln -sv logstash-2.0.0 logstash

[root@webserver1 local]# mkdir logs etc

2.提供logstash管理腳本,其中裏面的配置路徑可根據實際狀況修改

#!/bin/bash

#chkconfig: 2345 55 24

#description: logstash service manager

#auto: Maoqiu Guo

FILE='/usr/local/logstash/etc/*.conf' #logstash配置文件

LOGBIN='/usr/local/logstash/bin/logstash agent --verbose --config' #指定logstash配置文件的命令

LOCK='/usr/local/logstash/locks' #用鎖文件配合服務啓動與關閉

LOGLOG='--log /usr/local/logstash/logs/stdou.log' #日誌

START() {

    if [ -f $LOCK ];then

        echo -e "Logstash is already \033[32mrunning\033[0m, do nothing."

    else

        echo -e "Start logstash service.\033[32mdone\033[m"

        nohup ${LOGBIN} ${FILE} ${LOGLOG} &

        touch $LOCK

    fi

}

STOP() {

    if [ ! -f $LOCK ];then

        echo -e "Logstash is already stop, do nothing."

    else

        echo -e "Stop logstash serivce \033[32mdone\033[m"

        rm -rf $LOCK

        ps -ef | grep logstash | grep -v "grep" | awk '{print $2}' | xargs kill -s 9 >/dev/null

    fi

}

STATUS() {

    ps aux | grep logstash | grep -v "grep" >/dev/null

    if [ -f $LOCK ] && [ $? -eq 0 ]; then

        echo -e "Logstash is: \033[32mrunning\033[0m..."

    else

        echo -e "Logstash is: \033[31mstopped\033[0m..."

    fi

}

TEST(){

    ${LOGBIN} ${FILE} --configtest

}

case "$1" in

start)

    START

    ;;

stop)

    STOP

    ;;

status)

    STATUS

    ;;

restart)

    STOP

sleep 2

START

    ;;

test)

    TEST

    ;;

*)

    echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)"

    ;;

esac

3.Logstash 向es集羣寫數據

(1)編寫一個logstash配置文件

[root@webserver1 etc]# cat logstash.conf

input { #數據的輸入從標準輸入

stdin {}

}

output { #數據的輸出咱們指向了es集羣

elasticsearch {

hosts => ["192.168.2.18:9200","192.168.2.19:9200"]   #es主機的ip及端口

}

}

[root@webserver1 etc]#

(2)檢查配置文件是否有語法錯

[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose

Configuration OK

[root@webserver1 etc]#

(3)既然配置ok咱們手動啓動它,而後寫點東西看可否寫到es

ok.上圖已經看到logstash已經能夠正常的工做啦.

4.下面演示一下如何收集系統日誌

將以前的配置文件修改以下所示內容,而後啓動logstash服務就能夠在web頁面中看到messages的日誌寫入es,而且建立了一條索引

[root@webserver1 etc]# cat logstash.conf

input {       #這裏的輸入使用的文件,即日誌文件messsages

file {   

path => "/var/log/messages"   #這是日誌文件的絕對路徑

start_position => "beginning" #這個表示從messages的第一行讀取,即文件開始處

}

}

output {    #輸出到es

elasticsearch {

hosts => ["192.168.2.18:9200","192.168.2.19:9200"]

index => "system-messages-%{+YYYY-MM}"  #這裏將按照這個索引格式來建立索引

}

}

[root@webserver1 etc]#

啓動logstash後,咱們來看head這個插件的web頁面

ok,系統日誌咱們已經成功的收集,而且已經寫入到es集羣中,那上面的演示是logstash直接將日誌寫入到es集羣中的,這種場合我以爲若是量不是很大的話直接像上面已將將輸出output定義到es集羣便可,若是量大的話須要加上消息隊列來緩解es集羣的壓力。前面已經提到了我這邊以前使用的是單臺redis做爲消息隊列,可是redis不能做爲list類型的集羣,也就是redis單點的問題無法解決,因此這裏我選用了kafka ;下面就在三臺server上面安裝kafka集羣

Kafka集羣安裝配置;

在搭建kafka集羣時,須要提早安裝zookeeper集羣,固然kafka已經自帶zookeeper程序只須要解壓而且安裝配置就好了

kafka1上面的配置:

1.獲取軟件包.官網:http://kafka.apache.org

[root@kafka1 ~]# wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz

[root@kafka1 ~]# tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/

[root@kafka1 ~]# cd /usr/local/

[root@kafka1 local]# ln -sv kafka_2.11-0.8.2.1 kafka

2.配置zookeeper集羣,修改配置文件

[root@kafka1 ~]# vim /usr/local/kafka/config/zookeeper.propertie

dataDir=/data/zookeeper

clienrtPort=2181

tickTime=2000

initLimit=20

syncLimit=10

server.2=192.168.2.22:2888:3888

server.3=192.168.2.23:2888:3888

server.4=192.168.2.24:2888:3888

#說明:

tickTime: 這個時間是做爲 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每一個 tickTime 時間就會發送一個心跳。

2888端口:表示的是這個服務器與集羣中的 Leader 服務器交換信息的端口;

3888端口:表示的是萬一集羣中的 Leader 服務器掛了,須要一個端口來從新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通訊的端口。

3.建立zookeeper所須要的目錄

[root@kafka1 ~]# mkdir /data/zookeeper

4.在/data/zookeeper目錄下建立myid文件,裏面的內容爲數字,用於標識主機,若是這個文件沒有的話,zookeeper是無法啓動的哦

[root@kafka1 ~]# echo 2 > /data/zookeeper/myid

以上就是zookeeper集羣的配置,下面等我配置好kafka以後直接複製到其餘兩個節點便可

5.kafka配置

[root@kafka1 ~]# vim /usr/local/kafka/config/server.properties

broker.id=2      # 惟一,填數字,本文中分別爲2/3/4

prot=9092        # 這個broker監聽的端口 

host.name=192.168.2.22  # 惟一,填服務器IP

log.dir=/data/kafka-logs # 該目錄能夠不用提早建立,在啓動時本身會建立

zookeeper.connect=192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181  #這個就是zookeeper的ip及端口

num.partitions=16 # 須要配置較大 分片影響讀寫速度

log.dirs=/data/kafka-logs # 數據目錄也要單獨配置磁盤較大的地方

log.retention.hours=168 # 時間按需求保留過時時間 避免磁盤滿

6.將kafka(zookeeper)的程序目錄所有拷貝至其餘兩個節點

[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.23:/usr/local/

[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.24:/usr/local/

7.修改兩個借點的配置,注意這裏除了如下兩點不一樣外,都是相同的配置

(1)zookeeper的配置

mkdir /data/zookeeper

echo "x" > /data/zookeeper/myid

(2)kafka的配置

broker.id=2

host.name=192.168.2.22

8.修改完畢配置以後咱們就能夠啓動了,這裏先要啓動zookeeper集羣,才能啓動kafka

咱們按照順序來,kafka1 –> kafka2 –>kafka3

[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & #zookeeper啓動命令

[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-stop.sh #zookeeper中止的命令

注意,若是zookeeper有問題 nohup的日誌文件會很是大,把磁盤佔滿,這個zookeeper服務能夠經過本身些服務腳原本管理服務的啓動與關閉。

後面兩臺執行相同操做,在啓動過程中會出現如下報錯信息

[2015-11-13 19:18:04,225] WARN Cannot open channel to 3 at election address /192.168.2.23:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)

java.net.ConnectException: Connection refused

    at java.net.PlainSocketImpl.socketConnect(Native Method)

    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

    at java.net.Socket.connect(Socket.java:589)

    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)

    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)

    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)

    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)

[2015-11-13 19:18:04,232] WARN Cannot open channel to 4 at election address /192.168.2.24:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)

java.net.ConnectException: Connection refused

    at java.net.PlainSocketImpl.socketConnect(Native Method)

    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

    at java.net.Socket.connect(Socket.java:589)

    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)

    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)

    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)

    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)

[2015-11-13 19:18:04,233] INFO Notification time out: 6400 (org.apache.zookeeper.server.quorum.FastLeaderElection)

因爲zookeeper集羣在啓動的時候,每一個結點都試圖去鏈接集羣中的其它結點,先啓動的確定連不上後面還沒啓動的,因此上面日誌前面部分的異常是能夠忽略的。經過後面部分能夠看到,集羣在選出一個Leader後,最後穩定了。

其餘節點也可能會出現相似的狀況,屬於正常。

9.zookeeper服務檢查

[root@kafka1~]# netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 1959/java

tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1959/java

[root@kafka2 ~]# netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.23:3888 0.0.0.0:* LISTEN 1723/java

tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1723/java

[root@kafka3 ~]# netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 950/java

tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 950/java

tcp 0 0 192.168.2.24:2888 0.0.0.0:* LISTEN 950/java

#能夠看出,若是哪臺是Leader,那麼它就擁有2888這個端口

ok.  這時候zookeeper集羣已經啓動起來了,下面啓動kafka,也是依次按照順序啓動

[root@kafka1 ~]# nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & #kafka啓動的命令

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-server-stop.sh #kafka中止的命令

注意,跟zookeeper服務同樣,若是kafka有問題 nohup的日誌文件會很是大,把磁盤佔滿,這個kafka服務一樣能夠經過本身些服務腳原本管理服務的啓動與關閉。

此時三臺上面的zookeeper及kafka都已經啓動完畢,來檢測如下吧

(1)創建一個主題

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer

#注意:factor大小不能超過broker數

(2)查看有哪些主題已經建立

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181 #列出集羣中全部的topic

summer #已經建立成功

(3)查看summer這個主題的詳情

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer

Topic:summer    PartitionCount:1    ReplicationFactor:3    Configs:

    Topic: summer    Partition: 0    Leader: 2    Replicas: 2,4,3    Isr: 2,4,3

#主題名稱:summer

#Partition:只有一個,從0開始

#leader :id爲2的broker

#Replicas 副本存在於broker id爲2,3,4的上面

#Isr:活躍狀態的broker

(4)發送消息,這裏使用的是生產者角色

[root@kafka1 ~]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer

This is a messages

welcome to kafka

(5)接收消息,這裏使用的是消費者角色

[root@kafka2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.2.24:2181 --topic summer --from-beginning

This is a messages

welcome to kafka

若是可以像上面同樣可以接收到生產者發過來的消息,那說明基於kafka的zookeeper集羣就成功啦。

10,下面咱們將webserver1上面的logstash的輸出改到kafka上面,將數據寫入到kafka中

(1)修改webserver1上面的logstash配置,以下所示:各個參數能夠到官網查詢.

root@webserver1 etc]# cat logstash.conf

input { #這裏的輸入仍是定義的是從日誌文件輸入

file {

type => "system-message"

path => "/var/log/messages"

start_position => "beginning"

}

}

output {

#stdout { codec => rubydebug } #這是標準輸出到終端,能夠用於調試看有沒有輸出,注意輸出的方向能夠有多個

kafka { #輸出到kafka

bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092" #他們就是生產者

topic_id => "system-messages" #這個將做爲主題的名稱,將會自動建立

compression_type => "snappy" #壓縮類型

}

}

[root@webserver1 etc]#

(2)配置檢測

[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose

Configuration OK

[root@webserver1 etc]#

(2)啓動Logstash,這裏我直接在命令行執行便可

[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

(3)驗證數據是否寫入到kafka,這裏咱們檢查是否生成了一個叫system-messages的主題

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181

summer

system-messages #能夠看到這個主題已經生成了

#再看看這個主題的詳情:

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages

Topic:system-messages    PartitionCount:16    ReplicationFactor:1    Configs:

    Topic: system-messages    Partition: 0    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 1    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 2    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 3    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 4    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 5    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 6    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 7    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 8    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 9    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 10    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 11    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 12    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 13    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 14    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 15    Leader: 2    Replicas: 2    Isr: 2

[root@kafka1 ~]#

能夠看出,這個主題生成了16個分區,每一個分區都有對應本身的Leader,可是我想要有10個分區,3個副本如何辦?仍是跟咱們上面同樣命令行來建立主題就行,固然對於logstash輸出的咱們也能夠提早先定義主題,而後啓動logstash 直接往定義好的主題寫數據就行啦,命令以下:

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME

好了,咱們將logstash收集到的數據寫入到了kafka中了,在實驗過程當中我使用while腳本測試了若是不斷的往kafka寫數據的同時停掉兩個節點,數據寫入沒有任何問題。

那如何將數據從kafka中讀取而後給咱們的es集羣呢?那下面咱們在kafka集羣上安裝Logstash,安裝步驟再也不贅述;三臺上面的logstash 的配置以下,做用是將kafka集羣的數據讀取而後轉交給es集羣,這裏爲了測試我讓他新建一個索引文件,注意這裏的輸入日誌仍是messages,主題名稱仍是「system-messages」

[root@kafka1 etc]# more logstash.conf

input {

kafka {

zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181" #消費者們

topic_id => "system-messages"

codec => plain

reset_beginning => false

consumer_threads => 5

decorate_events => true

}

}

output {

elasticsearch {

hosts => ["192.168.2.18:9200","192.168.2.19:9200"]

index => "test-system-messages-%{+YYYY-MM}" #爲了區分以前實驗,我這裏新生成的因此名字爲「test-system-messages-%{+YYYY-MM}」

}

}

在三臺kafka上面啓動Logstash,注意我這裏是在命令行啓動的;

[root@kafka1 etc]# pwd

/usr/local/logstash/etc

[root@kafka1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

[root@kafka2 etc]# pwd

/usr/local/logstash/etc

[root@kafka2 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

[root@kafka3 etc]# pwd

/usr/local/logstash/etc

[root@kafka3 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

在webserver1上寫入測試內容,即webserver1上面利用message這個文件來測試,我先將其清空,而後啓動

[root@webserver1 etc]# >/var/log/messages

[root@webserver1 etc]# echo "我將經過kafka集羣達到es集羣哦^0^" >> /var/log/messages

#啓動logstash,讓其讀取messages中的內容

下圖爲我在客戶端寫入到kafka集羣的同時也將其輸入到終端,這裏寫入了三條內容

而下面三張圖側能夠看出,三臺Logstash 很平均的從kafka集羣當中讀取出來了日誌內容

再來看看咱們的es管理界面

ok ,看到了吧,

流程差很少就是下面 醬紫咯

因爲篇幅較長,我將

4.Kibana部署;

5.Nginx負載均衡Kibana請求;

6.案例:nginx日誌收集以及MySQL慢日誌收集;

7.Kibana報表基本使用;

 

轉自

企業日誌大數據分析系統ELK+KAFKA實現http://mp.weixin.qq.com/s?__biz=MzIyMDA1MzgyNw==&mid=2651969453&idx=1&sn=1c4d7d7b55deea7300f3b3701fbcdc89&chksm=8c349f81bb431697da71338a862c9528b5cd1c176129424a8b70548ae9c44017289650ac4d61&scene=0#rd

相關文章
相關標籤/搜索