SQL數據同步到ElasticSearch(三)- 使用Logstash+LastModifyTime同步數據

在系列開篇,我提到了四種將SQL SERVER數據同步到ES中的方案,本文將採用最簡單的一種方案,即便用LastModifyTime來追蹤DB中在最近一段時間發生了變動的數據。html

安裝Java

安裝部分的官方文檔在這裏:https://www.elastic.co/guide/en/logstash/current/installing-logstash.htmljava

能夠直接查看官方文檔。sql

我這裏使用的仍是以前文章中所述的CentOS來進行安裝。數據庫

首先須要安裝Java(萬物源於Java)服務器

輸入命令找到的OpenJDK 1.8.X版本(截止我嘗試時,在Java11上會有問題):jvm

yum search java | grep -i --color JDK

使用Yum進行安裝:elasticsearch

yum install java-1.8.0-openjdk

配置環境變量JAVA_HOME、CLASSPATH、PATH。ide

打開/etc/profile文件:sqlserver

vi /etc/profile

將下面幾行代碼粘貼到該文件的最後:fetch

--這句要本身到/usr/lib/jvm下面找對應的目錄,不能直接copy
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/ 
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin

保存並關閉,而後執行下列命令讓設置當即生效。

source /etc/profile

能夠輸入下面的命令查看是否已生效:

java –-version
echo $JAVA_HOME
echo $CLASSPATH
echo $PATH

安裝LogStash

首先註冊ELK官方的GPG-KEY:

而後cd /etc/yum.repos.d/文件夾下,建立一個logstash.repo文件,並將下面一段內容粘貼到該文件中保存:

[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

而後執行安裝命令:

sudo yum install logstash

以上步驟可能比較慢,還有另一種辦法,就是經過下載來安裝LogStash:

官方文檔在這裏:https://www.elastic.co/cn/downloads/logstash

首先在上面的連接中下載LogStash的tar.gz包,這個過程有可能也很慢,個人解決方案是在本身機器上使用迅雷進行下載,完事兒Copy到Linux服務器中。

下載完成後,執行解壓操做:

sudo tar -xvf logstash-7.2.0.tar.gz

解壓完成後,進入解壓後的logstash-7.2.0文件夾。

接着咱們安裝Logstash-input-jdbc插件:

bin/logstash-plugin install logstash-input-jdbc

下載SQL SERVER jbdc組件,這裏咱們從微軟官網下載:https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-2017 ,固然這個連接只是目前的,若是你在嘗試時這個連接失效了,那就自行百度搜索吧~

下載完成後,解壓到logstash下面的lib目錄下,這裏我本身爲了方便,把微軟默認給jdbc外面包的一層語言名稱的文件夾給去掉了。

接着,咱們到/config文件夾,新建一個logstash.conf文件,內容大概以下:

下面的每個參數含義均可以在官方文檔中找到:

input {
    jdbc {
        jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar" // 這裏請靈活應變,能找到咱們上一步下載的jdbc jar包便可
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" // 這個名字是固定的
        jdbc_connection_string => "jdbc:sqlserver: //數據庫ServerIP:1433;databaseName=數據庫名;"
        jdbc_user => "數據庫帳號"
        jdbc_password => "數據庫密碼"
        schedule => "* * * * *" // Corn 表達式,請自行百度寫法
        jdbc_default_timezone => "Asia/Shanghai"
        jdbc_page_size => "500" // 每一批傳輸的數量
        record_last_run => "true" //是否保存狀態 
        use_column_value => "true" //設置爲時true,使用定義的 tracking_column值做爲:sql_last_value。設置爲時false,:sql_last_value反映上次執行查詢的時間。
        tracking_column => "LastModificationTime" //配合use_column_value使用
        last_run_metadata_path => "/usr/opt/logstash/config/last_id" //記錄:sql_last_value的文件
        lowercase_column_names => "false" //將DB中的列名自動轉換爲小寫
        tracking_column_type => "timestamp" //tracking_column的數據類型,只能是numberic和timestamp
        clean_run => "false" //是否應保留先前的運行狀態,其實我也不知道這個字段幹啥用的~~
        statement => "SELECT * FROM 表 WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value" //從DB中抓數據的SQL腳本
    }
}
output {
    elasticsearch {
        index => "test"  //ES集羣的索引名稱     
        document_id => "%{Id}" //Id是表裏面的主鍵,爲了拿這個主鍵在ES中生成document ID
        hosts => ["http://192.168.154.135:9200"]// ES集羣的地址
    }
}

上面的被註釋搞的亂糟糟的,給大家一個能夠複製的版本吧:

input {
    jdbc {
        jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://SERVER_IP:1433;databaseName=DBName;"
        jdbc_user => "xxx"
        jdbc_password => "password"
        schedule => "* * * * *"
        jdbc_default_timezone => "Asia/Shanghai"
        jdbc_page_size => "50000"
        record_last_run => "true"
        use_column_value => "true"
        tracking_column => "LastModificationTime"
        last_run_metadata_path => "/usr/local/logstash-7.2.0/config/last_id"
        lowercase_column_names => "false"
        tracking_column_type => "timestamp"
        clean_run => "false"
        statement => "SELECT * FROM xxx WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value"
    }
}
output {
    elasticsearch {
        index => "item"       
        document_id => "%{Id}"
        hosts => ["http://ES集羣IP:9200"]
    }
}

Logstash 總體思路

回頭來講一下這個LogStash的總體思路吧,其實個人理解,LogStash就是一個數據搬運工,他的搬運數據,分爲三個大的階段:

  1. 讀取數據(input)
  2. 過濾數據(filter)
  3. 輸出數據(output)

對應的官方文檔:https://www.elastic.co/guide/en/logstash/current/pipeline.html

而這每個階段,都是經過一些插件來實現的,好比在上述的配置文件中,咱們有:

  • 讀取數據即input部分,這部分因爲咱們是須要從數據庫讀取數據,因此使用了一個能夠執行SQL語句的jdbc-input插件,這裏若是咱們的數據源是其餘的部分,就須要使用其餘的一些插件來實現。
  • 也有輸出數據部分,這部分咱們是將數據寫入到ElasticSearch,因此咱們使用了一個elasticsearch-output插件。這裏也能夠將數據寫入到kafka等其餘的一些產品中,也是須要一些插件便可搞定。
  • 能夠發現咱們上面的部分沒有涉及到filter插件,其實若是咱們想對數據作一些過濾、規範化處理等,均可以使用filter插件來進行處理,具體的還須要進一步去探索啦~

執行數據同步

剩下的部分就簡單了,切換目錄到logstash的目錄下,執行命令:

bin/logstash -f config/logstash.conf

最後執行的效果圖大概以下:

image

可使用Elasticsearch-Head等插件來查看是否同步正常:

image

image

大概就是這樣啦,後續我這邊會繼續嘗試使用其餘方式來進行數據同步,歡迎你們關注~

相關文章
相關標籤/搜索