在咱們使用mysql和elasticsearch結合使用的時候,可能會有一些同步的需求,想要數據庫和elasticsearch同步的方式其實有不少。java
可使用canal,它主要是監聽mysql的binlog 日誌,能夠監聽數據的一些變化,若是數據發生了變化咱們須要作什麼邏輯,這些都是能夠人爲實現的,它是將本身模擬成一個slave節點,當master節點的數據發生變化是,它可以看到數據的變化。可是缺點也很明顯,因爲是java實現的,因此比較重,還須要使用zookeeper等集羣管理工具來管理canal節點,因此本文暫時不介紹這種方式。mysql
本文主要介紹使用Logstash JDBC的方式來實現同步,這個方式同步比較簡單。固然它有一些缺點,就是有點耗內存(內存大就當我沒說😂)。git
GET /myapp/_search
{
"_source": "id"
}
複製代碼
響應結果github
{
"took": 5,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1,
"hits": [
{
"_index": "myapp",
"_type": "doc",
"_id": "2",
"_score": 1,
"_source": {
"id": 2
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "1",
"_score": 1,
"_source": {
"id": 1
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "3",
"_score": 1,
"_source": {
"id": 3
}
}
]
}
}
複製代碼
mysql> select * from user;
+------+----------+-------------+------------+-------+------------+---------+
| id | name | phone | password | title | content | article |
+------+----------+-------------+------------+-------+------------+---------+
| 1 | zhnagsan | 181222 | 123123 | ??? | ?????? | ???IE |
| 2 | lishi | 181222113 | 232123123 | 23??? | 234?????? | 4???IE |
| 3 | wangwu | 18111214204 | 1337547531 | ????? | lc content | Java |
+------+----------+-------------+------------+-------+------------+---------+
3 rows in set (0.00 sec)
mysql>
複製代碼
如今咱們執行一個sql向裏面添加一條數據sql
mysql> insert into user (id, name, phone, password, title, content, article) values (4, "lc", "123456789", "123456789", "測試", "測試內容", "Java")
Query OK, 1 row affected (0.00 sec)
mysql>
複製代碼
GET /myapp/_search
{
"_source": "id"
}
複製代碼
響應結果數據庫
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 4,
"max_score": 1,
"hits": [
{
"_index": "myapp",
"_type": "doc",
"_id": "2",
"_score": 1,
"_source": {
"id": 2
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "4",
"_score": 1,
"_source": {
"id": 4
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "1",
"_score": 1,
"_source": {
"id": 1
}
},
{
"_index": "myapp",
"_type": "doc",
"_id": "3",
"_score": 1,
"_source": {
"id": 3
}
}
]
}
}
複製代碼
Virtual machine:VMware 11.0.2 Operator System:CentOS release 6.9 (Final) ElasticSearch:6.4.0 Kibana版本:6.4.0 LogStash版本:6.6.1 JDK版本:1.8.0_181 MySQL版本: 5.1.73(這個版本是用yum直接安裝的,其實這個教程和mysql版本沒有多大關係,由於到時候是使用jdbc的驅動包來鏈接數據庫的) logstash jdbc驅動包版本 5.1.46vim
本文暫時只提供下載地址(暫時偷個大懶 😄,安裝順序是按照連接排列順序),logstash會給予詳細使用說明,它是不須要安裝的是須要解壓就好了。bash
Virtual machine:VMware 11.0.2服務器
Operator System:CentOS release 6.9 (Final)微信
若是環境都安裝好了能夠看下面的了,沒安裝好也能夠看😁
Logstash是一個開源數據收集引擎,具備實時管道功能。Logstash能夠動態地未來自不一樣數據源的數據統一塊兒來,並將數據標準化到你所選擇的目的地。
Logstash是一個開源的服務器端數據處理管道,能夠同時從多個數據源獲取數據,並對其進行轉換,而後將其發送到你最喜歡的「存儲」。(固然,咱們最喜歡的是Elasticsearch)
數據每每以各類各樣的形式,或分散或集中地存在於不少系統中。Logstash 支持各類輸入選擇 ,能夠在同一時間從衆多經常使用來源捕捉事件。可以以連續的流式傳輸方式,輕鬆地從您的日誌、指標、Web 應用、數據存儲以及各類 AWS 服務採集數據。
數據從源傳輸到存儲庫的過程當中,Logstash 過濾器可以解析各個事件,識別已命名的字段以構建結構,並將它們轉換成通用格式,以便更輕鬆、更快速地分析和實現商業價值。 Logstash 可以動態地轉換和解析數據,不受格式或複雜度的影響:
儘管 Elasticsearch 是咱們的首選輸出方向,可以爲咱們的搜索和分析帶來無限可能,但它並不是惟一選擇。Logstash 提供衆多輸出選擇,您能夠將數據發送到您要指定的地方,而且可以靈活地解鎖衆多下游用例。
接下來,從命令行輸入以下命令
bin/logstash -e 'input { stdin {} } output { stdout {} }'
選項 -e 的意思是容許你從命令行指定配置
複製代碼
當啓動完成時,會等待你的輸入,你能夠輸入hello world
試試,它會給你一下信息的回饋。
首先,將咱們剛纔給予的下載連接裏面的jdbc驅動包放到logstash目錄裏面來
解壓這個文件
[root@localhost logstash-6.6.1]# unzip mysql-connector-java-5.1.46.zip
複製代碼
input {
jdbc {
# jdbc驅動包位置
jdbc_driver_library => "/mnt/logstash-6.6.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
# 要使用的驅動包類,有過java開發經驗的應該很熟悉這個了,不一樣的數據庫調用的類不同。
jdbc_driver_class => "com.mysql.jdbc.Driver"
# myqsl數據庫的鏈接信息
jdbc_connection_string => "jdbc:mysql://0.0.0.0:3306/myapp"
# mysql用戶
jdbc_user => "root"
# mysql密碼
jdbc_password => "root"
# 定時任務, 多久執行一次查詢, 默認一分鐘,若是想要沒有延遲,可使用 schedule => "* * * * * *"
schedule => "* * * * *"
# 你要執行的語句
statement => "select * from user"
}
}
output {
# 將數據輸出到ElasticSearch中
elasticsearch {
# es ip加端口
hosts => ["0.0.0.0:9200"]
# es文檔索引
index => "myusreinfo"
# es文檔數據的id,%{id}表明的是用數據庫裏面記錄的id做爲文檔的id
document_id => "%{id}"
}
}
複製代碼
上面咱們已經生成了這個mysqlsyn.conf這個文件,接下來咱們就使用logstash來進行數據同步吧,同步以前先看下咱們的數據庫的user表的數據。
查看mysql數據, 可以看到咱們仍是隻有剛開始的4條數據
檢查ElasticSearch是否有myusreinfo
這個索引,能夠從圖中看到咱們只有myapp
這個索引。
帶上配置文件啓動logstash
[root@localhost logstash-6.6.1]# ./bin/logstash -f config/mysqlsyn.conf
-f 指定配置文件啓動
複製代碼
啓動成功,而且已經在同步數據了,這個同步默認是每分鐘執行一次,能夠看到5分鐘執行了5次
上面已經啓動了同步,如今咱們去看看ElasticSearch裏面的是否有數據,從圖中能夠看到myusrinfo已經同步到es裏面了,而且能夠看到docs.count
的數量就是咱們剛纔數據庫裏面數據的數量。
咱們像數據庫裏面增長一條數據,而後看下ElasticSearch的數據是否會改變
查看ElasticSearch裏面是否有剛纔添加的數據, 從圖中能夠看到已經有5條數據了
先看ElasticSearch裏面id爲5的數據,能夠看到name爲yinya
數據庫修改一條id爲5的數據,看看ElasticSearch的數據變化
查看ElasticSearch裏面數據是否已經更改,能夠看到數據已經更改了
刪除兩條數據看看ElasticSearch數據的變化, 刪除了id爲1和2的兩條數據
查看ElasticSearch裏面數據是否已經更改, 然而數據並無變
到目前爲止,全部google,stackoverflow,elastic.co,github上面搜索的插件和實時同步的信息,告訴咱們:目前同步delete尚未好的解決方案。 折中的解決方案以下: 方案探討:discuss.elastic.co/t/delete-el…
stackoverflow.com/questions/3…
方案一 在原有的mysql數據庫表中,新增一個字段status, 默認值爲ok,若是要刪除數據,實則用update操做,status改成deleted, 這樣,就能同步到es中。es中以status狀態值區分該行數據是否存在。deleted表明已刪除,ok表明正常。
方案二 使用go elasticsearch 插件實現同步。
歡迎你們加我微信coding4life,或掃描二維碼,一塊兒討論技術~