Pull the elasticsearch 7.3.2 image.
docker run -d --name elasticsearch --net ELS -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.3.2
After it starts, copy the config directory from the container to the host so it can be mounted later:
docker container cp -a <container ID>:<path in container> <host path>
Delete this container, then add the following to the copied elasticsearch.yml:
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
The three http.cors.* settings enable cross-origin requests.
xpack.security.enabled: true
This enables authentication.
Complete version:
cluster.name: "docker-cluster"
network.host: 0.0.0.0
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
xpack.security.enabled: true
docker run -d --name elasticsearch --net ELS -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ELASTIC_PASSWORD=Tsl@2018" -e "KIBANA_PASSWORD=Tsl@2018" -v /home/aa/elastic/config:/usr/share/elasticsearch/config elasticsearch:7.3.2
This specifies the passwords for the elastic and kibana users and mounts the config directory.
Pull the kibana 7.3.2 image.
In the same way, start it bare with
docker run -d --name kibana --net ELS -p 5601:5601 kibana:7.3.2
then copy the config directory out to the host.
Edit kibana.yml and add these settings:
elasticsearch.hosts: [ "http://192.168.66.34:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
elasticsearch.username: "kibana"
elasticsearch.password: "1111"
xpack.security.enabled: true
Complete version:
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://192.168.66.34:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
elasticsearch.username: "kibana"
elasticsearch.password: "1111"
xpack.security.enabled: true
Note that the kibana user configured here is only the account Kibana uses to connect to ES; it is not a Kibana login account. Logging in to Kibana still requires the superuser elastic account.
docker run -d --name kibana --net ELS -p 5601:5601 -v /home/aa/kibana/config:/usr/share/kibana/config kibana:7.3.2
Pull the logstash image.
For pulling data from a database into ES, Logstash 7 no longer needs the startup flag pointing at a config file that version 5 did. Instead there are dedicated directories:
the pipeline directory holds the input/output .conf files;
the config directory holds Logstash's startup configuration.
Complete logstash.yml after editing:
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.66.34:9200" ]
xpack.management.elasticsearch.username: elastic
xpack.management.elasticsearch.password: 1111
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: 1111
The JDBC driver jar needs to be copied or mounted into the container:
docker run -d -p 5044:5044 -p 9600:9600 -it --name logstash -v /home/aa/logstash/config/:/usr/share/logstash/config/ -v /home/aa/logstash/pipeline:/usr/share/logstash/pipeline -v /home/aa/logstash/mysql/:/some/config-dir/ --network ELS logstash:7.3.2
Started this way, this version kept erroring that the driver could not be found.
It is not a mount-location problem. After a lot of searching: the Java classpath inside the container is /usr/share/logstash/logstash-core/lib/jars. If the jar is mounted anywhere else, Logstash cannot read it, even when the .conf file points at the correct path!
Put the driver jar in that directory, leave the driver path empty in the .conf, and it will be picked up on startup.
The three ELK services should eventually be combined into a single docker-compose file; I hadn't worked that out at the time. It is covered in the follow-up below.
input {
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before using it
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#maximum retries after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
# sync schedule (cron fields: minute hour day month weekday); this runs every minute
schedule => "* * * * *"
statement => " select sal.alarmID, vi.districtID, di.name districtName, de.streetID, st.name streetName,
de.committeeID, comm.name committeeName, sal.villageID, vi.name villageName, de.buildingID,
vi.name, sal.alarmCount, sal.address,
sal.deviceType,sal.alarmTypeName,sal.modelID,
sal.alarmLevel,sal.alarmState,sal.alarmTime,
sal.alarmContent,de.installAddr,sal.updateTime
from e_sense_alarm_log sal
left join e_device de on de.deviceID = sal.deviceID
left join b_village vi on vi.villageID = de.villageID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
WHERE sal.updateTime >= :sql_last_value"
#whether to record the last run; if true, the last value of the tracking_column field is saved to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_alarm_last_time"
type => "alarm"
# whether to lowercase column names
lowercase_column_names => false
}
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before using it
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#maximum retries after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
schedule => "* * * * *"
statement => " select de.deviceID, de.isDelete, vi.districtID, di.name districtName, de.streetID, st.name streetName,
de.committeeID, comm.name committeeName, de.villageID, vi.name villageName, de.buildingID,
de.installAddr as installadd,
de.type as devicetype, bu.buildingNo as buildingno, bu.name as buildingName,
de.productModel as productmodel, de.name, de.code as code, de.installTime as installtime,
de.state, de.updateTime as updatetime
from e_device de
left join b_building bu on de.buildingID = bu.buildingID
left join b_village vi on vi.villageID = de.villageID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
WHERE de.updateTime >= :sql_last_value"
#whether to record the last run; if true, the last value of the tracking_column field is saved to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_device_last_time"
type => "device"
# whether to lowercase column names
lowercase_column_names => false
}
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before using it
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#maximum retries after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
schedule => "* * * * *"
statement => " select al.accessLogID, vi.districtID, di.name districtName,vi.streetID, st.name streetName, vi.committeeID,
comm.name committeeName, al.villageID, vi.name villageName, al.buildingID as buildingid,bui.name buildName,
peo.peopleID, al.peopleName as peoplename,
peo.gender, peo.phoneNo as phoneno, al.credentialNo as credentialno,
lab.name as peoplelabel, bu.buildingNo as buildingno,
al.cardNo as cardno, al.updateTime as opentime, peo.headPic as headpic,
(case al.openType when '100101' then '刷門禁卡開門' when '100201' then '人臉識別開門' when '100301' then '手機藍牙開門'
when '100302' then '手機遠程開門' when '100303' then '電話按鍵開門' when '100401' then '出門按鈕開門'
when '100402' then '鍵盤密碼開門' when '100501' then '身份證開門' when '100601' then '訪客呼叫開門' end) opentype,
peo.livePic as livepic, peo.idPic as idpic , al.faceLogID faceLogID, io.name ioName, al.deviceID
from e_access_log al
left join p_people peo on peo.credentialNo =al.credentialNo
left join p_people_label pl on pl.peopleID = peo.peopleID
left join s_label lab on lab.labelID = pl.labelID
left join b_building bu on bu.buildingID = al.buildingID
left join b_village vi on vi.villageID = al.villageID
left join b_in_out io on io.ioID = al.ioID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
left join b_building bui on bui.buildingID = al.buildingID
WHERE al.updateTime >= :sql_last_value"
#whether to record the last run; if true, the last value of the tracking_column field is saved to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_accessLog_last_time"
type => "accessLog"
# whether to lowercase column names
lowercase_column_names => false
}
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before using it
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#maximum retries after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
schedule => "* * * * *"
statement => "select fl.faceLogID,io.type as faceinouttype, vi.districtID, di.name districtName, vi.streetID, st.name streetName,
vi.committeeID, comm.name committeeName, io.villageID, vi.name villageName, io.ioID as ioid, io.name, bid.deviceID,
fl.personType as persontype,
peo.peopleName as peoplename, peo.gender, peo.nation, peo.birthDate,
peo.phoneNo as phoneno, peo.credentialNo as credentialno,
peo.domiclleDetailAddress, peo.residenceDetailAddress,
sl.name as peoplelabel, fl.updateTime as facecapturetime, fl.bkgUrl as bkgurl,
fl.faceUrl as faceurl, peo.headPic as headpic, peo.livePic as livepic, peo.idPic as idpic ,
peo.political, peo.education, peo.maritialStatus, peo.origin, fl.faceSimilarity*100 faceSimilarity, peo.peopleType
from e_face_log fl
left join b_in_out io on io.ioID = fl.ioID
left join p_people peo on peo.credentialNo = fl.credentialNo
left join p_people_label pl on pl.peopleID = peo.peopleID
left join s_label sl on sl.labelID = pl.labelID
left join b_village vi on vi.villageID = io.villageID
left join b_inout_device bid on bid.ioID = io.ioID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
where fl.updateTime >= :sql_last_value
#and fl.faceSource = 0
"
#whether to record the last run; if true, the last value of the tracking_column field is saved to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_wkface_last_time"
type => "wkface"
# whether to lowercase column names
lowercase_column_names => false
}
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before using it
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#maximum retries after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
schedule => "* * * * *"
statement => "select pr.parkingReserveID, vi.districtID, di.name districtName, vi.streetID, st.name streetName, vi.committeeID,
comm.name committeeName, pr.villageID, vi.name villageName, io.ioID as inioid,
io.ioID as outioid, pr.inParkingLogID as inparkinglogid,
pr.outParkingLogID as outparkinglogid, pr.carBrand as cartype,
pr.plateNo as plateno, peo.peopleName as peoplename, peo.phoneNo as phoneno,
peo.credentialNo as credentialno, pr.insertTime as intime, pr.updateTime as outtime,
peo.headPic as headpic,
peo.livePic as livepic, peo.idPic as idpic, inlog.platePic as inplatepic,
outlog.platePic as outplatepic, inlog.minPlatePic as inplatepic,
outlog.minPlatePic as outplatepic, pr.isRegister
from e_parking_reserve pr
left join e_parking_channel pc on pc.parkingID = pr.parkingID
left join b_in_out io on io.ioID = pc.ioID
left join e_parking_car ec on ec.plateNo = pr.plateNo
left join p_people peo on peo.peopleID = ec.peopleID
left join e_parking_log inlog on inlog.parkingLogID = pr.inParkingLogID
left join e_parking_log outlog on outlog.parkingLogID = pr.outParkingLogID
left join b_village vi on vi.villageID = io.villageID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
where pr.updateTime >= :sql_last_value"
#whether to record the last run; if true, the last value of the tracking_column field is saved to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_wkcar_last_time"
type => "wkcar"
# whether to lowercase column names
lowercase_column_names => false
}
}
output {
if [type] == "alarm"{
elasticsearch {
# ES address and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "alarmlogindex"
# use the id column from the source query as the document id
document_id => "%{alarmID}"
document_type => "alarm"
}
}
if [type] == "device"{
elasticsearch {
# ES address and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "deviceindex"
# use the id column from the source query as the document id
document_id => "%{deviceID}"
document_type => "device"
}
}
if [type] == "accessLog"{
elasticsearch {
# ES address and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "accesslogindex"
# use the id column from the source query as the document id
document_id => "%{accessLogID}"
document_type => "accessLog"
}
}
if [type] == "wkface"{
elasticsearch {
# ES address and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "facelogindex"
# use the id column from the source query as the document id
document_id => "%{faceLogID}"
document_type => "wkface"
}
}
if [type] == "wkcar"{
elasticsearch {
# ES address and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "parkingreservelogindex"
# use the id column from the source query as the document id
document_id => "%{parkingReserveID}"
document_type => "wkcar"
}
}
stdout {
# print events as JSON lines
codec => json_lines
}
}
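The incremental sync used by every jdbc block above (record_last_run, tracking_column and the :sql_last_value placeholder) can be sketched in a few lines. This is only an illustration of the mechanism; the table and timestamps are made up:

```python
from datetime import datetime

# Fake "database" rows: (id, updateTime).
rows = [
    (1, datetime(2020, 1, 1, 10, 0)),
    (2, datetime(2020, 1, 1, 11, 0)),
    (3, datetime(2020, 1, 1, 12, 0)),
]

def sync(last_value):
    """Mimic one scheduled run: select rows with updateTime >= :sql_last_value,
    then advance the last seen value of the tracking column."""
    batch = [r for r in rows if r[1] >= last_value]
    new_last = max((r[1] for r in batch), default=last_value)
    return batch, new_last

# First run: everything since the initial value is picked up.
batch, last = sync(datetime.min)
print(len(batch))  # 3

# Second run: only rows updated at or after the recorded value are re-read.
rows.append((4, datetime(2020, 1, 1, 13, 0)))
batch, last = sync(last)
print(len(batch))  # rows 3 and 4, because >= is inclusive, like :sql_last_value
```

Note that the real plugin persists `new_last` to `last_run_metadata_path` between runs, which is why the path above must survive container restarts.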
ELK configuration in docker-compose.yml:
Copy the three ELK config directories into the corresponding host directories beforehand.
Because logstash needs the driver copied into the container, a custom image has to be built.
Dockerfile:
FROM logstash:7.2.0
MAINTAINER kf
# copy the driver jar into the image
ADD ./mysql/*****.jar /usr/share/logstash/logstash-core/lib/jars
# create the last_record directory in the container's working directory
RUN mkdir last_record
The source path of ADD must be relative; an absolute path will not work.
docker-compose.yml:
version: "3"
services:
elasticsearch:
image: elasticsearch:7.2.0
container_name: elastic
ports:
- 9200:9200
- 9300:9300
environment:
ELASTIC_PASSWORD: Root@2018
KIBANA_PASSWORD: Kibana@2018
LOGSTASH_PASSWORD: Logstash@2018
discovery_type: single-node
volumes:
- /root/data/elastic/config:/usr/share/elasticsearch/config
restart: always
kibana:
image: kibana:7.2.0
container_name: kibana
ports:
- 5601:5601
volumes:
- /root/data/kibana/config:/usr/share/kibana/config
restart: always
logstash:
image: logstash:7 # the custom image built above
container_name: logstash
ports:
- 5044:5044
- 9600:9600
volumes:
- /root/data/logstash/config:/usr/share/logstash/config
- /root/data/logstash/pipeline:/usr/share/logstash/pipeline
restart: always
networks:
default:
external:
name: ELS
Running docker-compose up -d fails with an error saying the network must be created; create it as the message suggests and run the command again.
The ES container started through docker-compose then kept erroring,
so in the end I fell back to starting it with docker run.
I could not find the cause at the time; if you got it working, please leave a comment. Thanks!
Follow-up: after asking around, I learned the cause. When started via docker-compose, ES comes up in cluster mode; the single-node environment variable (discovery_type: single-node) is configured but does not take effect.
The setting has to go into the mounted elasticsearch.yml instead. Complete version:
cluster.name: "docker-cluster"
discovery.type: "single-node"
network.host: 0.0.0.0
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
xpack.security.enabled: true
#xpack.security.transport.ssl.enabled: true
The complete docker-compose.yml is then:
version: "3.7"
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.3.2
container_name: elastic
ports:
- 9200:9200
- 9300:9300
environment:
#- discovery_type=single-node
- ELASTIC_PASSWORD=Root@
- KIBANA_PASSWORD=Kibana@
- LOGSTASH_PASSWORD=Logstash@
volumes:
- ./elastic/config:/usr/share/elasticsearch/config
restart: always
kibana:
image: kibana:7.3.2
container_name: kibana
ports:
- 5601:5601
volumes:
- /data/elk/kibana/config:/usr/share/kibana/config
depends_on:
- elasticsearch
restart: always
logstash:
image: logstash:7
container_name: logstash
ports:
- 5044:5044
- 9600:9600
volumes:
- /data/elk/logstash/config:/usr/share/logstash/config
- /data/elk/logstash/pipeline:/usr/share/logstash/pipeline
depends_on:
- elasticsearch
restart: always
networks:
default:
external:
name: ELS
Now docker-compose up -d --build starts the whole ELK stack.
At this point the data stored in ES is off by 8 hours because of the timezone. You can add - TZ=Asia/Shanghai to the ES service's environment in docker-compose;
the inserted time fields then match the database. But the times are still in UTC format, and once the frontend converts the format it adds another 8 hours by default, so it is best to convert time fields to a plain YYYY-MM-DD HH:mm:ss format when storing them into ES.
One function that does this: DATE_FORMAT(sal.alarmTime,'%Y-%m-%d %T'), stored with type text.
Another approach I have seen but not tested:
add 8 hours to the mytime value from MySQL inside Logstash:
filter {
  ruby {
    code => "event.set('mytime', event.get('mytime').time.localtime + 8*60*60)"
  }
}
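The ruby filter's + 8*60*60 is a plain UTC to UTC+8 shift. A minimal Python sketch of the same arithmetic (mytime is the field name from the example above):

```python
from datetime import datetime, timedelta

# A UTC timestamp, as Logstash would see it.
mytime_utc = datetime(2020, 1, 1, 0, 30, 0)

# Shift to UTC+8 (Asia/Shanghai), the same as the ruby filter's + 8*60*60.
mytime_local = mytime_utc + timedelta(hours=8)

# Format as plain YYYY-MM-DD HH:mm:ss, matching DATE_FORMAT(..., '%Y-%m-%d %T').
print(mytime_local.strftime("%Y-%m-%d %H:%M:%S"))  # 2020-01-01 08:30:00
```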
After converting the time field to text, queries on it (sorting/aggregation) will error;
you then need to run this in Kibana:
PUT facelogindex/_mapping
{
"properties": {
"facecapturetime": {
"type": "text",
"fielddata": true
}
}
}
facelogindex is the index and facecapturetime is the field name.
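The request body above is plain JSON, so the same mapping update can be issued from any HTTP client, not only Kibana. A small sketch that builds the body; the index and field names are the ones from the example, and the curl line in the comment is an untested illustration assuming the ES address and elastic user used throughout:

```python
import json

index = "facelogindex"      # index from the example above
field = "facecapturetime"   # text field that needs sorting/aggregation

# Body for: PUT facelogindex/_mapping
mapping = {"properties": {field: {"type": "text", "fielddata": True}}}
body = json.dumps(mapping)
print(body)

# To send it outside Kibana (assumption, not tested):
# curl -u elastic:<password> -X PUT "http://192.168.66.34:9200/facelogindex/_mapping" \
#   -H "Content-Type: application/json" \
#   -d '{"properties":{"facecapturetime":{"type":"text","fielddata":true}}}'
```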