數據管道 Logstash 入門

Logstash 入門


Logstash 是什麼

Logstash 就是一個開源的數據流工具,它會作三件事:html

  1. 從數據源拉取數據
  2. 對數據進行過濾、轉換等處理
  3. 將處理後的數據寫入目標地

例如:java

  • 監聽某個目錄下的日誌文件,讀取文件內容,處理數據,寫入 influxdb 。
  • 從 kafka 中消費消息,處理數據,寫入 elasticsearch 。

爲何要用 Logstash ?

方便省事。mysql

假設你須要從 kafka 中消費數據,而後寫入 elasticsearch ,若是本身編碼,你得去對接 kafka 和 elasticsearch 的 API 吧,若是你用 Logstash ,這部分就不用本身去實現了,由於 Logstash 已經爲你封裝了對應的 plugin 插件,你只須要寫一個配置文件形如:git

input {
    kafka {
        # kafka consumer 配置
    }
}

filter {
    # 數據處理配置
}

output {
    elasticsearch {
        # elasticsearch 輸出配置
    }
}

而後運行 logstash 就能夠了。github

Logstash 提供了兩百多個封裝好的 plugin 插件,這些插件被分爲三類:web

  • input plugin : 從哪裏拉取數據
  • filter plugin : 數據如何處理
  • output plugin : 數據寫入何處

使用 logstash 你只要編寫一個配置文件,在配置文件中挑選組合這些 plugin 插件,就能夠輕鬆實現數據從輸入源到輸出源的實時流動。正則表達式


安裝 logstash

請參數:官方文檔redis


第一個示例

假設你已經安裝好了 logstash ,而且可執行文件的路徑已經加入到了 PATH 環境變量中。sql

下面開始咱們的第一個示例,編寫 pipeline.conf 文件,內容爲:mongodb

input {
    stdin {

    }
}

filter {

}

output {
    stdout {

    }
}

這個配置文件的含義是:

  • input 輸入爲 stdin(標準輸入)
  • filter 爲空(也就是不進行數據的處理)
  • output 輸出爲 stdout(標準輸出)

執行命令:

logstash -f pipeline.conf

等待 logstash 啓動完畢,輸入 hello world 而後回車, 你就會看到如下輸出內容:

{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => 2020-11-01T08:25:10.987Z,
          "host" => "local"
}

咱們輸入的內容已經存在於 message 字段中了。

當你輸入其餘內容後也會看到相似的輸出。

至此,咱們的第一個示例已經完成,正如配置文件中所定義的,Logstash 從 stdin 標準輸入讀取數據,不對源數據作任何處理,而後輸出到 stdout 標準輸出。

特定名詞和字段

  • event : 數據在 logstash 中被包裝成 event 事件的形式從 input 到 filter 再到 output 流轉。
  • @timestamp : 特殊字段,標記 event 發生的時間。
  • @version : 特殊字段,標記 event 的版本號。
  • message : 源數據內容。
  • @metadata : 元數據,key/value 的形式,是否有數據得看具體插件,例如 kafka 的 input 插件會在 @metadata 裏記錄 topic、consumer_group、partition、offset 等一些元數據。
  • tags : 記錄 tag 的字符串數組。

字段引用

在配置文件中,能夠經過 [field] 的形式引用字段內容,若是在字符串中,則能夠經過 %{[field]} 的方式進行引用。

示例:

input {
    kafka {
        # kafka 配置
    }
}

filter {
    # 引用 log_level 字段的內容進行判斷
    if [log_level] == "debug" {

    }
}

output {
  elasticsearch {
    # %{+yyyy.MM.dd} 來源於 @timestamp
    index => "log-%{+yyyy.MM.dd}"
    document_type => "_doc"
    document_id => "%{[@metadata][kafka][key]}"
    hosts => ["127.0.0.1:9200"]
  }
}

Plugin 插件一覽

用好 Logstash 的第一步就是熟悉 plugin 插件,只有熟悉了這些插件你才能快速高效的創建數據管道。

Input plugin

Input 插件定義了數據源,即 logstash 從哪裏拉取數據。

示例:

input {
  beats {
    port => 5044
  }
}
  • dead_letter_queue : 從 Logstash 本身的 dead letter queue 中拉取數據,目前 dead letter queue 只支持記錄 output 爲 elasticsearch 時寫入 400 或 404 的數據。

示例:

input {
  dead_letter_queue {
    path => "/var/logstash/data/dead_letter_queue"
    start_timestamp => "2017-04-04T23:40:37"
  }
}
  • elasticsearch : 從 elasticsearch 中讀取 search query 的結果。

示例:

input {
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } } }'
  }
}
  • exec : 按期執行一個 shell 命令,而後捕獲其輸出。

示例:

input {
  exec {
    command => "ls"
    interval => 30
  }
}
  • file : 從文件中流式讀取內容。

示例:

input {
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    start_position => "beginning"
  }
}
  • generator : 生成隨機數據。

示例:

input {
  generator {
    count => 3
    lines => [
      "line 1",
      "line 2",
      "line 3"
    ]
  }
}
  • github : 從 github webhooks 中讀取數據。
  • graphite : 接受 graphite 的 metrics 指標數據。
  • heartbeat : 生成心跳信息。這樣作的通常目的是測試 Logstash 的性能和可用性。
  • http : Logstash 接受 http 請求做爲數據。
  • http_poller : Logstash 發起 http 請求,讀取響應數據。

示例:

input {
  http_poller {
    urls => {
      test1 => "http://localhost:9200"
      test2 => {
        method => get
        user => "AzureDiamond"
        password => "hunter2"
        url => "http://localhost:9200/_cluster/health"
        headers => {
          Accept => "application/json"
        }
     }
    }
    request_timeout => 60
    schedule => { cron => "* * * * * UTC"}
    codec => "json"
    metadata_target => "http_poller_metadata"
  }
}
  • imap : 從 IMAP 服務器讀取郵件。
  • jdbc : 經過 JDBC 接口導入數據庫中的數據。

示例:

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}
  • kafka : 消費 kafka 中的消息。

示例:

input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "consumer_group"
    topics => ["kafka_topic"]
    enable_auto_commit => true
    auto_commit_interval_ms => 5000
    auto_offset_reset => "latest"
    decorate_events => true
    isolation_level => "read_uncommitted"
    max_poll_records => 1000
  }
}
  • rabbitmq : 從 RabbitMQ 隊列中拉取數據。
  • redis : 從 redis 中讀取數據。
  • stdin : 從標準輸入讀取數據。
  • syslog : 讀取 syslog 數據。
  • tcp : 經過 TCP socket 讀取數據。
  • udp : 經過 udp 讀取數據。
  • unix : 經過 UNIX socket 讀取數據。
  • websocket : 經過 websocket 協議 讀取數據。

Output plugin

Output 插件定義了數據的輸出地,即 logstash 將數據寫入何處。

  • csv : 將數據寫入 csv 文件。
  • elasticsearch : 寫入 Elasticsearch 。
  • email : 發送 email 郵件。
  • exec : 執行命令。
  • file : 寫入磁盤文件。
  • graphite : 寫入 Graphite 。
  • http : 發送 http 請求。
  • influxdb : 寫入 InfluxDB 。
  • kafka : 寫入 Kafka 。
  • mongodb : 寫入 MongoDB 。
  • opentsdb : 寫入 OpenTSDB 。
  • rabbitmq : 寫入 RabbitMQ 。
  • redis : 使用 RPUSH 的方式寫入到 Redis 隊列。
  • sink : 將數據丟棄,不寫入任何地方。
  • syslog : 將數據發送到 syslog 服務端。
  • tcp : 發送 TCP socket。
  • udp : 發送 UDP 。
  • webhdfs : 經過 webhdfs REST API 寫入 HDFS 。
  • websocket : 推送 websocket 消息 。

Filter plugin

Filter 插件定義對數據進行如何處理。

  • aggregate : 聚合數據。
  • alter : 修改數據。
  • bytes : 將存儲大小如 "123 MB" 或 "5.6gb" 的字符串表示形式解析爲以字節爲單位的數值。
  • cidr : 檢查 IP 地址是否在指定範圍內。

示例:

filter {
  cidr {
    add_tag => [ "testnet" ]
    address => [ "%{src_ip}", "%{dst_ip}" ]
    network => [ "192.0.2.0/24" ]
  }
}
  • cipher : 對數據進行加密或解密。
  • clone : 複製 event 事件。
  • csv : 解析 CSV 格式的數據。
  • date : 解析字段中的日期數據。

示例,匹配輸入的 timestamp 字段,而後替換 @timestamp :

filter {
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss ZZ"]
    target => "@timestamp"
  }
}
  • dissect : 使用 %{} 的形式拆分字符串並提取出特定內容,比較經常使用,具體語法見 dissect 文檔
  • drop : 丟棄這個 event 。

示例:

filter {
  if [loglevel] == "debug" {
    drop { }
  }
}
  • elapsed : 經過記錄開始和結束時間跟蹤 event 的耗時。
  • elasticsearch : 在 elasticsearch 中進行搜索,並將數據複製到當前 event 中。
  • environment : 將環境變量中的數據存儲到 @metadata 字段中。
  • extractnumbers : 提取字符串中找到的全部數字。
  • fingerprint : 根據一個或多個字段的內容建立哈希值,並存儲到新的字段中。
  • geoip : 使用綁定的 GeoLite2 數據庫添加有關 IP 地址的地理位置的信息,這個插件很是有用,你能夠根據 IP 地址獲得對應的國家、省份、城市、經緯度等地理位置數據。

示例,經過 clent_ip 字段獲取對應的地理位置信息:

filter {
  geoip {
    cache_size => 1000
    default_database_type => "City"
    source => "clent_ip"
    target => "geo"
    tag_on_failure => ["_geoip_city_fail"]
    add_field => {
      "geo_country_name" => "%{[geo][country_name]}"
      "geo_region_name" => "%{[geo][region_name]}"
      "geo_city_name" => "%{[geo][city_name]}"
      "geo_location" => "%{[geo][latitude]},%{[geo][longitude]}"
    }
    remove_field => ["geo"]
  }
}
  • grok : 經過正則表達式去處理字符串,比較經常使用,具體語法見 grok 文檔
  • http : 與外部 web services/REST APIs 集成。
  • i18n : 從字段中刪除特殊字符。
  • java_uuid : 生成 UUID 。
  • jdbc_static : 從遠程數據庫中讀取數據,而後豐富 event 。
  • jdbc_streaming : 執行 SQL 查詢而後將結果存儲到指定字段。
  • json : 解析 json 字符串,生成 field 和 value。

示例:

filter {
  json {
    skip_on_invalid_json => true
    source => "message"
  }
}

若是輸入的 message 字段是 json 字符串如 "{"a": 1, "b": 2}", 那麼解析後就會增長兩個字段,字段名分別是 a 和 b 。

  • kv : 解析 key=value 形式的數據。
  • memcached : 與外部 memcached 集成。
  • metrics : logstash 在內存中去聚合指標數據。
  • mutate : 對字段進行一些常規更改。

示例:

filter {
  mutate {
    split => ["hostname", "."]
    add_field => { "shortHostname" => "%{hostname[0]}" }
  }

  mutate {
    rename => ["shortHostname", "hostname"]
  }
}
  • prune : 經過黑白名單的方式刪除多餘的字段。

示例:

filter {
  prune {
    blacklist_names => [ "method", "(referrer|status)", "${some}_field" ]
  }
}
  • ruby : 執行 ruby 代碼。

示例,解析 http://example.com/abc?q=haha 形式字符串中的 query 參數 q 的值 :

filter {
  ruby {
    code => "
      require 'cgi'

      req = event.get('request_uri').split('?')
      query = ''
      if req.length > 1
        query = req[1]

        qh = CGI::parse(query)
        event.set('search_q', qh['q'][0])
      end
    "
  }
}

在 ruby 代碼中,字段的獲取和設置經過 event.get()event.set() 方法進行操做。

  • sleep : 休眠指定時間。
  • split : 拆分字段。
  • throttle : 限流,限制 event 數量。
  • translate : 根據指定的字典文件將數據進行對應轉換。

示例:

filter {
  translate {
    field => "[http_status]"
    destination => "[http_status_description]"
    dictionary => {
      "100" => "Continue"
      "101" => "Switching Protocols"
      "200" => "OK"
      "500" => "Server Error"
    }
    fallback => "I'm a teapot"
  }
}
  • truncate : 將字段內容超出長度的部分裁剪掉。
  • urldecode : 對 urlencoded 的內容進行解碼。
  • useragent : 解析 user-agent 的內容獲得諸如設備、操做系統、版本等信息。

示例:

filter {
  # ua_device : 設備
  # ua_name : 瀏覽器
  # ua_os : 操做系統
  useragent {
    lru_cache_size => 1000
    source => "user_agent"
    target => "ua"
    add_field => {
      "ua_device" => "%{[ua][device]}"
      "ua_name" => "%{[ua][name]}"
      "ua_os" => "%{[ua][os_name]}"
    }
    remove_field => ["ua"]
  }
}
  • uuid : 生成 UUID 。
  • xml : 解析 XML 格式的數據。

結語

Logstash 的插件除了本文提到的這些以外還有不少,想要詳細的瞭解每一個插件如何使用仍是要去查閱官方文檔。

得益於 Logstash 的插件體系,你只須要編寫一個配置文件,聲明使用哪些插件,就能夠很輕鬆的構建數據管道。

image

相關文章
相關標籤/搜索