使用Logstash將MySql數據遷移到Elasticsearch中

在許多狀況下,咱們但願使用不是由Elasticsearch自己支持的不一樣數據庫的輸入。在本文中,咱們將展現如何經過Logstash將數據從MySql數據庫遷移到Elasticsearch。mysql

JBDC插件

Logstash可用的JDBC插件確保來自任何具備JDBC接口的數據庫的數據均可以做爲輸入存入Logstash。這個插件還支持須要調度運行logstash。它還經過使用查詢使選擇性數據做爲輸入。在這些類型的數據庫中,咱們有行和列的概念。每一個行都被視爲單個事件,每一個行(事件)中的列都被視爲事件中的字段。git

如下框圖說明了JDBC鏈接器插件在從JDBC支持的數據庫遷移數據時的做用: 
mysql-logstash1.png#asset:1073github

在圖中,咱們有logstash運行配置文件,觸發咱們設置的預約義查詢,以便將咱們感興趣的數據收集到順序數據庫。一旦查詢被觸發到JDBC插件,它將它傳遞到數據庫並收集數據,它將移交給Logstash。 sql

根據咱們的要求,咱們能夠處理數據並以所需的形式進行處理,在處理後將處理的數據索引到Elasticsearch。咱們將在後面的章節中的示例中顯示詳細的應用程序。數據庫

將MySql數據插入Elasticsearch

讓咱們繼續在Logstash的幫助下將數據從順序數據庫(如MySql)遷移到Elasticsearch。咱們須要相應的MySql的JDBC驅動程序。您能夠 在這裏下載 如今讓咱們"testdb"使用如下命令建立一個在MySql 中命名的數據庫:curl

建立 testdb

數據庫如今已建立,咱們只是確保咱們使用相同的數據庫用於咱們的目的:elasticsearch

顯示 數據庫 ;
使用 testdb;

使用如下模式建立"testtable"在數據庫下命名的表"testdb"測試

建立 表 testtable(PersonID int,LastName varchar(255),FirstName varchar(255),City varchar(255),Date datetime(6));

如今在上表中插入一些測試數據: url

INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
 VALUES('4005','Kallis','Jaques','Cape Town','2016-05-23 16:12:03.568810');
INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
 VALUES('4004','Baron','Richard','Cape Town','2016-05-23 16:12:04.370460');
INSERT  INTO testtable(PersonID,LastName,FirstName,City,

咱們建立了一個包含3名員工詳細信息的表。您能夠經過傳遞查詢來顯示錶的詳細信息以顯示其全部內容:spa

select * from testtable

結果表將以下所示: 

mysqllogstash2.png#asset:1074

Logstash配置

如今咱們已經建立了一個內容如上所示的MySql表,看看如何配置Logstash。在logstash文件夾中,咱們有一個logstash.conf文件,它是要配置和運行以獲取必要的結果的文件。初始配置如如下屏幕截圖所示: 

mysqllogstash3.png#asset:1075

在上面的配置文件中,咱們提到了許多參數,例如:JDBC鏈接器檢查數據的數據庫,JDBC插件的位置,MySql訪問的用戶名和密碼,以及查詢語句。將上述設置應用於"logstash.conf"文件後,經過鍵入如下命令運行Logstash:

bin / logstash -f logstash .conf

如JDBC部分中的框圖中所述,logstash配置文件將查詢傳遞給JDBC驅動程序以及用戶憑據。它還獲取數據並將數據提供給Logstash。Logstash將使其JSON格式化並索引到Elasticsearch數據庫。查詢索引"test-migrate"以下:

curl -XPOST'http :// localhost:9200 / test-migrate / _search?pretty = true'  -d  '{}'

上述查詢將每行做爲單獨的文檔列出,列爲字段。一個例子: 

{
   「_index」 : 「測試遷移」,
   「_type」 : 「數據」,
   「_id」 :「4004」 ,
   「_score」 :1,
   「_source」:{
     「PERSONID」 :4004,
     「姓氏」:「男爵「,
     」名字「:」理查「,
     」城市「:」開普敦「,
     」日期「:」2016-05-23T10:42:04.370Z「 ,
    「@version」:「1」,
     「@timestamp」:「2016-07-10T10:36:43.685Z」
  }}
}}

更多配置

在本節中,咱們將展現各類用例場景。向上面的MySql中添加另外一行數據,以下所示:

INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
 VALUES('4002','Cheers','Joan','Cape Town','2016-05-23 16:12:07.163681');

另外,更新同一表格中一行的現有值,以下所示:

UPDATE測試表
- > SET FirstName = 'James' 
 - > WHERE PersonID = 4005 ;

1.重複問題

完成上述步驟後,再次運行logstash配置文件。咱們指望總共4個文檔包括新的行和更新的行。可是,當再次檢查索引時,不是這樣的。相反,咱們共有7個文件。這是由於初始文檔在elasticsearch數據庫中保持不變,這是因爲沒有從表自己提供特定的id。當咱們運行logstash配置時,整個內容"testtable"都會被索引一次。

咱們如何解決這種重複?咱們必須爲每一個文檔提供一個惟一的ID。對於每次運行,每一個文檔的ID應該相同,以防止重複問題。這能夠經過編輯conf文件的輸出部分來實現,以下所示:

mysqllogstash4.png#asset:1076

2.變換操做

咱們遇到的其餘重要需求之一將是更改字段名稱和值,當咱們索引到elasticsearch。咱們在咱們當前的例子中添加一個要求來演示這個。對於全部與「開普敦」匹配的文檔,該字段"City"應替換爲值"South Africa",字段值應替換爲"Country"

爲了實現此要求,請使用該"filter"屬性在elasticsearch中操做已提取的數據。使用"mutate"裏面的屬性"filter"執行所需的更改。使用上面的"input""output"部分的設置,咱們須要"filter"logstash.conf文件中添加如下部分:

filter {
   if [city] == 「開普敦」 {
    mutate {
       rename => { 「city」 => 「country」 }
        replace => [ 「country」,「South Africa」 ]
      }}
   }}
}}

檢查所獲取的數據"Cape Town"的每一個事件的"City"列的值。若是找到匹配項,"City"則重命名該字段,"country"並將每一個匹配的值替換爲"South Africa"。  

3.計劃和增量更新

若是數據在MySql數據庫中不斷更新,咱們須要遞增和按期對其進行索引,該怎麼辦?要按期獲取數據,請"scheduler"在輸入部分中添加屬性。在"scheduler,"給定的值時,讓每隔一段時間運行conf文件。它是高度可定製的,並使用Rufus調度程序語法

對於增量更新,請修改查詢以"sql_last_value"針對字段使用。這裏咱們給這個字段  "Date"。咱們還設置"use_column_value"爲true,並將對應的列連接到"Date"使用"tracking_column"

用於狀況1,2和3的完整配置文件以下:

mysqllogstash5.png#asset:1077

爲了看到上面的配置工做,添加幾個字段到現有的MySql表,其"Date"值比以前存在的值更新。如今運行logstash,您能夠看到只有新數據已在Elasticsearch索引中創建索引。

結論

在本文中,咱們討論了用於使用logstash將數據從連續數據庫遷移到Elasticsearch的JDBC插件。咱們還熟悉如何處理常見問題,如重複,字段和值的突變,以及調度和增量更新。問題/意見?給咱們一行下面。

相關文章
相關標籤/搜索