參考:https://www.cnblogs.com/fengjian2016/p/5841556.htmlhtml
https://www.cnblogs.com/hei12138/p/7805475.htmlpython
https://blog.csdn.net/lhmood/article/details/79099615nginx
https://www.cnblogs.com/Orgliny/p/5730381.htmlredis
ELK可使用redis做爲消息隊列,但redis做爲消息隊列不是強項並且redis集羣不如專業的消息發佈系統kafka,如下使用ELK結合kafka做爲消息隊列進行配置apache
本次配置拓撲圖json
主機角色bootstrap
主機名 | IP | 系統 | 角色 |
prd-elk-logstash-01 | 172.16.90.20 | CentOS7.5 | logstash日誌傳輸 |
prd-elk-kafka-01 | 172.16.90.21 | CentOS7.5 | 日誌消息隊列kafka01 |
prd-elk-kafka-02 | 172.16.90.22 | CentOS7.5 | 日誌消息隊列kafka02 |
prd-elk-kafka-03 | 172.16.90.23 | CentOS7.5 | 日誌消息隊列kafka03 |
prd-elk-logstash-02 | 172.16.90.24 | CentOS7.5 | logstash日誌過濾,存儲 |
軟件選用後端
JKD 1.8.0_171 elasticsearch 6.5.4 logstash 6.5.4 kibana 6.5.4 kafka 2.11-2.1.1
172.16.90.20安裝logstash和filebeatruby
rpm -ivh logstash-6.5.4.rpm rpm -ivh filebeat-6.5.4-x86_64.rpm
修改filebeat配置文件收集系統messages日誌 服務器
/etc/filebeat/filebeat.yml
filebeat.inputs: - type: log enabled: true paths: - /var/log/messages #需收集日誌路徑 tags: ["messages"] filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false setup.template.settings: index.number_of_shards: 3 setup.kibana: output.logstash: hosts: ["172.16.90.20:5044"] #輸出至logstash processors: - add_host_metadata: - add_cloud_metadata: - drop_fields: fields: ["beat", "input", "source", "offset", "prospector","host"] #刪除無用的參數
修改logstash先標準輸出至屏幕
/etc/logstash/conf.d/filebeat-logstash.conf
input{ beats { port => 5044 } } output{ if "messages" in [tags]{ stdout{ codec => rubydebug } } }
運行logstash查看輸出
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/filebeat-logstash.conf
輸出以下
172.16.90.24安裝logstash,elasticsearch,kibana安裝配置過程不詳敘,經過172.16.90.20收集的日誌信息能夠直接傳遞至172.16.90.24的logstash進行過濾及es存儲,規模不大可使用這種方式,下面安裝配置kafka集羣
Kafka集羣安裝配置
在搭建kafka集羣以前須要提早安裝zookeeper集羣,kafka壓縮包只帶zookeeper程序,只須要解壓配置便可使用
獲取軟件包,官方網站 http://kafka.apache.org/
PS:不要下載src包,運行會報錯
解壓拷貝至指定目錄
tar -xf kafka_2.11-2.1.1.tgz cp -a kafka_2.11-2.1.1 /usr/local/ cd /usr/local/ ln -s kafka_2.11-2.1.1/ kafka
修改配置文件zookeeper配置文件
/usr/local/kafka/config/zookeeper.properties #數據路徑 dataDir=/data/zookeeper # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 #tickTime : 這個時間是做爲 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每一個 tickTime 時間就會發送一個心跳。 tickTime=2000 initLimit=20 syncLimit=10 #2888 端口:表示的是這個服務器與集羣中的 Leader 服務器交換信息的端口; #3888 端口:表示的是萬一集羣中的 Leader 服務器掛了,須要一個端口來從新進行選舉,選出一個新的 Leader ,而這個端口就是用來執行選舉時服務器相互通訊的端口 server.1=172.16.90.21:2888:3888 server.2=172.16.90.22:2888:3888 server.3=172.16.90.23:2888:3888
建立數據目錄並建立myid文件,文件爲數字,用於標識惟一主機,必須有這個文件不然zookeeper沒法啓動
mkdir /data/zookeeper -p echo 1 >/data/zookeeper/myid
修改kafka配置/usr/local/kafka/config/server.properties
#惟一數字分別爲1,2,2 broker.id=1 #這個broker監聽的端口 prot=9092 #惟一填服務器IP host.name=172.16.90.21 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 #kafka日誌路徑,不須要提早建立,啓動kafka時建立 log.dirs=/data/kafka-logs #分片數,須要配置較大,分片影響讀寫速度 num.partitions=16 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 #zookpeer集羣 zookeeper.connect=192.168.1.11:2181,192.168.1.12:2181,192.168.1.13:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
把配置拷貝至其餘kafka主機,zookeeper.properties配置同樣 ,server.properties配置一下兩處不同,myid也不同
broker.id=2 host.name=172.16.90.22 broker.id=3 host.name=172.16.90.23
配置完三臺主機,啓動zookeeper啓動順序爲服務器1 2 3
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
啓動過程當中提示拒絕鏈接不用理會,因爲zookeeper集羣在啓動的時候,每一個結點都試圖去鏈接集羣中的其它結點,先啓動的確定連不上後面還沒啓動的,因此上面日誌前面部分的異常是能夠忽略的。經過後面部分能夠看到,集羣在選出一個Leader後,最後穩定了。其餘節點也可能會出現相似的狀況,屬於正常
檢測是否啓動
netstat -ntalp|grep -E "2181|2888|3888"
啓動kafka 啓動順序也是1 2 3
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
啓動成功會啓動9092端口
PS:若是系統有問題會致使日誌不停輸出
三臺主機啓動完畢,下面來檢測一下
在kafka01建立一個主題,主題名爲sumer
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.16.90.21:2181 --replication-factor 3 --partitions 1 --topic summer
查看summer主題詳情
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 172.16.90.21:2181 --topic summer
Topic:summer PartitionCount:1 ReplicationFactor:3 Configs: Topic: summer Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 #主題名稱:summer #Partition:只有一個從0開始 #leader:id爲2的broker #Replicas: 副本存在id爲 2 3 1的上面 #Isr:活躍狀態的broker
刪除主題
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 172.16.90.21:2181 --topic summer
使用kafka01發送消息,這裏是生產者角色
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.90.21:9092 --topic summer
出現命令行須要手動輸入消息
使用kafka02接收消息,這裏是消費者角色
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.90.22:9092 --topic summer --from-beginning
在kafka01輸入消息而後會在kafka02接收到該消息
下面將logstash-01的輸出從標準輸出改到kafka中,修改配置文件
/etc/logstash/conf.d/filebeat-logstash.conf input{ beats { port => 5044 } } output{ if "messages" in [tags]{ kafka{ #輸出至kafka bootstrap_servers => "172.16.90.21:9092,172.16.90.22:9092,172.16.90.23:9092" #主題名稱,將會自動建立 topic_id => "system-messages" #壓縮類型 compression_type => "snappy" } stdout{ codec => rubydebug } } }
啓動logstash輸出測試
kafka01查看是否生成這個主題
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 172.16.90.21:2181 --topic system-messages
能夠看出,這個主題生成了16個分區,每一個分區都有對應本身的Leader,可是我想要有10個分區,3個副本如何辦?仍是跟咱們上面同樣命令行來建立主題就行,固然對於logstash輸出的咱們也能夠提早先定義主題,而後啓動logstash 直接往定義好的主題寫數據就行啦,命令以下
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.16.90.21:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME
通過以上步驟已經經過完成了 filebeat-logstash01-kafka過程,下面配置logstash02接收消費kafka消息
logstash02須要安裝logstash,elasticsearch,kibana
rpm -ivh elasticsearch-6.5.4.rpm rpm -ivh kibana-6.5.4-x86_64.rpm rpm -ivh logstash-6.5.4.rpm
修改logstash配置先進行標準輸出測試
input{ kafka { #kafka輸入 bootstrap_servers => "172.16.90.21:9092,172.16.90.22:9092,172.16.90.23:9092" codec => plain #主題 topics => ["system-messages"] consumer_threads => 5 decorate_events => false auto_offset_reset => "earliest" #定義type方便後面選擇過濾及輸出 type => "system-messages" } } output{ if "system-messages" in [type] { # elasticsearch{ # hosts => ["172.16.90.24:9200"] # index => "messages-%{+YYYY.MM}" # } stdout{ codec => rubydebug } } }
對比logstash01和logstash02輸出能夠發現通過kafka的輸出message會有所不一樣
原始message
通過kafka輸出message以下
logstash-kafka 插件輸入和輸出默認 codec 爲 json 格式。在輸入和輸出的時候注意下編碼格式。消息傳遞過程當中 logstash 默認會爲消息編碼內加入相應的時間戳和 hostname 等信息。若是不想要以上信息(通常作消息轉發的狀況下),能夠修改logstash01配置增長配置 codec => plain{ format => "%{message}"}
input{ beats { port => 5044 } } output{ if "messages" in [tags]{ kafka{ #輸出至kafka bootstrap_servers => "172.16.90.21:9092,172.16.90.22:9092,172.16.90.23:9092" #主題名稱,將會自動建立 topic_id => "system-messages" #壓縮類型 compression_type => "snappy" #定義消息經過kafaka傳遞後格式不變 codec => plain{ format => "%{message}"} } stdout{ codec => rubydebug } } }
PS:使用配置codec => plain{ format => "%{message}"} 後傳遞給kafka的消息僅僅剩下源messages信息,能夠在kafka查看
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.90.21:9092 --topic nginx-prod-log --from-beginning
源信息爲
kafka消息隊列輸出
後端過濾的logstash輸出
啓動logstahs再次對比先後輸出
原message
通過kafka後的message
中止任意兩個kafka只有有一臺kafka主機在運行就能保障日誌正常輸出
把標準輸出刪除寫入到elasticsearch便可