在系列開篇,我提到了四種將SQL SERVER數據同步到ES中的方案,本文將採用最簡單的一種方案,即便用LastModifyTime來追蹤DB中在最近一段時間發生了變動的數據。html
安裝部分的官方文檔在這裏:https://www.elastic.co/guide/en/logstash/current/installing-logstash.htmljava
能夠直接查看官方文檔。sql
我這裏使用的仍是以前文章中所述的CentOS來進行安裝。數據庫
首先須要安裝Java(萬物源於Java)服務器
輸入命令找到的OpenJDK 1.8.X版本(截止我嘗試時,在Java11上會有問題):jvm
yum search java | grep -i --color JDK
使用Yum進行安裝:elasticsearch
yum install java-1.8.0-openjdk
配置環境變量JAVA_HOME、CLASSPATH、PATH。ide
打開/etc/profile文件:sqlserver
vi /etc/profile
將下面幾行代碼粘貼到該文件的最後:fetch
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
保存並關閉,而後執行下列命令讓設置當即生效。
source /etc/profile
能夠輸入下面的命令查看是否已生效:
java –-version
echo $JAVA_HOME
echo $CLASSPATH
echo $PATH
首先註冊ELK官方的GPG-KEY:
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
而後cd /etc/yum.repos.d/文件夾下,建立一個logstash.repo文件,並將下面一段內容粘貼到該文件中保存:
[logstash-7.x] name=Elastic repository for 7.x packages baseurl=https://artifacts.elastic.co/packages/7.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
而後執行安裝命令:
sudo yum install logstash
以上步驟可能比較慢,還有另一種辦法,就是經過下載來安裝LogStash:
官方文檔在這裏:https://www.elastic.co/cn/downloads/logstash
首先在上面的連接中下載LogStash的tar.gz包,這個過程有可能也很慢,個人解決方案是在本身機器上使用迅雷進行下載,完事兒Copy到Linux服務器中。
下載完成後,執行解壓操做:
sudo tar -xvf logstash-7.2.0.tar.gz
解壓完成後,進入解壓後的logstash-7.2.0文件夾。
接着咱們安裝Logstash-input-jdbc插件:
bin/logstash-plugin install logstash-input-jdbc
下載SQL SERVER jbdc組件,這裏咱們從微軟官網下載:https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-2017 ,固然這個連接只是目前的,若是你在嘗試時這個連接失效了,那就自行百度搜索吧~
下載完成後,解壓到logstash下面的lib目錄下,這裏我本身爲了方便,把微軟默認給jdbc外面包的一層語言名稱的文件夾給去掉了。
接着,咱們到/config文件夾,新建一個logstash.conf文件,內容大概以下:
下面的每個參數含義均可以在官方文檔中找到:
input { jdbc { jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar" // 這裏請靈活應變,能找到咱們上一步下載的jdbc jar包便可 jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" // 這個名字是固定的 jdbc_connection_string => "jdbc:sqlserver: //數據庫ServerIP:1433;databaseName=數據庫名;" jdbc_user => "數據庫帳號" jdbc_password => "數據庫密碼" schedule => "* * * * *" // Corn 表達式,請自行百度寫法 jdbc_default_timezone => "Asia/Shanghai" jdbc_page_size => "500" // 每一批傳輸的數量 record_last_run => "true" //是否保存狀態 use_column_value => "true" //設置爲時true,使用定義的 tracking_column值做爲:sql_last_value。設置爲時false,:sql_last_value反映上次執行查詢的時間。 tracking_column => "LastModificationTime" //配合use_column_value使用 last_run_metadata_path => "/usr/opt/logstash/config/last_id" //記錄:sql_last_value的文件 lowercase_column_names => "false" //將DB中的列名自動轉換爲小寫 tracking_column_type => "timestamp" //tracking_column的數據類型,只能是numberic和timestamp clean_run => "false" //是否應保留先前的運行狀態,其實我也不知道這個字段幹啥用的~~ statement => "SELECT * FROM 表 WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value" //從DB中抓數據的SQL腳本 } } output { elasticsearch { index => "test" //ES集羣的索引名稱 document_id => "%{Id}" //Id是表裏面的主鍵,爲了拿這個主鍵在ES中生成document ID hosts => ["http://192.168.154.135:9200"]// ES集羣的地址 } }
上面的被註釋搞的亂糟糟的,給大家一個能夠複製的版本吧:
input { jdbc { jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar" jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "jdbc:sqlserver://SERVER_IP:1433;databaseName=DBName;" jdbc_user => "xxx" jdbc_password => "password" schedule => "* * * * *" jdbc_default_timezone => "Asia/Shanghai" jdbc_page_size => "50000" record_last_run => "true" use_column_value => "true" tracking_column => "LastModificationTime" last_run_metadata_path => "/usr/local/logstash-7.2.0/config/last_id" lowercase_column_names => "false" tracking_column_type => "timestamp" clean_run => "false" statement => "SELECT * FROM xxx WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value" } } output { elasticsearch { index => "item" document_id => "%{Id}" hosts => ["http://ES集羣IP:9200"] } }
回頭來講一下這個LogStash的總體思路吧,其實個人理解,LogStash就是一個數據搬運工,他的搬運數據,分爲三個大的階段:
對應的官方文檔:https://www.elastic.co/guide/en/logstash/current/pipeline.html
而這每個階段,都是經過一些插件來實現的,好比在上述的配置文件中,咱們有:
剩下的部分就簡單了,切換目錄到logstash的目錄下,執行命令:
bin/logstash -f config/logstash.conf
最後執行的效果圖大概以下:
可使用Elasticsearch-Head等插件來查看是否同步正常:
大概就是這樣啦,後續我這邊會繼續嘗試使用其餘方式來進行數據同步,歡迎你們關注~