ElasticSearch + Logstash進行數據庫同步

介紹

在咱們使用mysql和elasticsearch結合使用的時候,可能會有一些同步的需求,想要數據庫和elasticsearch同步的方式其實有不少。java

可使用canal,它主要是監聽mysql的binlog 日誌,能夠監聽數據的一些變化,若是數據發生了變化咱們須要作什麼邏輯,這些都是能夠人爲實現的,它是將本身模擬成一個slave節點,當master節點的數據發生變化是,它可以看到數據的變化。可是缺點也很明顯,因爲是java實現的,因此比較重,還須要使用zookeeper等集羣管理工具來管理canal節點,因此本文暫時不介紹這種方式。mysql

本文主要介紹使用Logstash JDBC的方式來實現同步,這個方式同步比較簡單。固然它有一些缺點,就是有點耗內存(內存大就當我沒說😂)。git

最終效果展現

  1. 先看下ElasticSearch的數據, 這要好分辨效果,從響應結果能夠看到如今有id爲1,2,3的三條數據。 執行查詢語句
GET /myapp/_search
{
  "_source": "id"
}
複製代碼

響應結果github

{
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 1,
    "hits": [
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "2",
        "_score": 1,
        "_source": {
          "id": 2
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "1",
        "_score": 1,
        "_source": {
          "id": 1
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "3",
        "_score": 1,
        "_source": {
          "id": 3
        }
      }
    ]
  }
}
複製代碼
  1. 如今咱們來修改增長一條數據,看看es數據的變化 這裏是數據庫如今的數據, 能夠看到裏面有3條記錄。
mysql> select * from user;
+------+----------+-------------+------------+-------+------------+---------+
| id   | name     | phone       | password   | title | content    | article |
+------+----------+-------------+------------+-------+------------+---------+
|    1 | zhnagsan | 181222      | 123123     | ???   | ??????     | ???IE   |
|    2 | lishi    | 181222113   | 232123123  | 23??? | 234??????  | 4???IE  |
|    3 | wangwu   | 18111214204 | 1337547531 | ????? | lc content | Java    |
+------+----------+-------------+------------+-------+------------+---------+
3 rows in set (0.00 sec)
mysql>
複製代碼

如今咱們執行一個sql向裏面添加一條數據sql

mysql> insert into user (id, name, phone, password, title, content, article) values (4, "lc", "123456789", "123456789", "測試", "測試內容", "Java") 
Query OK, 1 row affected (0.00 sec)
mysql>
複製代碼
  1. 在次執行查詢語句,看看es的數據變化,能夠看到已經多了一條id爲4的數據了,中間這個同步默認會有1分鐘的延遲。 執行搜索
GET /myapp/_search
{
  "_source": "id"
}
複製代碼

響應結果數據庫

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 1,
    "hits": [
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "2",
        "_score": 1,
        "_source": {
          "id": 2
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "4",
        "_score": 1,
        "_source": {
          "id": 4
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "1",
        "_score": 1,
        "_source": {
          "id": 1
        }
      },
      {
        "_index": "myapp",
        "_type": "doc",
        "_id": "3",
        "_score": 1,
        "_source": {
          "id": 3
        }
      }
    ]
  }
}
複製代碼

環境

Virtual machine:VMware 11.0.2 Operator System:CentOS release 6.9 (Final) ElasticSearch:6.4.0 Kibana版本:6.4.0 LogStash版本:6.6.1 JDK版本:1.8.0_181 MySQL版本: 5.1.73(這個版本是用yum直接安裝的,其實這個教程和mysql版本沒有多大關係,由於到時候是使用jdbc的驅動包來鏈接數據庫的) logstash jdbc驅動包版本 5.1.46vim

獲取須要的環境

本文暫時只提供下載地址(暫時偷個大懶 😄,安裝順序是按照連接排列順序),logstash會給予詳細使用說明,它是不須要安裝的是須要解壓就好了。bash

Virtual machine:VMware 11.0.2服務器

Operator System:CentOS release 6.9 (Final)微信

JDK版本:1.8.0_181

ElasticSearch:6.4.0

Kibana版本:6.4.0

Logstash版本:6.6.1

logStash驅動包

使用logstash

若是環境都安裝好了能夠看下面的了,沒安裝好也能夠看😁

logstash 介紹

Logstash是一個開源數據收集引擎,具備實時管道功能。Logstash能夠動態地未來自不一樣數據源的數據統一塊兒來,並將數據標準化到你所選擇的目的地。

集中、轉換和存儲你的數據

Logstash是一個開源的服務器端數據處理管道,能夠同時從多個數據源獲取數據,並對其進行轉換,而後將其發送到你最喜歡的「存儲」。(固然,咱們最喜歡的是Elasticsearch)

輸入:採集各類樣式、大小和來源的數據

數據每每以各類各樣的形式,或分散或集中地存在於不少系統中。Logstash 支持各類輸入選擇 ,能夠在同一時間從衆多經常使用來源捕捉事件。可以以連續的流式傳輸方式,輕鬆地從您的日誌、指標、Web 應用、數據存儲以及各類 AWS 服務採集數據。

過濾器:實時解析和轉換數據

數據從源傳輸到存儲庫的過程當中,Logstash 過濾器可以解析各個事件,識別已命名的字段以構建結構,並將它們轉換成通用格式,以便更輕鬆、更快速地分析和實現商業價值。 Logstash 可以動態地轉換和解析數據,不受格式或複雜度的影響:

  1. 利用 Grok 從非結構化數據中派生出結構
  2. 從 IP 地址破譯出地理座標
  3. 將 PII 數據匿名化,徹底排除敏感字段
  4. 總體處理不受數據源、格式或架構的影響

輸出:選擇你的存儲,導出你的數據

儘管 Elasticsearch 是咱們的首選輸出方向,可以爲咱們的搜索和分析帶來無限可能,但它並不是惟一選擇。Logstash 提供衆多輸出選擇,您能夠將數據發送到您要指定的地方,而且可以靈活地解鎖衆多下游用例。

安裝logstash

首先,讓咱們經過最基本的Logstash管道來測試一下剛纔安裝的Logstash Logstash管道有兩個必需的元素,輸入和輸出,以及一個可選元素過濾器。輸入插件從數據源那裏消費數據,過濾器插件根據你的指望修改數據,輸出插件將數據寫入目的地。

接下來,從命令行輸入以下命令

bin/logstash -e 'input { stdin {} } output { stdout {} }'
選項 -e 的意思是容許你從命令行指定配置
複製代碼

當啓動完成時,會等待你的輸入,你能夠輸入hello world試試,它會給你一下信息的回饋。

使用logstash進行Mysql和ElasticSearch的同步

準備JDBC驅動包

  1. 首先,將咱們剛纔給予的下載連接裏面的jdbc驅動包放到logstash目錄裏面來

  2. 解壓這個文件

[root@localhost logstash-6.6.1]# unzip mysql-connector-java-5.1.46.zip
複製代碼

生成mysqlsyn.conf文件

  1. 進入config目錄,建立文件mysqlsyn.conf
  2. 使用vim編輯器打開這個文件,並向裏面添加如下內容,而且保存退出。
input {
			jdbc {
			 		# jdbc驅動包位置
					jdbc_driver_library => "/mnt/logstash-6.6.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
					# 要使用的驅動包類,有過java開發經驗的應該很熟悉這個了,不一樣的數據庫調用的類不同。
					jdbc_driver_class => "com.mysql.jdbc.Driver"
					 # myqsl數據庫的鏈接信息
					jdbc_connection_string => "jdbc:mysql://0.0.0.0:3306/myapp"
					 # mysql用戶
					jdbc_user => "root"
					 # mysql密碼
					jdbc_password => "root"
					# 定時任務, 多久執行一次查詢, 默認一分鐘,若是想要沒有延遲,可使用 schedule => "* * * * * *"
					schedule => "* * * * *"
					 # 你要執行的語句
					statement => "select * from user"
			}
	}

	output {
			# 將數據輸出到ElasticSearch中
			  elasticsearch {
			  		# es ip加端口
					hosts => ["0.0.0.0:9200"]
					# es文檔索引
					index => "myusreinfo"
					# es文檔數據的id,%{id}表明的是用數據庫裏面記錄的id做爲文檔的id
					document_id => "%{id}"
	  }
	}
複製代碼

啓動logstash進行同步

上面咱們已經生成了這個mysqlsyn.conf這個文件,接下來咱們就使用logstash來進行數據同步吧,同步以前先看下咱們的數據庫的user表的數據。

  1. 查看mysql數據, 可以看到咱們仍是隻有剛開始的4條數據

  2. 檢查ElasticSearch是否有myusreinfo這個索引,能夠從圖中看到咱們只有myapp這個索引。

  3. 帶上配置文件啓動logstash

[root@localhost logstash-6.6.1]# ./bin/logstash -f config/mysqlsyn.conf
-f 指定配置文件啓動
複製代碼

啓動成功,而且已經在同步數據了,這個同步默認是每分鐘執行一次,能夠看到5分鐘執行了5次

檢查同步效果

  1. 上面已經啓動了同步,如今咱們去看看ElasticSearch裏面的是否有數據,從圖中能夠看到myusrinfo已經同步到es裏面了,而且能夠看到docs.count的數量就是咱們剛纔數據庫裏面數據的數量。

  2. 咱們像數據庫裏面增長一條數據,而後看下ElasticSearch的數據是否會改變

  3. 查看ElasticSearch裏面是否有剛纔添加的數據, 從圖中能夠看到已經有5條數據了

  4. 先看ElasticSearch裏面id爲5的數據,能夠看到name爲yinya

  5. 數據庫修改一條id爲5的數據,看看ElasticSearch的數據變化

  6. 查看ElasticSearch裏面數據是否已經更改,能夠看到數據已經更改了

  7. 刪除兩條數據看看ElasticSearch數據的變化, 刪除了id爲1和2的兩條數據

  8. 查看ElasticSearch裏面數據是否已經更改, 然而數據並無變

對delete操做的實時同步潑冷水

到目前爲止,全部google,stackoverflow,elastic.co,github上面搜索的插件和實時同步的信息,告訴咱們:目前同步delete尚未好的解決方案。 折中的解決方案以下: 方案探討:discuss.elastic.co/t/delete-el…

stackoverflow.com/questions/3…

  1. 方案一 在原有的mysql數據庫表中,新增一個字段status, 默認值爲ok,若是要刪除數據,實則用update操做,status改成deleted, 這樣,就能同步到es中。es中以status狀態值區分該行數據是否存在。deleted表明已刪除,ok表明正常。

  2. 方案二 使用go elasticsearch 插件實現同步。

歡迎你們加我微信coding4life,或掃描二維碼,一塊兒討論技術~

相關文章
相關標籤/搜索