Logstash使用介紹

Logstash介紹

Logstash是一個數據收集處理轉發系統,是 Java開源項目。 它只作三件事:php

  • 數據輸入
  • 數據加工(不是必須的):如過濾,改寫等
  • 數據輸出

 

下載安裝

logstash是基於Java的服務,各操做系統安裝Java環境都可使用。 

Java 
https://www.java.com/zh_CN/ 
安裝後配置好java環境變量。
logstash 
最新版 https://www.elastic.co/downloads/logstash 
2.3.4版 https://www.elastic.co/downloads/past-releases/logstash-2-3-4html

 

配置結構

#輸入
Input{
   Jdbc{}
}
#加工過濾
Filter{
   Json{}
}
#輸出
Output{
   elasticsearch{}
}
 

 

支持的插件

Input: elasticsearch,file,http_poller,jdbc,log4j,rss,rabbitmq,redis,syslog,tcp,udp…等java

Filter: grok,json,mutate,split …等redis

Output: email,elasticsearch,file,http,mongodb,rabbitmq,redis,stdout,tcp,udp …等sql

配置說明地址: https://www.elastic.co/guide/en/logstash/current/input-plugins.htmlmongodb

 

應用示例

#jdbc_demo
input {
  jdbc {
        #數據庫連接設置
        jdbc_driver_library => "k:\k\k\sqljdbc_6.0\chs\sqljdbc42.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://192.168.0.1;user=sa;password=sa;DatabaseName=EStatistic;"
        jdbc_user => "sa"
        jdbc_password => "sa"
    
	statement_filepath => "../config-demo/sqlscript/jdbc_demo.sql"         #sql腳本
	schedule => "* * * * *"                                                #執行計劃

	record_last_run => true                                                #記錄最後運行值
	use_column_value => true                                               #
	tracking_column => id                                                  #要記錄的字段
	last_run_metadata_path => "../config-demo/log/jdbc_demo"               #記錄的位置

	lowercase_column_names => true                                         #將字段名所有改成小寫
	clean_run => false
	
        jdbc_fetch_size => 1000                                                #分頁設置
	jdbc_page_size => 1000
	jdbc_paging_enabled => true
  }
}
filter {
	mutate {
        #重命名,能夠將字段名改掉
        rename => {
                "id" => "Id"
                "bid" => "BId"
                "saleconsultantid" => "SaleConsultantId"
                "avgreplyduration" => "AvgReplyDuration"
                "avgreplyratio" => "AvgReplyRatio"
                "avgonlineduration" => "AvgOnlineDuration"
                "stattime" => "StatTime"
			}
    }
}
output { 
	elasticsearch { 
		hosts => ["192.168.0.1:9200"]                            #es 服務地址
		index => "jdbc_demo"                                       #索引的名字
		document_type => "demoinfo"                                #類型的名字
		workers => 1                                               #輸出時進程數
		document_id=>"%{Id}"                                       #文檔的惟一ID
		template => "../config-demo/templates/jdbc_demo.json"      #模板的路徑
		template_name => "jdbc_demo"                               #模板的名字
		template_overwrite => false                                ##是否覆蓋已存在的模板
	} 
    #  stdout{
    #     codec => rubydebug 
    # }
}

 

索引模板數據庫

{
    "order": 1,
    "template": "jdbc_demo",                  //模板匹配規則,已經索引名匹配(eg:jdbc_demo-*,能夠匹配到,jdbc_demo-1,jdbc_demo-14...)
    "settings": {
        "index.number_of_shards": 4,          //分片數
        "number_of_replicas": 1               //每一個分配備份數
    },
    "mappings": {
        "_default_": {
            "_source": {
                "enabled": true
            }
        },
        "demoinfo": {           //動態模板 若是映射後,有新的字段添加進來,而且在字段區域沒有映射會按該動態模板匹配映射
            "dynamic_templates": [
                {
                    "string_field": {
                        "match": "*",
                        "match_mapping_type": "string", //匹配到全部字符串 設置爲不分詞
                        "mapping": {
                            "index": "not_analyzed",
                            "type": "string"
                        }
                    }
                }
            ],                      //類型名
            "_source": {
                "enabled": true
            },
            "_all": {
                "enabled": false
            },
            "properties": {                 //字段區域
                "Id": {
                    "type": "long"
                },
                "BId": {
                    "type": "integer"
                },
                "SaleConsultantId": {
                    "type": "integer"
                },
                "AvgReplyDuration": {
                    "type": "integer"
                },
                "AvgReplyRatio": {
                    "type": "double"
                },
                "AvgOnlineDuration": {
                    "type": "integer"
                },
                "StatTime": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                }
            }
        }
    },
    "aliases": {
        "logstashdemo": {}       //別名,匹配到該模板的會設置一個別名
    }
}

 

Sqlserver 查詢語句json

SELECT  [Id]
      ,[BId]
      ,[SaleConsultantId]
      ,[AvgReplyDuration]
      ,[AvgReplyRatio]
      ,[AvgOnlineDuration]
      ,[StatTime]
  FROM [EStatistic].[dbo].[StatSessionBy7Day] 
  WHERE Id>:sql_last_value
  --:sql_last_value是記錄的最後的一個值

 

5.x模板String字段ruby

 {"properties": {                 //字段區域
                 "NewField": {
                    "type": "keyword",       // keyword 不分詞
                   "index": false            //不創建索引
                   },
                 "NewFieldText": {
                  "type": "text",           // text分詞 
                   "index": true            // 創建索引
                 }
            }
            }

 

 

注意事項

  • Jdbc(input)拉取數據使用分頁功能時沒法查詢text、ntext 和 image字段。
  • jdbc(input)使用分頁時會將字段所有轉換爲大寫。
  • elasticsearch(output)的模板中匹配符,必定要可以匹配到索引名稱才能生效,而且要避免讓其餘索引匹配到,以避免影響其它新索引。
  • elasticsearch(output)定義索引名稱必須全小寫(es限制)。
  • 以時間字段進行跟蹤時sql查詢語句不能使用Top x限制每次查詢條數,這會致使最後的記錄值並不是是最大的值,所拉取的數據可能會出現數據重複拉取(可是在es中不會體現爲多條重複數據,只是version字段會>1)或數據丟失。
  • 跟蹤主鍵ID時需顯示設置 order by id asc
  • 無檢索文本內容需設置「index」: 「not_analyzed」
 

資料

相關文章
相關標籤/搜索