在許多狀況下,咱們但願使用不是由Elasticsearch自己支持的不一樣數據庫的輸入。在本文中,咱們將展現如何經過Logstash將數據從MySql數據庫遷移到Elasticsearch。mysql
Logstash可用的JDBC插件確保來自任何具備JDBC接口的數據庫的數據均可以做爲輸入存入Logstash。這個插件還支持須要調度運行logstash。它還經過使用查詢使選擇性數據做爲輸入。在這些類型的數據庫中,咱們有行和列的概念。每一個行都被視爲單個事件,每一個行(事件)中的列都被視爲事件中的字段。git
如下框圖說明了JDBC鏈接器插件在從JDBC支持的數據庫遷移數據時的做用:
github
在圖中,咱們有logstash運行配置文件,觸發咱們設置的預約義查詢,以便將咱們感興趣的數據收集到順序數據庫。一旦查詢被觸發到JDBC插件,它將它傳遞到數據庫並收集數據,它將移交給Logstash。 sql
根據咱們的要求,咱們能夠處理數據並以所需的形式進行處理,在處理後將處理的數據索引到Elasticsearch。咱們將在後面的章節中的示例中顯示詳細的應用程序。數據庫
讓咱們繼續在Logstash的幫助下將數據從順序數據庫(如MySql)遷移到Elasticsearch。咱們須要相應的MySql的JDBC驅動程序。您能夠 在這裏下載。 如今讓咱們"testdb"
使用如下命令建立一個在MySql 中命名的數據庫:curl
建立 testdb
數據庫如今已建立,咱們只是確保咱們使用相同的數據庫用於咱們的目的:elasticsearch
顯示 數據庫 ; 使用 testdb;
使用如下模式建立"testtable"
在數據庫下命名的表"testdb"
:測試
建立 表 testtable(PersonID int,LastName varchar(255),FirstName varchar(255),City varchar(255),Date datetime(6));
如今在上表中插入一些測試數據: url
INSERT INTO testtable(PersonID,LastName,FirstName,City,Date) VALUES('4005','Kallis','Jaques','Cape Town','2016-05-23 16:12:03.568810'); INSERT INTO testtable(PersonID,LastName,FirstName,City,Date) VALUES('4004','Baron','Richard','Cape Town','2016-05-23 16:12:04.370460'); INSERT INTO testtable(PersonID,LastName,FirstName,City,
咱們建立了一個包含3名員工詳細信息的表。您能夠經過傳遞查詢來顯示錶的詳細信息以顯示其全部內容:spa
select * from testtable
結果表將以下所示:
如今咱們已經建立了一個內容如上所示的MySql表,看看如何配置Logstash。在logstash文件夾中,咱們有一個logstash.conf
文件,它是要配置和運行以獲取必要的結果的文件。初始配置如如下屏幕截圖所示:
在上面的配置文件中,咱們提到了許多參數,例如:JDBC鏈接器檢查數據的數據庫,JDBC插件的位置,MySql訪問的用戶名和密碼,以及查詢語句。將上述設置應用於"logstash.conf"
文件後,經過鍵入如下命令運行Logstash:
bin / logstash -f logstash .conf
如JDBC部分中的框圖中所述,logstash配置文件將查詢傳遞給JDBC驅動程序以及用戶憑據。它還獲取數據並將數據提供給Logstash。Logstash將使其JSON格式化並索引到Elasticsearch數據庫。查詢索引"test-migrate"
以下:
curl -XPOST'http :// localhost:9200 / test-migrate / _search?pretty = true' -d '{}'
上述查詢將每行做爲單獨的文檔列出,列爲字段。一個例子:
{ 「_index」 : 「測試遷移」, 「_type」 : 「數據」, 「_id」 :「4004」 , 「_score」 :1, 「_source」:{ 「PERSONID」 :4004, 「姓氏」:「男爵「, 」名字「:」理查「, 」城市「:」開普敦「, 」日期「:」2016-05-23T10:42:04.370Z「 , 「@version」:「1」, 「@timestamp」:「2016-07-10T10:36:43.685Z」 }} }}
在本節中,咱們將展現各類用例場景。向上面的MySql中添加另外一行數據,以下所示:
INSERT INTO testtable(PersonID,LastName,FirstName,City,Date) VALUES('4002','Cheers','Joan','Cape Town','2016-05-23 16:12:07.163681');
另外,更新同一表格中一行的現有值,以下所示:
UPDATE測試表 - > SET FirstName = 'James' - > WHERE PersonID = 4005 ;
完成上述步驟後,再次運行logstash配置文件。咱們指望總共4個文檔包括新的行和更新的行。可是,當再次檢查索引時,不是這樣的。相反,咱們共有7個文件。這是由於初始文檔在elasticsearch數據庫中保持不變,這是因爲沒有從表自己提供特定的id。當咱們運行logstash配置時,整個內容"testtable"
都會被索引一次。
咱們如何解決這種重複?咱們必須爲每一個文檔提供一個惟一的ID。對於每次運行,每一個文檔的ID應該相同,以防止重複問題。這能夠經過編輯conf文件的輸出部分來實現,以下所示:
咱們遇到的其餘重要需求之一將是更改字段名稱和值,當咱們索引到elasticsearch。咱們在咱們當前的例子中添加一個要求來演示這個。對於全部與「開普敦」匹配的文檔,該字段"City"
應替換爲值"South Africa"
,字段值應替換爲"Country"
。
爲了實現此要求,請使用該"filter"
屬性在elasticsearch中操做已提取的數據。使用"mutate"
裏面的屬性"filter"
執行所需的更改。使用上面的"input"
和"output"
部分的設置,咱們須要"filter"
在logstash.conf
文件中添加如下部分:
filter { if [city] == 「開普敦」 { mutate { rename => { 「city」 => 「country」 } replace => [ 「country」,「South Africa」 ] }} }} }}
檢查所獲取的數據"Cape Town"
的每一個事件的"City"
列的值。若是找到匹配項,"City"
則重命名該字段,"country"
並將每一個匹配的值替換爲"South Africa"
。
若是數據在MySql數據庫中不斷更新,咱們須要遞增和按期對其進行索引,該怎麼辦?要按期獲取數據,請"scheduler"
在輸入部分中添加屬性。在"scheduler,"
給定的值時,讓每隔一段時間運行conf文件。它是高度可定製的,並使用Rufus調度程序語法。
對於增量更新,請修改查詢以"sql_last_value"
針對字段使用。這裏咱們給這個字段 "Date"
。咱們還設置"use_column_value"
爲true,並將對應的列連接到"Date"
使用"tracking_column"
。
用於狀況1,2和3的完整配置文件以下:
爲了看到上面的配置工做,添加幾個字段到現有的MySql表,其"Date"
值比以前存在的值更新。如今運行logstash,您能夠看到只有新數據已在Elasticsearch索引中創建索引。
在本文中,咱們討論了用於使用logstash將數據從連續數據庫遷移到Elasticsearch的JDBC插件。咱們還熟悉如何處理常見問題,如重複,字段和值的突變,以及調度和增量更新。問題/意見?給咱們一行下面。