Mysql數據同步Elasticsearch方案總結

前言

要經過elasticsearch實現數據檢索,首先要將數據導入elasticsearch,並實現數據源與elasticsearch數據同步.這裏使用的數據源是Mysql數據庫.目前mysql與elasticsearch經常使用的同步機制大可能是基於插件實現的,經常使用的插件包括:logstash-input-jdbc,go-mysql-elasticsearch, elasticsearch-jdbc。 html

插件優缺點對比

1. logstash-input-jdbc

logstash官方插件,集成在logstash中,下載logstash便可,經過配置文件實現mysql與elasticsearch數據同步java

優勢mysql

  • 能實現mysql數據全量和增量的數據同步,且能實現定時同步.
  • 版本更新迭代快,相對穩定.
  • 做爲ES固有插件logstash一部分,易用

缺點git

  • 不能實現同步刪除操做,MySQL數據刪除後Elasticsearch中數據仍存在.
  • 同步最短期差爲一分鐘,一分鐘數據同步一次,沒法作到實時同步.

2. go-mysql-elasticsearch

go-mysql-elasticsearch 是國內做者開發的一款插件github

優勢web

  • 能實現mysql數據增長,刪除,修改操做的實時數據同步

缺點sql

  • 沒法實現數據全量同步Elasticsearch
  • 仍處理開發、相對不穩定階段

3. elasticsearch-jdbc

目前最新的版本是2.3.4,支持的ElasticSearch的版本爲2.3.4, 未實踐數據庫

優勢apache

  • 能實現mysql數據全量和增量的數據同步.

缺點編程

  • 目前最新的版本是2.3.4,支持的ElasticSearch的版本爲2.3.4
  • 不能實現同步刪除操做,MySQL數據刪除後Elasticsearch中數據仍存在.

方案一:logstash-input-jdbc實現mysql數據庫與elasticsearch同步

1.安裝

logstash5.x以後,集成了logstash-input-jdbc插件。安裝logstash後經過命令安裝logstash-input-jdbc插件

cd /logstash-6.4.2/bin
./logstash-plugin install logstash-input-jdbc

2.配置

在logstash-6.4.2/config文件夾下新建jdbc.conf,配置內容以下

# 輸入部分
input {
  stdin {}
  jdbc {
    # mysql數據庫驅動
    jdbc_driver_library => "/usr/local/logstash-6.4.2/config/mysql-connector-java-5.1.30.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # mysql數據庫連接,數據庫名
    jdbc_connection_string => "jdbc:mysql://localhost:3306/octopus"
    # mysql數據庫用戶名,密碼
    jdbc_user => "root"
    jdbc_password => "12345678"
    # 設置監聽間隔  各字段含義(分、時、天、月、年),所有爲*默認含義爲每分鐘更新一次
    schedule => "* * * * *"
    # 分頁
    jdbc_paging_enabled => "true"
    # 分頁大小
    jdbc_page_size => "50000"
    # sql語句執行文件,也可直接使用 statement => 'select * from t_school_archives_fold create_time >= 
                                :sql_last_value order by create_time limit 200000'
    statement_filepath => "/usr/local/logstash-6.4.2/config/jdbc.sql"
    # elasticsearch索引類型名
    type => "t_employee"
  }
}

# 過濾部分(不是必須項)
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

# 輸出部分
output {
    elasticsearch {
        # elasticsearch索引名
        index => "octopus"
        # 使用input中的type做爲elasticsearch索引下的類型名
        document_type => "%{type}"   # <- use the type from each input
        # elasticsearch的ip和端口號
        hosts => "localhost:9200"
        # 同步mysql中數據id做爲elasticsearch中文檔id
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

# 注: 使用時請去掉此文件中的註釋,否則會報錯

在logstash-6.4.2/config 目錄下新建jdbc.sql文件

select * from t_employee

3.運行

cd logstash-6.4.2
# 檢查配置文件語法是否正確
bin/logstash -f config/jdbc.conf --config.test_and_exit
# 啓動
bin/logstash -f config/jdbc.conf --config.reload.automatic

--config.reload.automatic: 會自動從新加載配置文件內容

在kibana中建立索引後查看同步數據

PUT octopus
GET octopus/_search

 

方案二: go-mysql-elasticsearch實現mysql數據庫與elasticsearch同步

1. mysql binlog日誌

go-mysql-elasticsearch經過mysql中binlog日誌實現數據增長,刪除,修改同步elasticsearch

mysql的binlog日誌主要用於數據庫的主從複製與數據恢復。binlog中記錄了數據的增刪改查操做,主從複製過程當中,主庫向從庫同步binlog日誌,從庫對binlog日誌中的事件進行重放,從而實現主從同步。
mysql binlog日誌有三種模式,分別爲:

ROW:   記錄每一行數據被修改的狀況,可是日誌量太大
STATEMENT:   記錄每一條修改數據的SQL語句,減小了日誌量,可是SQL語句使用函數或觸發器時容易出現主從不一致
MIXED:   結合了ROW和STATEMENT的優勢,根據具體執行數據操做的SQL語句選擇使用ROW或者STATEMENT記錄日誌

要經過mysql binlog將數據同步到ES集羣,只能使用ROW模式,由於只有ROW模式才能知道mysql中的數據的修改內容。

以UPDATE操做爲例,ROW模式的binlog日誌內容示例以下:

SET TIMESTAMP=1527917394/*!*/;
    BEGIN
    /*!*/;
    # at 3751
    #180602 13:29:54 server id 1  end_log_pos 3819 CRC32 0x8dabdf01     Table_map: `webservice`.`building` mapped to number 74
    # at 3819
    #180602 13:29:54 server id 1  end_log_pos 3949 CRC32 0x59a8ed85     Update_rows: table id 74 flags: STMT_END_F
    
    BINLOG '
    UisSWxMBAAAARAAAAOsOAAAAAEoAAAAAAAEACndlYnNlcnZpY2UACGJ1aWxkaW5nAAYIDwEPEREG
    wACAAQAAAAHfq40=
    UisSWx8BAAAAggAAAG0PAAAAAEoAAAAAAAEAAgAG///A1gcAAAAAAAALYnVpbGRpbmctMTAADwB3
    UkRNbjNLYlV5d1k3ajVbD64WWw+uFsDWBwAAAAAAAAtidWlsZGluZy0xMAEPAHdSRE1uM0tiVXl3
    WTdqNVsPrhZbD64Whe2oWQ==
    '/*!*/;
    ### UPDATE `webservice`.`building`
    ### WHERE
    ###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
    ###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
    ###   @3=0 /* TINYINT meta=0 nullable=0 is_null=0 */
    ###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
    ###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ### SET
    ###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
    ###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
    ###   @3=1 /* TINYINT meta=0 nullable=0 is_null=0 */
    ###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
    ###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    # at 3949
    #180602 13:29:54 server id 1  end_log_pos 3980 CRC32 0x58226b8f     Xid = 182
    COMMIT/*!*/;

STATEMENT模式下binlog日誌內容示例爲:

SET TIMESTAMP=1527919329/*!*/;
    update building set Status=1 where Id=2000
    /*!*/;
    # at 688
    #180602 14:02:09 server id 1  end_log_pos 719 CRC32 0x4c550a7d  Xid = 200
    COMMIT/*!*/;

從ROW模式和STATEMENT模式下UPDATE操做的日誌內容能夠看出,ROW模式完整地記錄了要修改的某行數據更新前的全部字段的值以及更改後全部字段的值,而STATEMENT模式只單單記錄了UPDATE操做的SQL語句。咱們要將mysql的數據實時同步到ES, 只能選擇ROW模式的binlog, 獲取並解析binlog日誌的數據內容,執行ES document api,將數據同步到ES集羣中。

查看,修改binlog模式

# 查看binlog模式
mysql> show variables like "%binlog_format%";

# 修改binlog模式
mysql> set global binlog_format='ROW';

# 查看binlog是否開啓
mysql> show variables like 'log_bin';

# 開啓bīnlog
修改my.cnf文件log-bin = mysql-bin

2. 安裝

# 安裝go
sudo apt-get install go

# 安裝godep
go get github.com/tools/godep

# 獲取go-mysql-elasticsearch插件
go get github.com/siddontang/go-mysql-elasticsearch

# 安裝go-mysql-elasticsearch插件
cd go/src/github.com/siddontang/go-mysql-elasticsearch
make

3. 配置

go/src/github.com/siddontang/go-mysql-elasticsearch/etc/river.toml

# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"     # 須要同步的mysql基本設置
my_user = "root"
my_pass = "root"

# Elasticsearch address
es_addr = "127.0.0.1:9200"     # 本地elasticsearch配置

# Path to store data, like master.info, and dump MySQL data 
data_dir = "./var"             # 數據存儲的url
# 如下配置保存默認不變
# Inner Http status address
stat_addr = "127.0.0.1:12800"

# pseudo server id like a slave 
server_id = 1001

# mysql or mariadb
flavor = "mysql"
# mysqldump execution path
mysqldump = "mysqldump"

# MySQL data source
[[source]]
schema = "test"             //elasticsearch 與 mysql 同步時對應的數據庫名稱

# Only below tables will be synced into Elasticsearch.
# 要同步test這個database裏面的幾張表。對於一些項目若是使用了分表機制,咱們能夠用通配符來匹配,譬如t_[0-9]{4},就可# 以匹配 table  t_0000 到 t_9999。
tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]  

# Below is for special rule mapping
# 對一個 table,咱們須要指定將它的數據同步到 ES 的哪個 index 的 type 裏面。若是不指定,咱們默認會用起 schema  # name 做爲 ES 的 index 和 type
[[rule]]
schema = "test"    //數據庫名稱
table = "t"        //表名稱
index = "test"        //對應的索引名稱
type = "t"            //對應的類型名稱

# 將全部知足格式 t_[0-9]{4} 的 table 同步到 ES 的 index 爲 test,type 爲 t 的下面。固然,這些表須要保證
# schema 是一致的
[[rule]]
schema = "test"
table = "t_[0-9]{4}"
index = "test"
type = "t"

# 對於 table tfilter,咱們只會同步 id 和 name 這兩列,其餘的都不會同步
filter = ["id", "name"]
# table tfield 的 column id ,咱們映射成了 es_id,而 tags 則映射成了 es_tags
# list 這個字段,他顯示的告知須要將對應的 column 數據轉成 ES 的 array type。這個如今一般用於 MySQL 的 varchar # 等類型,咱們可能會存放相似 「a,b,c」 這樣的數據,而後但願同步給 ES 的時候變成 [a, b, c] 這樣的列表形式。

[rule.field]
# Map column `id` to ES field `es_id`
id="es_id"
# Map column `tags` to ES field `es_tags` with array type 
tags="es_tags,list"
# Map column `keywords` to ES with array type
keywords=",list"

4. 運行

cd go/src/github.com/siddontang/go-mysql-elasticsearch
bin/go-mysql-elasticsearch -config=./etc/river.toml

 

方案三: Apache-NiFi實現mysql數據與elasticsearch同步

1. 背景

NiFi以前是在美國國家安全局(NSA)開發和使用了8年的一個可視化、可定製的數據集成產品。2014年NSA將其貢獻給了Apache開源社區,2015年7月成功成爲Apache頂級項目。

2. 簡介

Apache NiFi 是一個易於使用、功能強大並且可靠的數據處理和分發系統。Apache NiFi 是爲數據流設計,它支持高度可配置的指示圖的數據路由、轉換和系統中介邏輯,支持從多種數據源動態拉取數據。簡單地說,NiFi是爲自動化系統之間的數據流而生。 這裏的數據流表示系統之間的自動化和受管理的信息流。 基於WEB圖形界面,經過拖拽、鏈接、配置完成基於流程的編程,實現數據採集、處理等功能。

3. 下載安裝配置

  • Apache NiFi 基於 java 開發,要求運行環境爲 JDK 8.0 以上。
  • Apache NiFi 下載地址:http://nifi.apache.org/download.html
  • 經常使用配置在 conf 目錄下的 nifi.properties 和 bootstrap.conf 文件中,詳見:NiFi System Administrator's Guide
  • web 控制檯端口在 nifi.proerties 文件中的 nifi.web.http.port 參數修改,默認值 8080

4. 啓動Apache NiFi

命令行進入 Apache NiFi 目錄,運行命令 ./bin/nifi.sh start

Apache NiFi 的經常使用命令:

命令 說明
run 交互式啓動
start 後臺啓動
stop 中止
status 查看服務狀態

訪問 http://localhost:8080/nifi 

5. 實例應用

描述

數字圖書館有一套基於 MySQL 的電子書管理系統,電子書的基本信息保存在數據庫表中,書的數字內容以多種常見的文檔格式(PDF、Word、PPT、RTF、TXT、CHM、EPUB等)保存在存儲系統中。如今須要利用 ElasticSearch 實現一套全文檢索系統,以便用戶能夠經過對電子書的基本信息和數字內容進行模糊查詢,快速找到相關書籍。

數據庫表結構

CREATE TABLE `book` (
  `id` varchar(100) NOT NULL,
  `title` varchar(50) DEFAULT NULL,
  `desc` varchar(1000) DEFAULT NULL,
  `path` varchar(200) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
字段 意義
id 主鍵
title 書名
desc 介紹
path 存儲路徑
create_time 建立時間
update_time 更新時間

邏輯約束:建立書籍記錄時,create_time 等於 update_time,即當前時間,每次更新書籍時,更新 update_time 時間。全文檢索系統根據 update_time 時間更新書籍索引。

 技術方案

基本思路: 

  1. 按期掃描 MySQL 中的 book 表,根據字段 update_time 批量抓取最新的電子書數據。
  2. 從 path 字段獲取電子書數字內容的文檔存儲路徑。從存儲系統中抓取電子書文檔並進行 BASE64編碼。
  3. 將從 book 表批量抓取的數據轉換爲 JSON 文檔,並將 BASE64編碼後的電子書文檔合併入 JSON,一同寫入 ElasticSearch,利用 ElasticSearch 的插件 Ingest Attachment Processor Plugin 對電子書文檔進行文本抽取,並進行持久化,創建全文索引。

文檔附件的文本抽取

ElasticSearch只能處理文本,不能直接處理二進制文檔。要利用 ElasticSearch 實現附件文檔的全文檢索須要 2 個步驟:

  1. 對多種主流格式的文檔進行文本抽取。
  2. 將抽取出來的文本內容導入 ElasticSearch ,利用 ElasticSearch強大的分詞和全文索引能力。

 Ingest Attachment Processor Plugin 是一個開箱即用的插件,使用它能夠幫助 ElasticSearch 自動完成這 2 個步驟。

基本原理是利用 ElasticSearch 的 Ingest Node 功能,此功能支持定義命名處理器管道 pipeline,pipeline中能夠定義多個處理器,在數據插入 ElasticSearch 以前進行預處理。而 Ingest Attachment Processor Plugin 提供了關鍵的預處理器 attachment,支持自動對入庫文檔的指定字段做爲文檔文件進行文本抽取,並將抽取後獲得的文本內容和相關元數據加入原始入庫文檔。

由於 ElasticSearch 是基於 JSON 格式的文檔數據庫,因此附件文檔在插入 ElasticSearch 以前必須進行 Base64 編碼。

固然,Attachment Processor Plugin 不是惟一方案。若是須要深刻定製文檔抽取功能,或基於功能解耦等考量,徹底能夠利用 Apache Tika http://tika.apache.org 實現獨立的文檔抽取應用。

安裝附件文本抽取插件

cd elasticsearch-6.4.2
./bin/elasticsearch-plugin install ingest-attachment

安裝中文分詞插件

cd elasticsearch-6.4.2
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.4.2/elasticsearch-analysis-ik-6.4.2.zip

使用kibana創建文本抽取管道

PUT /_ingest/pipeline/attachment
{
    "description": "Extract attachment information",
    "processors": [
        {
            "attachment": {
                "field": "data",
                "ignore_missing": true
            }
        },
        {
            "remove": {
                "field": "data"
            }
        }
    ]
}

以上,咱們創建了 一個 pipeline 命名 attachment,其中定義了 2 個預處理器 "attachment" 和 "remove" ,它們按定義順序對入庫數據進行預處理。

"attachment" 預處理器即上文安裝的插件 "Ingest Attachment Processor Plugin" 提供,將入庫文檔字段 "data" 視爲文檔附件進行文本抽取。要求入庫文檔必須將文檔附件進行 BASE64編碼寫入 "data" 字段。

文本抽取後, 後續再也不須要保留 BASE64 編碼的文檔附件,將其持久化到 ElasticSearch 中沒有意義,"remove" 預處理器用於將其從源文檔中刪除。

使用kibana創建文檔結構映射

ElasticSearch 是文檔型數據庫,以 JSON 文檔爲處理對象。文檔結構以 mapping 形式定義,至關於關係型數據庫創建表結構。如下,咱們創建 MySQL 的 book 表在 ElasticSearch 中的文檔結構映射。

PUT /book
{
  "mappings": {
    "idx": {
      "properties": {
        "id": {
          "type": "keyword"
        },
        "title": {
          "type": "text",
          "analyzer": "ik_max_word"
        },
        "desc": {
          "type": "text",
          "analyzer": "ik_max_word"
        },
        "path": {
          "type": "keyword"
        },
        "create_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "update_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "attachment": {
          "properties": {
            "content": {
              "type": "text",
              "analyzer": "ik_max_word"
            }
          }
        }
      }
    }
  }
}

除了 book 表中的原有字段外,咱們在 ElasticSearch 中增長了 "attachment" 字段,這個字段是 "attachment" 命名 pipeline 抽取文檔附件中文本後自動附加的字段。這是一個嵌套字段,其包含多個子字段,包括抽取文本 content 和一些文檔信息元數據。

在本文的應用場景中,咱們須要對 book 的 title、desc 和 attachment.content 進行全文檢索,因此在創建 mapping 時,咱們爲這 3 個字段指定分析器 "analyzer" 爲 "ik_max_word",以讓 ElasticSearch 在創建全文索引時對它們進行中文分詞。

導入apache NiFi流程模板

Apache NiFi 支持將配置好的流程保存爲模板,鼓勵社區開發者之間分享模板。本文及使用的流程模板在開源項目:
https://gitee.com/streamone/full-text-search-in-action
模板文件在 /nifi/FullText-mysql.xml

下載模板文件 FullText-mysql.xml ,而後點擊控制檯左側 "Operate" 操做欄裏的 "Upload Template" 上傳模板。

應用流程模板

拖拽控制檯頂部一排組件圖標中的 "Template" 到空白網格區域,在彈出的 "Add Template" 窗口中選擇剛剛上傳的模板 "FullText-mysql",點擊 "Add"。空白網格區域將出現以下下圖的 "process group",它是一組 "processor" 的集合,咱們的處理流程就是由這組 "processor" 按照數據處理邏輯有序組合而成。

NiFi模板

雙擊此 "process group" 進入,將看到完整的流程配置,以下圖:

NiFi process group

運行這個流程以前須要完成幾個配置項:

  1. 配置並啓動數據庫鏈接池
    在空白網格處點擊鼠標右鍵,在彈出菜單中點擊 "configure",在彈出的 "FullText-mysql Configuration" 窗口中打開 "controller services" 標籤頁以下圖,點擊表格中 "DBCPConnectionPool" 右側 "Configure" 圖標,進行數據庫鏈接池配置。 NiFi controller services
    在彈出的 "Configure Controller Service" 窗口中打開 "PROPERTIES" 標籤頁,在表格中填寫 MySQL數據庫相關信息,以下圖: 配置數據庫鏈接池
    其中的 "Database Driver Location(s)" 填寫咱們下載的 "mysql-connector-java-5.1.46-bin.jar" 路徑。 配置好數據庫鏈接池之後點擊 "APPLY" 回到 "controller services" 標籤頁,點擊表格中 「DBCPConnectionPool」 右側 「Enable」 圖標啓動數據庫鏈接池。

  2. 修改變量
    在空白網格處點擊鼠標右鍵,在彈出菜單中點擊 "variables",打開 "Variables" 窗口,修改表格中的 "elasticSearchServer" 參數值爲 ElasticSearch 服務地址,修改表格中的 "rootPath" 參數爲電子書數字文檔在文件系統中的根路徑。

回到 "process group" 流程頁面,在空白網格處點擊鼠標右鍵,在彈出菜單中點擊 "start" 菜單,啓動流程。

至此,咱們完成了本文應用場景中 Apache NiFi 的流程配置。Apache NiFi 每隔 10 秒掃描 MySQL 的 book 表,抓取最新的電子書數據,處理後導入 ElasticSearch。

全文檢索查詢

book表數據

全文檢索語句

GET /book/_search
  {
  "query": {
    "multi_match": {
      "query": "安靜",
      "fields": ["title", "desc", "attachment.content"]
    }
  },
  "_source": {
  	"excludes": ["attachment.content"]
  },
  "from": 0, "size": 200,
  "highlight": {
  	    "encoder": "html",
		"pre_tags": ["<em>"],
		"post_tags": ["</em>"],
		"fields": {
		  "title": {},
		  "desc": {},
		  "attachment.content": {}
		}
  }

}

查詢結果

 

參考連接

https://www.jianshu.com/p/c3faa26bc221

https://www.jianshu.com/p/96c7858b580f

https://my.oschina.net/streamone/blog/1825807

相關文章
相關標籤/搜索