logstash筆記

把以前的logstash筆記分享,輕拍。html

Logstash
    doc
        https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-update
        
    wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.0.tar.gz
    2.x須要java7的支持
    文件
        bin/logstash  主程序文件
        配置文件須要本身編寫
    安裝使用
        tar *.tar.gz
        cd logstash
        vi logstash.conf
            input  output是兩個必要定義的元素
            filter 是可選的,做用是按規定修改數據
            格式
                1,命令形式 bin/logstash -e 'input { stdin { } } output { stdout {} }'    
                    stdin 是標準輸入plugin
                    stdout 是標準輸出,屏幕
                2,文件形式
                    input {
                       stdin {}
                    }
                    output {
                       stdout {}
                    }
                例:文件輸入,輸出esasticsearch
                    input {
                       file => {
                          path => "/var/log/syslog"
                          start_position => "beginning"
                          ignore_older => 0
                       }
                    }
                    output {
                       esasticsearch {
                          hosts => [ "localhost:9200" ]
                       }
                    }
                    查看導入es的數據
                        curl -XGET 'localhost:9200/logstash-*/_search?pretty&q=response=200'
    命令
        logstash-plugin
            list                          List all installed Logstash plugins
                Usage:
                    bin/logstash-plugin list [OPTIONS] [PLUGIN]
                Parameters:
                    [PLUGIN]                      Part of plugin name to search for, leave empty for all plugins
                Options:
                    --installed                   List only explicitly installed plugins using bin/logstash-plugin install ... (default: false)
                    --verbose                     Also show plugin version number (default: false)
                    --group NAME                  Filter plugins per group: input, output, filter or codec
                    -h, --help                    print help
            install                       Install a Logstash plugin
                Usage:
                    bin/logstash-plugin install [OPTIONS] [PLUGIN] ...
                Parameters:
                    [PLUGIN] ...                  plugin name(s) or file
                Options:
                    --version VERSION             version of the plugin to install
                    --[no-]verify                 verify plugin validity before installation (default: true)
                    --preserve                    preserve current gem options (default: false)
                    --development                 install all development dependencies of currently installed plugins (default: false)
                    --local                       force local-only plugin installation. see bin/logstash-plugin package|unpack (default: false)
                    -h, --help                    print help
            remove                        Remove a Logstash plugin
            update                        Update a plugin
            pack                          Package currently installed plugins, Deprecated: Please use prepare-offline-pack instead
            unpack                        Unpack packaged plugins, Deprecated: Please use prepare-offline-pack instead
            generate                      Create the foundation for a new plugin
            prepare-offline-pack          Create an archive of specified plugins to use for offline installation
    啓動
        5.x以上默認開啓了xpack, 若是連不上es會報錯,在logstash.yml裏增長xpack.monitoring.enabled: false
        命令:
            bin/logstash -e 'input { stdin { } } output { stdout {} }'    
        配置文件:
            bin/logstash -f logstash.conf
            logstash 還提供一個方便咱們規劃和書寫配置的小功能。你能夠直接用 bin/logstash -f /etc/logstash.d/ 來運行。logstash 會自動讀取 /etc/logstash.d/ 目錄下全部的文本文件,而後在本身內存裏拼接成一個完整的大配置文件,再去執行。
                【logstash.d/*.conf 這樣只會讀取排第一後綴是.conf的文件】
            注意:
            logstash 列出目錄下全部文件時,是字母排序的。而 logstash 配置段的 filter 和 output 都是順序執行,因此順序很是重要。採用多文件管理的用戶,推薦採用數字編號方式命名配置文件,同時在配置中,嚴謹採用 if 判斷限定不一樣日誌的動做
        參數:
            --allow-unsafe-shutdown  迫使logstash退出時關閉,默認狀況下,logstash將拒絕離開,直到全部收到事件輸出完
            -w, --pipeline-workers COUNT  工做的pipeline數量,默認是 CPU 核數
            -b, --pipeline-batch-size     每一個 Logstash pipeline 線程,在執行具體的 filter 和 output 函數以前,最多能累積的日誌條數。默認是 125 條。越大性能越好,一樣也會消耗越多的 JVM 內存
            -u, --pipeline-batch-delay    每一個 Logstash pipeline 線程,在打包批量日誌的時候,最多等待幾毫秒。默認是 5 ms
            -l, --log FILE  自身的日誌
            --quiet  只打印出error級別的日誌
            --verbose  日誌級別
            --debug  日誌級別
            -v  日誌級別
            -p, --pluginpath PATH   加載本身的插件
            -t, --configtest  檢查配置文件的語法錯誤
            -r, --[no-]auto-reload   監視配置文件的更新,並自動加載
            --reload-interval RELOAD_INTERVAL   配置文件監控的間隔,默認是3秒
            --allow-env  支持shell的變量
    語法
        Logstash從 1.3.0 版開始支持條件判斷和表達式。
            字段引用、sprintf格式、條件判斷只能用於filter和output,不能用於input。
            if 
                支持: grok
                不支持: mutate
                if "ab" in [fieldname] {
                  ...
                } else if "a" in "ab" { 
                  ...
                } else {
                  ...
                }
                ==(等於), !=(不等於), <(小於), >(大於), <=(小於等於), >=(大於等於)
                =~(匹配正則), !~(不匹配正則)
                in(包含), not in(不包含)
                and(與), or(或), nand(非與), xor(非或)
                ()(複合表達式), !()(對複合表達式結果取反)
                filter {
                  if [action] == "login" {
                    mutate { remove => "secret" }
                  }
                }
                output {
                  if "_grokparsefailure" not in [tags] {
                    elasticsearch { ... }
                  }
                }
                if [loglevel] == "ERROR" and [deployment] == "production"
                if [foo] in [foobar]
                if [foo] in "foo"
                if "hello" in [greeting]
                if [foo] in ["hello", "world", "foo"]
                if [missing] in [alsomissing]
                if !("foo" in ["hello", "world"])
                if [foo] #判斷filed是否存在,不能加雙引號
        數據類型    
            bool
                debug => true
            string
                host => "hostname"
            number
                port => 514
            array
                match => ["datetime", "UNIX", "ISO8601"]
            hash
                options => {
                    key1 => "value1",
                    key2 => "value2"
                }
            注意:若是你用的版本低於 1.2.0,哈希的語法跟數組是同樣的,像下面這樣寫:
                match => [ "field1", "pattern1", "field2", "pattern2" ]
        字段
            #字段引用使用[]號,好比使用status作判斷,if [status] = 200 {}
            #如果要取得字段的值,使用 %{ip}
            #取os的值,須要這樣:[ua][os],能夠把ua看做數組名,os是下標。
            字段是 Logstash::Event 對象的屬性
            字段引用
                [tags]
                多維哈希表,或者叫哈希的哈希值獲取
                    options => {
                        a => {
                            b => "b"
                        }
                    }
                    [options][a][0]
                logstash 的數組也支持倒序下標,即 [geoip][location][-1] 能夠獲取數組最後一個元素的值。
                Logstash 還支持變量內插,在字符串裏使用字段引用的方法是這樣:
                    "the longitude is %{[geoip][location][0]}"
    配置
        host字段自定義
            host會使用hosts文件的第一行做爲內容
            127.0.0.1  localhost
        通用
            每一個 logstash 過濾插件,都會有四個方法叫 add_tag, remove_tag, add_field 和 remove_field
            type 和 tags 是 logstash 事件中兩個特殊的字段。一般來講咱們會在輸入區段中經過 type 來標記事件類型。而 tags 則是在數據處理過程當中,由具體的插件來添加或者刪除的
        input
            file
                file {
                    path => ["/var/log/*.log", "/var/log/message"]
                    type => "system"
                    start_position => "beginning"
                }
                Logstash 使用一個名叫 FileWatch 的 Ruby Gem 庫來監聽文件變化。這個庫支持 glob 展開文件路徑,並且會記錄一個叫 .sincedb 的數據庫文件來跟蹤被監聽的日誌文件的當前讀取位置。
                sincedb 文件中記錄了每一個被監聽的文件的 inode, major number, minor number 和 pos
                一些比較有用的配置項:
                    tag
                        此設置沒有默認值。向您的活動添加任意數量的任意標籤
                    discover_interval
                        logstash 每隔多久去檢查一次被監聽的 path 下是否有新文件。默認值是 15 秒。
                    exclude
                        不想被監聽的文件能夠排除出去,這裏跟 path 同樣支持 glob 展開。
                    close_older
                        一個已經監聽中的文件,若是超過這個值的時間內沒有更新內容,就關閉監聽它的文件句柄。默認是 3600 秒,即一小時。
                    ignore_older
                        在每次檢查文件列表的時候,若是一個文件的最後修改時間超過這個值,就忽略這個文件。默認是 86400 秒,即一天。
                    sincedb_path
                        記錄文件讀取到的位置(若是文件被切割,個人想法是:logstash只會寫新的值進來,新進來的日誌仍然正常讀取)
                        若是你不想用默認的 $HOME/.sincedb(Windows 平臺上在 C:\Windows\System32\config\systemprofile\.sincedb),能夠經過這個配置定義 sincedb 文件到其餘位置。
                    sincedb_write_interval
                        logstash 每隔多久寫一次 sincedb 文件,默認是 15 秒。
                    stat_interval
                        logstash 每隔多久檢查一次被監聽文件狀態(是否有更新),默認是 1 秒。
                    start_position
                        只有在一開始的時候纔有效,若是有了sincedb的信息,此選項無效
                        logstash 從什麼位置開始讀取文件數據,默認是結束位置,也就是說 logstash 進程會以相似 tail -F 的形式運行。若是你是要導入原有數據,把這個設定改爲 "beginning",logstash 進程就從頭開始讀取,有點相似 cat,可是讀到最後一行不會終止,而是繼續變成 tail -F。
                注意
                    FileWatch 只支持文件的絕對路徑,並且會不自動遞歸目錄。
                    能寫成 path => "/path/to/*/*/*/*.log"。FileWatch 模塊提供了一個稍微簡單一點的寫法:/path/to/**/*.log,用 ** 來縮寫表示遞歸所有子目錄。
                    start_position 僅在該文件從未被監聽過的時候起做用。若是 sincedb 文件中已經有這個文件的 inode 記錄了,那麼 logstash 依然會從記錄過的 pos 開始讀取數據。因此重複測試的時候每回須要刪除 sincedb 文件(官方博客上提供了另外一個巧妙的思路:將 sincedb_path 定義爲 /dev/null,則每次重啓自動從頭開始讀)。
                    由於 windows 平臺上沒有 inode 的概念,Logstash 某些版本在 windows 平臺上監聽文件不是很靠譜。windows 平臺上,推薦考慮使用 nxlog。
            stdin
                標準輸入
            syslog 
                本機的 syslog 就會默認發送到 logstash 裏
            redis
                list => BLPOP
                channel => SUBSCRIBE
                pattern_channel => PSUBSCRIBE
                配置:
                input {
                    redis {
                        batch_count => 1
                        data_type => "list"
                        key => "logstash-*"
                        host => "192.168.0.2"
                        port => 6379
                        threads => 5
                    }
                }
            kafka {
                #https://www.elastic.co/guide/en/logstash/2.3/plugins-inputs-kafka.html#plugins-inputs-kafka-zk_connect
                #----------- 5.0如下的配置
                zk_connect => "10.26.93.65:2181,127.0.0.1:2181"  #Zookeeper的地址
                group_id => "nginx_log"
                #topic訂閱有三個可選topic_id or white_list or black_list
                #white_list => "sudiyi_.*"  #Whitelist of topics to include for consumption.
                #black_list => "xx.*|aabb"  #Blacklist of topics to exclude from consumption.
                topic_id => "app_server" #單個topic訂閱
                consumer_threads => 5  #應該與kafka的Partition同樣
                # ----------5.0以上配置
                bootstrap_servers => "10.26.93.65:9092,127.0.0.1:9092" #kafka集羣的地址
                group_id => "nginx_log"
                #topic
                codec => "json" #約定數據傳輸格式,就是編碼和解碼
                topics => ["sudiyi_express_admin","user_center_service"]
                #topics_pattern => "user_center.*" #若是這個存在,topics會被忽略
                #線程完美的配置應該和kafkapartitions同樣,過多的線程是浪費的
                consumer_threads => 3
            }
        編碼插件
            input | decode | filter | encode | output
        filter
            grok 
                調試:http://grokdebug.herokuapp.com/
                自帶:
                    %{IPORHOST:clientip} 
                    %{USER:ident} 
                    %{USER:auth} 
                    %{HTTPDATE:timestamp}
                    %{WORD:verb} 
                    %{NOTSPACE:request}
                    %{NUMBER:httpversion}
                    %{DATA:rawrequest}
                    %{NUMBER:response}
                    %{NUMBER:bytes}
                    %{COMMONAPACHELOG} 
                    %{QS:referrer} 
                    %{QS:agent}
                pattern => 「\[(?<datetime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s(?<level>\w*)\s\」Crawl\surl:(?<url>(.*))」
                Grok 支持把預約義的 grok 表達式 寫入到文件中
                格式
                    定義:USERNAME [a-zA-Z0-9._-]+    
                    引用:USER %{USERNAME}
                            %{PATTERN_NAME:capture_name:data_type}
                                實例:%{NUMBER:request_time:float}
                patterns_dir 選項來指明grok 表達式文件
                    patterns_dir => "/path/to/your/own/patterns"
                多行匹配
                    在和 codec/multiline 搭配使用的時候,須要注意一個問題,grok 正則和普通正則同樣,默認是不支持匹配回車換行的。就像你須要 =~ //m 同樣也須要單獨指定,具體寫法是在表達式開始位置加 (?m) 標記
                多項選擇
                    match => [
                        "message", "(?<request_time>\d+(?:\.\d+)?)",
                        "message", "%{SYSLOGBASE} %{DATA:message}",
                        "message", "(?m)%{WORD}"f
                    ]
                    logstash 會按照這個定義次序依次嘗試匹配,到匹配成功爲止
                remove_field 參數來刪除掉 message 字段,或者用 overwrite 參數來重寫默認的 message 字段,只保留最重要的部分
                    remove_fileld => ["message"]
            dissect    
                logstash不自帶, 須要手動安裝bin/logstash-plugin install logstash-filter-dissect
                這是一個跟grok比較像的過濾器
                一個dissect過濾器最好只mapping一個字段,嘗試過mapping兩個字段會報key的錯誤,由於第二個字段是從第一個字段產生的,多是由於第二個字段不能及時產生形成的報錯
                    例子數據:
                        2017-05-04 19:06:48,079 [ForkJoinPool-1-worker-11] INFO  c.s.t.s.s.i.ReceiveEventServiceImpl:148: Heartbeat deviceId= 1000269
                    mapping => {
                        "message" => "%{datetime} [%{thread_name}] %{level} %{action} %{info_name} %{info_body}"
                    }
                    # "要匹配的字段" => "匹配語法"
                    #結果: datetime: 2017-05-04 19:06:48,079
                            thread_name: ForkJoinPool-1-worker-11
                            level: INFO
                            ...
                    #Heartbeat後面歸到一個fileld,而後能夠用kv過濾器處理java

            date
                #匹配上了會自動替換@timestamp字段
                #SSS是毫秒
                date {
                    #        [ "field_name", "時間格式1", "時間格式2" ]
                    match => ["date_field", "ISO8601", "Y-MM-dd HH:mm:ss,SSS"]
                    remove_field => [ "date_field", "my_extraneous_field" ]    刪除
                }
                UNIX_MS  毫秒時間戳
                UNIX  秒時間戳node

                不少中國用戶常常提一個問題:爲何 @timestamp 比咱們晚了 8 個小時?怎麼修改爲北京時間?
                其實,Elasticsearch 內部,對時間類型字段,是統一採用 UTC 時間,存成 long 長整形數據的!對日誌統一採用 UTC 時間存儲,是國際安全/運維界的一個通識——歐美公司的服務器廣泛普遍分佈在多個時區裏——不像中國,地域橫跨五個時區卻只用北京時間。
                對於頁面查看,ELK 的解決方案是在 Kibana 上,讀取瀏覽器的當前時區,而後在頁面上轉換時間內容的顯示。
                因此,建議你們接受這種設定。不然,即使你用 .getLocalTime 修改,也還要面臨在 Kibana 上反過去修改,以及 Elasticsearch 原有的 ["now-1h" TO "now"] 這種方便的搜索語句沒法正常使用的尷尬。
            geoip 
                geoip {
                    source => "message"
                }
                過濾
                geoip {
                    fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
                }
                須要注意的是:geoip.location 是 logstash 經過 latitude 和 longitude 額外生成的數據。因此,若是你是想要經緯度又不想重複數據的話,應該像下面這樣作:
                    filter { 
                        geoip { 
                            fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"] 
                            remove_field => ["[geoip][latitude]", "[geoip][longitude]"] 
                        } 
                    }
            mutate
                數據類型修改
                     mutate {
                        convert => ["request_time", "float"]
                        #5.2語法
                        #convert => { "info_content[deviceId]" => "integer" }
                        #convert => { "msgId" => "integer" }
                    }
                字符串處理
                    gsub
                        經過應用正則表達式和替換來轉換字符串字段。 若是字段不是字符串,則不會執行任何操做。
                        add和gsub最好不要放到一個mutate裏面,有時候剛add,gsub執行沒有效果
                            gsub => [
                                "urlparams", "[\\?#]", "_",              #替換反斜槓,問號,#號
                                "fieldname", "/", "_"                 # _替換爲/
                            ]  
                       split
                        filter {
                            mutate {
                                split => ["message", "|"]
                            }
                        }            
                    join
                        僅對數組類型字段有效
                        咱們在以前已經用 split 割切的基礎再 join 回去。配置改爲:
                        filter {
                            mutate {
                                split => ["message", "|"]
                            }
                            mutate {
                                join => ["message", ","]
                            }
                        }
                    替換一個field的內容, %{}是引用字段的內容
                    replace => {
                        "type" => "app_nginx_%{type}"nginx

                    }
                增長字段
                    add_field => { "visit_device" => "%{[http_user_agent][0]}" }
                    merge
                        合併兩個數組或者哈希字段。依然在以前 split 的基礎上繼續:
                        filter {
                            mutate {
                                split => ["message", "|"]
                            }
                            mutate {
                                merge => ["message", "message"]
                            }
                        }
                字段處理
                    rename
                        重命名某個字段,若是目的字段已經存在,會被覆蓋掉:
                            filter {
                                mutate {
                                    rename => ["syslog_host", "host"]
                                }
                            }
                    update
                        更新某個字段的內容。若是字段不存在,不會新建。
                            mutate {
                                update => { "sample" => "My new message" }
                            }
                    replace
                        做用和 update 相似,可是當字段不存在的時候,它會起到 add_field 參數同樣的效果,自動添加新的字段。
                執行次序
                    須要注意的是,filter/mutate 內部是有執行次序的。其次序以下:
                        rename(event) if @rename
                        update(event) if @update
                        replace(event) if @replace
                        convert(event) if @convert
                        gsub(event) if @gsub
                        uppercase(event) if @uppercase
                        lowercase(event) if @lowercase
                        strip(event) if @strip
                        remove(event) if @remove
                        split(event) if @split
                        join(event) if @join
                        merge(event) if @merge
                        filter_matched(event)
            split 拆分事件
                split 插件中使用的是 yield 功能,其結果是 split 出來的新事件,會直接結束其在 filter 階段的歷程,也就是說寫在 split 後面的其餘 filter 插件都不起做用,進入到 output 階段。因此,必定要保證 split 配置寫在所有 filter 配置的最後
                terminator    指定切割的間隔符, Default value is "\n" 
                field 指定切割的域, default value is "message"
            kv
                處理key=value這種數據結構
                source  數據源,Default value is "message"
                target  把取出來的數據放到一個容器裏面,至關於list
                    target => "kv"
                transform_key   轉變key的值爲大寫或小寫
                transform_value  轉變value的值爲大寫或小寫
                trimkey  修剪key包含的字符like [ or ] using \.
                trim  修剪value包含的字符like [ or ] using \.(有時候value後面會包括一些無用的分隔符,須要用這個設置處理掉)
                value_split  按什麼符號切分,Default value is "="git

        output
            file {
                path => "/path/xx"
                codec => line { format => "custom format: %{message}"}
            }
            codec => dots #把每一個 event 都變成一個點(.)
            codec=>rubydebug
            null {} 拋棄全部 event
            輸出插件統一具備一個參數是 workers。Logstash 爲輸出作了多線程的準備
            kafka
                kafka版本是0.9要對應使用logstash版本2.3.x如下,不然導不了數據到kafka
                fluntd也有kafka插件
                kafka 
                    kafka{
                        bootstrap_servers => "xx.sudiyi.cn:9092" #因爲kafka集羣配置,可能這裏須要使用域名:端口(在host文件裏配)
                        topic_id => "test01"
                        codec => "json" #約定數據傳輸格式,就是編碼和解碼
                            # https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#codec
                            # json 
                            # plain 無格式
                    }
            elasticsearch 
                HTTP , node 和 transport 方式
                elasticsearch {
                    hosts => ["es-node02:9200"] #最好在hosts裏映射ip地址
                    index => "logstash-%{type}-%{+YYYY.MM.dd}"
                    document_type => "%{type}"
                    workers => 1  #5.x這個是string類型
                    flush_size => 20000
                    idle_flush_time => 10
                    template_overwrite => true
                    user => ""
                    password => ""
                }正則表達式

                配置
                    批量發送
                        flush_size 和 idle_flush_time 共同控制 Logstash 向 Elasticsearch 發送批量數據的行爲。以上面示例來講:Logstash 會努力攢到 20000 條數據一次性發送出去,可是若是 10 秒鐘內也沒攢夠 20000 條,Logstash 仍是會以當前攢到的數據量發一次。
                    索引名
                        寫入的 ES 索引的名稱,這裏可使用變量。爲了更貼合日誌場景,Logstash 提供了 %{+YYYY.MM.dd} 這種寫法。在語法解析的時候,看到以 + 號開頭的,就會自動認爲後面是時間格式,嘗試用時間格式來解析後續字符串。因此,以前處理過程當中不要給自定義字段取個加號開頭的名字……
                        此外,注意索引名中不能有大寫字母,不然 ES 在日誌中會報 InvalidIndexNameException,可是 Logstash 不會報錯,這個錯誤比較隱晦,也容易掉進這個坑中。redis

                Logstash 1.4.2 在 transport 和 http 協議的狀況下是固定鏈接指定 host 發送數據。從 1.5.0 開始,host 能夠設置數組,它會從節點列表中選取不一樣的節點發送數據,達到 Round-Robin 負載均衡的效果。
                Kibana4 強制要求 ES 全集羣全部 node 版本在 1.4 以上,Kibana4.2 要求 ES 2.0 以上。因此採用 node 方式發送數據的 logstash-1.4(攜帶的 Elasticsearch.jar 庫是 1.1.1 版本) 會致使 Kibana4 沒法運行,採用 Kibana4 的讀者務必改用 http 方式。
                開發者在 IRC freenode#logstash 頻道里表示:"高於 1.0 版本的 Elasticsearch 應該都能跟最新版 logstash 的 node 一塊兒正常工做"。此信息僅供參考,請認真測試後再上線。
                常常有同窗問,爲何 Logstash 在有多個 conf 文件的狀況下,進入 ES 的數據會重複,幾個 conf 數據就會重複幾回。其實問題緣由在以前啓動參數章節有提過,output 段順序執行,沒有對日誌 type 進行判斷的各插件配置都會所有執行一次。在 output 段對 type 進行判斷的語法以下所示:
                    output {
                        if [type] == "nginxaccess" {
                          elasticsearch { }
                        }
                    }
                老版本的性能問題
                    Logstash 1.4.2 在 http 協議下默認使用做者本身的 ftw 庫,隨同分發的是 0.0.39 版。該版本有內存泄露問題,長期運行下輸出性能愈來愈差!
                    解決辦法:
                        對性能要求不高的,能夠在啓動 logstash 進程時,配置環境變量 ENV["BULK"],強制採用 elasticsearch 官方 Ruby 庫。命令以下:
                        export BULK="esruby"
                        對性能要求高的,能夠嘗試採用 logstash-1.5.0RC2 。新版的 outputs/elasticsearch 放棄了 ftw 庫,改用了一個 JRuby 平臺專有的 Manticore 庫。根據測試,性能跟 ftw 比至關接近。
                        對性能要求極高的,能夠手動更新 ftw 庫版本,目前最新版是 0.0.42 版,據稱內存問題在 0.0.40 版即解決。
                模板
                    Elasticsearch 支持給索引預約義設置和 mapping(前提是你用的 elasticsearch 版本支持這個 API)。Logstash 自帶有一個優化好的模板
                    默認使用./vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/elasticsearch-template.json 這個模板去插入elasticsearch數據,在比較老的版本模板不會自動建立.raw,這會形成elasticsearch在對字符串作聚合查詢的時間報Field data loading is forbidden on [xxxx]錯誤shell

 

 

附,以前的一個匹配:數據庫

patterns:express

STATUS1 [A-Z]+
TIME \d+\.\d+
DATE \d{4}-\d{1,2}-\d{1,2}\ \d{1,2}:\d{1,2}:\d{1,2}
RESPONSE_AGREEMENT [A-Z]+
METHOD (POST)|(GET)
RESPONSE_METHOD /\S+
DEVICE_ID \d{1,7}
mail_nu \d{1,50}
mobiles \d+
tag1 [a-zA-Z0-9._-]+


MAIN_SC \[%{DATE:logtime}\] device_id: %{DEVICE_ID:device_id}, mail_no: %{mail_nu:mail_no}, mobile: %{mobiles:mobile}, status: %{STATUS1:status}, \[%{tag1:tag}\]

 

日誌:

[2017-05-22 17:06:57] device_id: 1022456, mail_no: 3965330139219, mobile: 13603840208, status: OK, [NEW_API]

相關文章
相關標籤/搜索