使用logstash從MySQL增量提取數據,傳入elasticsearch中,並經過kibana作簡單的圖表html
logstash安裝java
#下載,logstash5及以上版本須要jdk8 wget https://artifacts.elastic.co/downloads/logstash/logstash-6.1.1.zip #解壓 unzip logstash-6.1.1.zip #To test your Logstash installation, run the most basic Logstash pipeline #測試logstash環境,運行以下demo(input {stdin{}}:接收終端輸入;output {stdout{}}:輸出到終端),出現Pipeline main started爲正常 cd logstash-6.1.1 ./bin/logstash -e 'input {stdin{}} output {stdout{}}' #-----------------------------------start----------------------------------- Settings: Default pipeline workers: 24 Pipeline main started #------------------------------------end------------------------------------ #The -e flag enables you to specify a configuration directly from the command line. Specifying configurations at the command line lets you quickly test configurations without having to edit a file between iterations. The pipeline in the example takes input from the standard input, stdin, and moves that input to the standard output, stdout, in a structured format. #測試,輸入hello world,而後回車 #出現以下信息即爲安裝成功 #-----------------------------------start----------------------------------- 2018-01-04T02:44:41.024Z hostname hello world #------------------------------------end------------------------------------
logstash-input-jdbc插件node
#logstash-6.1.1已支持logstash-input-jdbc,不須要單獨安裝 #老版本安裝logstash-input-jdbc ./bin/plugin install logstash-input-jdbc
配置logstash增量提取MySQL數據mysql
cat mysql_pipelines.yml #-----------------------------------start----------------------------------- #輸入部分 input { jdbc { #鏈接MySQL驅動,須要本身下載 jdbc_driver_library => "/es/mysql-connector-java-5.1.31.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://10.112.29.30:3306/mstore" #鏈接數據庫帳號信息 jdbc_user => "MySQL_admin" jdbc_password => "password" #分頁 jdbc_paging_enabled => true #分頁大小 jdbc_page_size => 100000 #流式獲取數據,每次取10000. jdbc_fetch_size => 10000 #Maximum number of times to try connecting to database connection_retry_attempts => 3 #Number of seconds to sleep between connection attempts connection_retry_attempts_wait_time => 1 #Connection pool configuration. The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5) jdbc_pool_timeout => 5 #Whether to force the lowercasing of identifier fields lowercase_column_names => true #Whether to save state or not in last_run_metadata_path #保存上次運行記錄,增量提取數據時使用 record_last_run = > true #"* * * * *"爲每分鐘執行一次 schedule => "* * * * *" #Use an incremental column value rather than a timestamp use_column_value => true #sql_last_value #The value used to calculate which rows to query. Before any query is run, this is set to Thursday, 1 January 1970, or 0 if use_column_value is true and tracking_column is set. It is updated accordingly after subsequent queries are run. tracking_column => "id" #查詢語句 statement => "SELECT id,package_name,name,sub_name,editor_comment,high_quality,sub_category,tag,update_time FROM tbl_app WHERE id > :sql_last_value" } } #過濾部分 filter { json { source => "message" remove_field => ["message"] } date{ match => ["update_time","yyy-MM-dd HH:mm:ss"] } } #輸出到elastsicearch output { elasticsearch { #elasticsearch集羣地址,不用列出全部節點,默認端口號也可省略 hosts => ["10.127.92.181:9200", "10.127.92.212:9200", "10.127.92.111:9200"] #索引值,查詢的時候會用到;須要先在elasticsearch中建立對應的mapping,也能夠採用默認的mapping index => "store" #指定插入elasticsearch文檔ID,對應input中sql字段id document_id => "%{id}" } } #------------------------------------end------------------------------------ #注:使用時請去掉此文件中的註釋,否則會報錯 #logstash會把執行記錄默認存在帳戶根目錄下: /root/.logstash_jdbc_last_run #若是須要從新加載數據到elasticsearch,須要刪除這個文件
啓動logstashlinux
請在啓動elasticsearch以後再啓動logstash,否則鏈接elasticsearch會報錯git
#後臺啓動logstash ./bin/logstash -f config/mysql_pipelines.yml & #若是報下面錯誤,說明jdk版本不支持 #-----------------------------------start----------------------------------- NameError: cannot link Java class org.logstash.RubyUtil org/logstash/RubyUtil : Unsupported major.minor version 52.0 method_missing at org/jruby/javasupport/JavaPackage.java:259 <main> at /disk2/es/logstash-6.1.1/logstash-core/lib/logstash-core/logstash-core.rb:37 require at org/jruby/RubyKernel.java:955 <main> at /disk2/es/logstash-6.1.1/logstash-core/lib/logstash/runner.rb:1 require at org/jruby/RubyKernel.java:955 <main> at /disk2/es/logstash-6.1.1/lib/bootstrap/environment.rb:66 #------------------------------------end------------------------------------ #logstash-6.1.1須要jdk1.8 #請自行下載jdk1.8版本放到/opt/jdk1.8.0_151 #編寫啓動腳本以下 cat exec_logstash.sh #-----------------------------------start----------------------------------- #!/bin/sh #配置jdk1.8執行環境 export JAVA_HOME=/opt/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar ./bin/logstash -f config/mysql_pipelines.yml & #------------------------------------end------------------------------------
elasticsearch安裝github
#下載 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.1.1.zip #解壓 unzip elasticsearch-6.1.1.zip cd elasticsearch-6.1.1 #配置jvm內存 vim config/jvm.options #-----------------------------------start----------------------------------- -Xms8g -Xmx8g #------------------------------------end------------------------------------ #注:把內存(少於)一半給Lucene,內存對於 Elasticsearch 來講絕對是重要的,它能夠被許多內存數據結構使用來提供更快的操做。可是說到這裏, 還有另一個內存消耗大戶 非堆內存 (off-heap):Lucene。 #Lucene 被設計爲能夠利用操做系統底層機制來緩存內存數據結構。 Lucene 的段是分別存儲到單個文件中的。由於段是不可變的,這些文件也都不會變化,這是對緩存友好的,同時操做系統也會把這些段文件緩存起來,以便更快的訪問。 #Lucene 的性能取決於和操做系統的相互做用。若是你把全部的內存都分配給 Elasticsearch 的堆內存,那將不會有剩餘的內存交給 Lucene。 這將嚴重地影響全文檢索的性能。 #標準的建議是把 50% 的可用內存做爲 Elasticsearch 的堆內存,保留剩下的 50%。固然它也不會被浪費,Lucene 會很樂意利用起餘下的內存。 #若是你不須要對分詞字符串作聚合計算(例如,不須要 fielddata )能夠考慮下降堆內存。堆內存越小,Elasticsearch(更快的 GC)和 Lucene(更多的內存用於緩存)的性能越好。 #分配給Elasticsearch的內存不能超過32G。JVM 在內存小於 32 GB 的時候會採用一個內存對象指針壓縮技術。 #在 Java 中,全部的對象都分配在堆上,並經過一個指針進行引用。 普通對象指針(OOP)指向這些對象,一般爲 CPU 字長 的大小:32 位或 64 位,取決於你的處理器。指針引用的就是這個 OOP 值的字節位置。 #對於 32 位的系統,意味着堆內存大小最大爲 4 GB。對於 64 位的系統, 可使用更大的內存,可是 64 位的指針意味着更大的浪費,由於你的指針自己大了。更糟糕的是, 更大的指針在主內存和各級緩存(例如 LLC,L1 等)之間移動數據的時候,會佔用更多的帶寬。 #Java 使用一個叫做 內存指針壓縮(compressed oops)的技術來解決這個問題。 它的指針再也不表示對象在內存中的精確位置,而是表示 偏移量 。這意味着 32 位的指針能夠引用 40 億個 對象 , 而不是 40 億個字節。最終, 也就是說堆內存增加到 32 GB 的物理內存,也能夠用 32 位的指針表示。 #一旦你越過那個神奇的 ~32 GB 的邊界,指針就會切回普通對象的指針。 每一個對象的指針都變長了,就會使用更多的 CPU 內存帶寬,也就是說你實際上失去了更多的內存。事實上,當內存到達 40–50 GB 的時候,有效內存才至關於使用內存對象指針壓縮技術時候的 32 GB 內存。 #即使你有足夠的內存,也儘可能不要 超過 32 GB。由於它浪費了內存,下降了 CPU 的性能,還要讓 GC 應對大內存。 #設置堆內存爲 31 GB 是一個安全的選擇。 另外,你能夠在你的 JVM 設置裏添加 -XX:+PrintFlagsFinal 用來驗證 JVM 的臨界值, 而且檢查 UseCompressedOops 的值是否爲 true。對於你本身使用的 JVM 和操做系統,這將找到最合適的堆內存臨界值。 #具體請參考:https://www.elastic.co/guide/cn/elasticsearch/guide/current/heap-sizing.html #elasticsearch集羣信息配置 vim config/elasticsearch.yml #-----------------------------------start----------------------------------- #配置集羣名稱,每一個節點集羣名稱請保持一致 cluster.name: my-app #配置節點名稱,每一個節點須要起不一樣的名稱 node.name: node-1 #配置數據存儲目錄 path.data: /disk3/to/data,/disk4/to/data #log存儲位置 path.logs: /disk3/to/logs #Set the bind address to a specific IP (IPv4 or IPv6) #0.0.0.0爲不限制訪問 network.host: 0.0.0.0 #端口 http.port: 9200 #配置集羣 discovery.zen.ping.unicast.hosts: ["10.127.92.212", "10.127.92.181"] #Lock the memory on startup #true表示不容許內存交換(內存交換影響速度) #若是報錯,使用執行ulimit -l unlimited,取消限制最大加鎖內存 bootstrap.memory_lock: true #系統調用過濾器,建議禁用該項檢查,由於不少檢查項須要Linux 3.5以上的內核 bootstrap.system_call_filter: false #------------------------------------end------------------------------------
elasticsearch配置sql
#在其餘資源可用的前提下,單個JVM能開啓的最大線程數是/proc/sys/vm/max_map_count的設置數的一半 #永久生效,把vm.max_map_count=262144寫入/etc/sysctl.conf中,而後執行sysctl -p #默認vm.max_map_count=65530,會報錯,致使elasticsearch沒法啓動 sysctl -w vm.max_map_count=262144 sysctl -p #默認elasticsearch沒法以root帳戶啓動,須要建立單獨帳戶 #建立用戶組 groupadd elastic #建立用戶es,指定附屬組root useradd -g elastic -G root es #配置jdk1.8環境 vim bin/elasticsearch #-----------------------------------start----------------------------------- #最上面加入下面內容 export JAVA_HOME=/disk4/es/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar #------------------------------------end------------------------------------
中文分詞插件安裝數據庫
#不須要中文分詞搜索的請忽略這一步 #下載中文分詞插件ik wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.1.1/elasticsearch-analysis-ik-6.1.1.zip #解壓 unzip elasticsearch-analysis-ik-6.1.1.zip #移動解壓文件到elasticsearch內plugins下便可 mv elasticsearch-analysis-ik-6.1.1 elasticsearch-6.1.1/plugins/ik #ik在github地址:https://github.com/medcl/elasticsearch-analysis-ik #Analyzer: ik_smart , ik_max_word ; Tokenizer: ik_smart , ik_max_word #ik_max_word: 會將文本作最細粒度的拆分,好比會將「中華人民共和國國歌」拆分爲「中華人民共和國,中華人民,中華,華人,人民共和國,人民,人,民,共和國,共和,和,國國,國歌」,會窮盡各類可能的組合; #ik_smart: 會作最粗粒度的拆分,好比會將「中華人民共和國國歌」拆分爲「中華人民共和國,國歌」。 #在elasticsearch啓動以後能夠測試 curl -H 'content-type: application/json' 'http://localhost:9200/store/_analyze?pretty=true' -d '{"text":"中華人民共和國國歌","analyzer":"ik_max_word"}' curl -H 'content-type: application/json' 'http://localhost:9200/store/_analyze?pretty=true' -d '{"text":"中華人民共和國國歌","analyzer":"ik_smart"}'
啓動elasticsearchjson
#以用戶es啓動elasticsearch #-d指定後臺啓動 sudo -u es ./bin/elasticsearch -d #逐個啓動集羣節點 #檢查集羣 curl -XGET http://localhost:9200?pretty #-----------------------------------start----------------------------------- { "name" : "node-1", "cluster_name" : "my-app", "cluster_uuid" : "ncrtFPuhRJuv9D7R4cOp4w", "version" : { "number" : "6.1.1", "build_hash" : "bd92e7f", "build_date" : "2017-12-17T20:23:25.338Z", "build_snapshot" : false, "lucene_version" : "7.1.0", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "tagline" : "You Know, for Search" } #------------------------------------end------------------------------------ #查看集羣 curl -XGET 'localhost:9200/_cat/nodes?v&pretty' #-----------------------------------start----------------------------------- ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name 10.127.92.212 25 92 0 0.00 0.00 0.00 mdi - node-2 10.127.92.181 13 99 1 0.18 0.21 0.10 mdi - node-1 10.127.92.111 12 70 3 0.14 0.04 0.01 mdi * node-3 #------------------------------------end------------------------------------ #*號表示爲master節點 #查看elasticsearch安裝插件 curl -XGET localhost:9200/_cat/plugins?v #-----------------------------------start----------------------------------- name component version node-2 analysis-ik 6.1.1 node-1 analysis-ik 6.1.1 node-3 analysis-ik 6.1.1 #------------------------------------end------------------------------------
建立mapping
mapping相似於關係型數據庫中的表結構定義
#建立mapping文件 vim store_mapping.json #-----------------------------------start----------------------------------- { "settings": { "number_of_shards": 5,#主分片數,默認5 "number_of_replicas": 1#副本數,寫1爲每一個主分片有一個副本 }, "mappings": { #type類型,新版貌似不能修改了,默認就是doc。也就是說這個位置如今固定爲doc了 "doc": { "properties": { #sql中對應字段信息 "id": { #支持的數據類型:https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html "type": "long", #The index option controls whether field values are indexed. It accepts true or false and defaults to true. Fields that are not indexed are not queryable. "index": false, #是否存在於_source,和source filtering使用相關,默認true "store": true }, "package_name": { "index": false, #keyword表示是不會拆解,表示準確值 #They are typically used for filtering (Find me all blog posts where status is published), for sorting, and for aggregations. Keyword fields are only searchable by their exact value. #If you need to index structured content such as email addresses, hostnames, status codes, or tags, it is likely that you should rather use a keyword field. #If you need to index full text content such as email bodies or product descriptions, it is likely that you should rather use a text field. "type": "keyword" }, "name": { #type爲text會被拆解成詞元 #an analyzer to convert the string into a list of individual terms before being indexed #The analysis process allows Elasticsearch to search for individual words within each full text field. Text fields are not used for sorting and seldom used for aggregations "type": "text", #指定解析者,默認standard "analyzer": "ik_max_word", #The analyzer that should be used at search time on analyzed fields. Defaults to the analyzer setting. "search_analyzer": "ik_max_word" }, "sub_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "editor_comment": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "high_quality": { "type": "integer", "store": true }, "sub_category": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "tag": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "update_time": { "type": "date" } } } } } #------------------------------------end------------------------------------ #新建index,這裏爲store(和logstash配置文件中保持一致) curl -XPUT 'localhost:9200/store' #上傳mapping到建立的index中 curl -XPUT -H 'content-type: application/json' 'http://localhost:9200/store' -d @store_mapping.json #查看建立的mapping curl -XGET http://localhost:9200/store/doc/_mapping?pretty #這個時候就能夠啓動logstash了 #查看elasticsearch是否已經有數據 curl -XGET http://localhost:9200/store/doc/_search?pretty=true #v:顯示詳細信息;pretty:格式化顯示信息 #來個複雜點的查詢。查詢name或者editor_comment包含「自由」,並以update_time降序,_score降序排序搜索結果 curl -XPOST -H 'content-type: application/json' 'http://localhost:9200/store/doc/_search?pretty=true' -d ' { "query": { "multi_match" : { "query": "自由", "fields": [ "name", "editor_comment" ] } }, "sort": [ { "update_time": { "order": "desc" }}, { "_score": { "order": "desc" }} ] }'
kibana安裝配置
#下載kibana wget https://artifacts.elastic.co/downloads/kibana/kibana-6.1.1-linux-x86_64.tar.gz #解壓 tar -zxvf kibana-6.1.1-linux-x86_64.tar.gz cd kibana-6.1.1-linux-x86_64 #配置 vim config/kibana.yml #-----------------------------------start----------------------------------- #配置kibana訪問端口 server.port: 5601 #配置容許訪問kibana的ip,0.0.0.0表示不作限制 server.host: "0.0.0.0" #elasticsearch集羣鏈接 elasticsearch.url: "http://localhost:9200" #kibana運行ID位置 pid.file: /var/run/kibana.pid #------------------------------------end------------------------------------ #啓動kibana ./bin/kibana &
訪問kibana:http://ip:port
界面以下:
點擊Management,而後點擊Index Patterns,建立一個index過濾器
點擊Next step,選擇是否經過時間過濾數據
點擊Create index pattern即建立
點擊左側Discover,建立一個查詢
點擊查詢
左側Avaliable Fields能夠設置右側顯示字段信息,點擊上方Save能夠保存查詢條件
點擊左側Visualize,點擊Create a visualization
選擇一個你要建立的圖表,我選擇了餅圖
store_test爲Discover中保存的搜索條件名稱,點擊store_test
點擊頁面靠左側部分的Split Slices,而後選擇Aggregation類型,配置結果以下
而後點擊執行按鈕,結果以下,From爲>=,to爲<
點擊上方Save,並定義個Visualization名稱
注:若是須要配置雙層圓環,能夠點擊Add sub-buckets,操做同Split Slices
點擊左側Dashboard,點擊Create a dashboard
點擊Add按鈕
點擊visualize名稱,下方出現圖表以下
點擊上方Save,並輸入Dashbioard名稱
注:建立的圖表均可以以這種方式放到Dashboard中