ELK是指Elasticsearch + Logstash + Kibaba三個組件的組合。Elasticsearch主要充當一個全文檢索和分析引擎,Logstash是一款分佈式日誌收集系統,Kibana能夠爲這個平臺提供可視化的Web界面。node
首先從官網(https://www.elastic.co/cn/downloads)下載安裝包linux
elasticsearch-6.2.3.tar.gzbootstrap
kibana-6.2.3-linux-x86_64.tar.gzvim
logstash-6.2.3.tar.gz瀏覽器
jdk-8u151-linux-x64.tar.gzbash
因爲elk官方指定使用oracle的jdk8,若是操做系統自帶有openjdk能夠先進行卸載而後再安裝jdk。oracle
jdk安裝包解壓ssh
tar -zxvf jdk-8u151-linux-x64.tar.gz
配置路徑,在/etc/profile文件配置curl
vim /etc/profile
添加配置elasticsearch
export JAVA_HOME=/home/jdk1.8.0_151 export PATH=$PATH:$JAVA_HOME/bin
是配置生效
source /etc/profile
修改/etc/sysctl.conf 和/etc/security/limits.conf 配置
vi /etc/sysctl.conf #添加配置 vm.max_map_count = 655360 #使配置生效 sysctl -p
vi /etc/security/limits.conf #添加配置 * soft nofile 65536 * hard nofile 131072 * soft nproc 65536 * hard nproc 131072
解壓elasticsearch-6.2.3.tar.gz到/home/eselk/elk目錄下
tar -zxvf elasticsearch-6.2.3.tar.gz
進入到elasticsearch的配置目錄(/home/eselk/elk/elasticsearch-6.2.3/config
)下修改配置文件elasticsearch.yml
cluster.name: es_cluster #集羣的名稱 node.name: secms-elk1 #節點名稱 path.data: /home/eselk/elk/data #數據存儲的目錄(多個目錄使用逗號分隔) path.logs: /home/eselk/elk/logs #日誌路徑 network.host: 0.0.0.0 http.port: 9200 #端口默認9200 bootstrap.mlockall: true #鎖住內存,使內存不會分配至交換區(swap)
啓動es,查看是否啓動成功
./bin/elasticsearch -d
執行命令
curl http://10.118.213.223:9200
能夠看到輸出
{ "name" : "secms-elk1", "cluster_name" : "es_cluster", "cluster_uuid" : "PetmCJcwQX-RAwVdWiVC-Q", "version" : { "number" : "6.2.3", "build_hash" : "c59ff00", "build_date" : "2018-03-13T10:06:29.741383Z", "build_snapshot" : false, "lucene_version" : "7.2.1", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "tagline" : "You Know, for Search" }
es安裝成功,將es安裝複製到到其餘機子上面,修改配置文件elasticsearch.yml中node.name節點名字以後啓動各個節點,es系統會自動將cluster name相同的機器組成一個集羣。
解壓logstash-6.2.3.tar.gz 到/home/eselk/elk目錄下
tar -zxvf logstash-6.2.3.tar.gz
編輯配置文件,這裏在logstash安裝目錄下新建文件logstash-simple.conf,文件內容
input{ #接收kafka中數據 kafka{ bootstrap_servers => "10.118.213.175:9092,10.118.213.219:9092" #kafka集羣 topics=>["sentinel_logs","siem-alarm","siem-audit","siem-ddos","siem-fw","siem-ips","siem-risk","siem-syflog","siem-waf"] #接收主題中數據 codec => plain consumer_threads => 4 decorate_events => true } } output{ #輸出數據到es中 elasticsearch{ hosts => ["10.118.213.223:9200"] index => "all-log" } }
保存文件以後啓動logstash
./bin/logstash -f logstash-simple.conf > /dev/null &
查看logstash進程,確認logstash是否啓動成功
ps -ef|grep logstash
下面咱們在另外一個機子上面也安裝logstash,用來接收用戶操做命令道es中,按照一樣步驟解壓logstash到指定目錄,而後新建配置文件logstash-simple.conf文件內容
input { #讀取文件中數據 file { path=> [ "/var/log/history.log" ] type=>"history_log" } } output { elasticsearch { hosts => "10.118.213.223:9200" index => "ssh_opt" } }
按照前面步驟啓動logstash。
解壓kibana-6.2.3-linux-x86_64.tar.gz到指定目錄/home/eselk/elk
修改conf中配置文件
#訪問端口 server.port: 5601 #全部站點均可以訪問 server.host: "0.0.0.0" #指到es集羣master節點 elasticsearch.url: "http://10.118.213.223:9200" #kibana是一個小系統,本身也須要存儲數據(將kibana的數據保存到.kibana的索引中,會在ES裏面建立一個.kibana) kibana.index: ".kibana"
啓動kibaba
./bin/kibana > /dev/null &
能夠經過http://10.118.213.223:5601/在瀏覽器中訪問
若是咱們要查找logstash發送過來的數據,須要先配置index
新建index匹配規則
一樣操做再新建收集用戶操做命令的index匹配規則ssh_opt*,這樣能夠在Discover頁面查詢搜索數據。如今無論是all-log*或者ssh_opt*都是不能匹配到數據的,由於咱們尚未發送數據到logstash。
1.咱們經過kaka客戶端程序向kafuka主題中寫入日誌數據,而後查詢all-log*匹配規則能夠發現查詢到接收到的數據。
2.收集用戶操做命令數據,咱們首先要將用戶的操做命令寫入/var/log/history.log目錄下,而後再經過logstash進行收集。
在主機/etc/bashrc添加配置
vim /etc/bashrc
在文件末尾添加以下內容(注意先貼到記事本中,下面的指令只有兩行):
export HISTORY_FILE="/var/log/history.log" export PROMPT_COMMAND='{ thisHistID=`history 1|awk "{print\\$1}"`;lastCommand=`history 1| awk "{\\$1=\"\" ;print}"`;user=`id $(whoami)`;whoStr=(`who -u am i`);realUser=${whoStr[0]};logMonth=${whoStr[2]};logDay=${whoStr[3]};logTime=${whoStr[4]};pid=${whoStr[6]};ip=${whoStr[7]};if [ ${thisHistID}x != ${lastHistID}x ];then echo -E `date "+%Y/%m/%d %H:%M:%S"` $user\($realUser\)@$ip[PID:$pid][LOGIN:$logMonth $logDay $logTime] --- $lastCommand ;lastHistID=$thisHistID;fi; } >> $HISTORY_FILE'
執行source /etc/bashrc使配置生效。新建history.log文件並賦權
[root@rhserver ~]# cd /var/log [root@rhserver ~]# touch history.log [root@rhserver ~]# chmod 777 history.log
再執行一些命令查看ssh_opt*匹配到數據剛執行的測試命令