利用logstash從mysql同步數據到ElasticSearch

前面一篇已經把logstash和logstash-input-jdbc安裝好了。html

下面就說下具體怎麼配置。java

1.先在安裝目錄bin下面(通常都是在bin下面)新建兩個文件jdbc.conf和jdbc.sqlmysql

2.配置jdbc.confsql

 1 input {
 2       stdin {
 3        }
 4       jdbc {
 5         # 鏈接的數據庫地址和哪個數據庫,指定編碼格式,禁用SSL協議,設定自動重連
 6         jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/microstorage_backend?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
 7         jdbc_user => "root"
 8         jdbc_password => "123456"
 9         # 下載鏈接數據庫的驅動包,建議使用絕對地址
10        jdbc_driver_library => "/usr/local/Cellar/logstash/6.5.4/libexec/logstash-core/lib/jars/mysql-connector-java-5.1.42.jar"
11  
12        jdbc_driver_class => "com.mysql.jdbc.Driver"
13        jdbc_paging_enabled => "true"
14        jdbc_page_size => "50000"
15        codec => plain { charset => "UTF-8"}
16  
17         #使用其它字段追蹤,而不是用時間
18       #use_column_value => true   //這裏若是是用時間追蹤好比:數據的更新時間或建立時間等和時間有關的這裏必定不能是true, 切記切記切記,我是用update_time來追蹤的
19         #追蹤的字段
20      tracking_column => update_time
21      record_last_run => true
22      #上一個sql_last_value值的存放文件路徑, 必需要在文件中指定字段的初始值  這裏說是必須指定初始值,我沒指定默認是1970-01-01 08:00:00 
23      last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run"  //這裏的lastrun文件夾和.logstash_jdbc_last_run是本身建立的
24  
25       jdbc_default_timezone => "Asia/Shanghai"   //設置時區
26       #statement => SELECT * FROM goods  WHERE update_time > :last_sql_value  //這裏要說明一下若是直接寫sql語句,前面這種寫法確定不對的
27                                                 ,加上引號也試過也不對,因此我直接寫在jdbc.sql文件中
28       statement_filepath => "/usr/local/Cellar/logstash/6.5.4/bin/jdbc.sql"
29  
30  
31      #是否清除 last_run_metadata_path 的記錄,若是爲真那麼每次都至關於從頭開始查詢全部的數據庫記錄
32      clean_run => false
33  
34        # 這是控制定時的,重複執行導入任務的時間間隔,第一位是分鐘 不設置就是1分鐘執行一次 35        schedule => "* * * * *"
36        type => "std"
37      }
38  }
39 
40  filter {
41 
42     json {
43 
44         source => "message"
45 
46         remove_field => ["message"]
47 
48     }
49 
50 }
51 
52 output {
53 
54     elasticsearch {
55 
56         # 要導入到的Elasticsearch所在的主機
57 
58         hosts => "127.0.0.1:9200"
59 
60         # 要導入到的Elasticsearch的索引的名稱
61 
62         index => "goods"
63 
64         # 類型名稱(相似數據庫表名)
65 
66         document_type => "spu"
67 
68         # 主鍵名稱(相似數據庫主鍵)
69 
70         document_id => "%{id}"
71     }
72 
73     stdout {
74 
75         # JSON格式輸出
76 
77         codec => json_lines
78 
79     }
80 }

3.配置jdbc.sql數據庫

1 select id,goods_name,goods_no,price,account_id,create_time,update_time from goods where update_time > :sql_last_value

4.咱們來看下 .logstash_jdbc_last_run文件中的內容(網上講述該配置的時候都沒講到裏面具體的內容寫法,致使不少人很迷惑,其中我就是)json

前面的---具體什麼意思,我也不太清楚。ruby

 

5.啓動jdbc.conf配置,開始同步數據elasticsearch

 

第一次:由於時間是從1970年開始的因此會所有同步一遍。至關於全量同步了編碼

從第二次開始,會從上次最新的一次時間同步,既新增和修改都會同步spa

 

遇到的問題:

1.ES中8小時時差的問題?

解決方法:從源頭解決問題

在jdbc.conf配置文件中只要是有關時間的字段都手動+8小時

 39 filter {
 40     json {
 41         source => "message"
 42         remove_field => ["message"]
 43     }
 44    // date類型不能省略,否則會報錯,       就是把當前字段+8小時後賦值給新的字段,而後再取新字段的值賦值給老的字段,再把新的字段刪除
 45     date {
 46       match => ["message","UNIX_MS"]
 47       target => "@timestamp"
 48        }
 49          ruby {
 50                 code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
 51          }      
 52         ruby{   
 53                 code => "event.set('@timestamp',event.get('timestamp'))"
 54         }       
 55         mutate{ 
 56                remove_field => ["timestamp"]
 57         }      
 58         
 59    date {
 60     match => ["message","UNIX_MS"]
 61     target => "create_time"
 62          } 
 63          ruby {
 64                  code => "event.set('@create_time', event.get('create_time').time.localtime + 8*60*60)"
 65          }       
 66         ruby {   
 67                  code => "event.set('create_time',event.get('@create_time'))"
 68          }       
 69         mutate { 
 70          remove_field => ["@create_time"]
 71         }
 72         
 73         date {
 74         match => ["message","UNIX_MS"]
 75         target => "update_time"
 76          }
 77          ruby {
 78                  code => "event.set('@update_time', event.get('update_time').time.localtime + 8*60*60)"
 79          }       
 80         ruby {   
 81                  code => "event.set('update_time',event.get('@update_time'))"
 82          }       
 83         mutate {
 84          remove_field => ["@update_time"]
 85         }
86 }

總結:主要是配置,有什麼問題,先檢查配置文件。 

相關文章
相關標籤/搜索