logstash經過kafka傳輸nginx日誌

logstash經過kafka傳輸nginx日誌(三)

  單個進程 logstash 能夠實現對數據的讀取、解析和輸出處理。可是在生產環境中,從每臺應用服務器運行 logstash 進程並將數據直接發送到 Elasticsearch 裏,顯然不是第一選擇:第一,過多的客戶端鏈接對 Elasticsearch 是一種額外的壓力;第二,網絡抖動會影響到 logstash 進程,進而影響生產應用;第三,運維人員未必願意在生產服務器上部署 Java,或者讓 logstash 跟業務代碼爭奪 Java 資源。html

  因此,在實際運用中,logstash 進程會被分爲兩個不一樣的角色。運行在應用服務器上的,儘可能減輕運行壓力,只作讀取和轉發,這個角色叫作 shipper;運行在獨立服務器上,完成數據解析處理,負責寫入 Elasticsearch 的角色,叫 indexer。node

 

 

  Kafka 是一個高吞吐量的分佈式發佈訂閱日誌服務,具備高可用、高性能、分佈式、高擴展、持久性等特性。和Redis作輕量級消息隊列不一樣,Kafka利用磁盤作消息隊列,因此也就無所謂消息緩衝時的磁盤問題。生產環境中仍是推薦使用Kafka作消息隊列。此外,若是公司內部已經有 Kafka 服務在運行,logstash也能夠快速接入,免去重複建設的麻煩。nginx

1、Logstash搭建數據庫

  詳細搭建能夠參考Logstash安裝搭建(一)apache

2、配置Shipperjson

  Shipper 即爲Nginx服務器上運行的 logstash 進程,logstash 經過 logstash-input-file 寫入,而後經過 logstash-output-kafka 插件將日誌寫入到 kafka 集羣中。bootstrap

  Logstash使用一個名叫 FileWatch 的 Ruby Gem 庫來監聽文件變化。這個庫支持 glob 展開文件路徑,並且會記錄在 .sincedb 的數據庫文件來跟蹤被監聽的日誌文件的當前讀取位置。服務器

.sincedb 文件中記錄了每一個被監聽的文件的 inode, major number, minor number 和 pos。 

  Input配置實例網絡

複製代碼

input {
  file {
    path => "/var/log/nginx/log_access.log"
    type => "nginx-access"
    discover_interval => 15    #logstash 每隔多久去檢查一次被監聽的 path 下是否有新文件。默認值是 15 秒。
    sincedb_path => "/etc/logstash/.sincedb"    #定義sincedb文件的位置
    start_position => "beginning"    #定義文件讀取的位置
  }
}

複製代碼

   其餘配置詳解:app

複製代碼

exclude    不想被監聽的文件能夠排除出去。
close_older    已經監聽的文件,若超過這個時間內沒有更新,就關閉監聽該文件的句柄。默認爲:3600s,即一小時。
ignore_older    在每次檢查文件列表時,若文件的最後修改時間超過該值,則忽略該文件。默認爲:86400s,即一天。
sincedb_path    定義 .sincedb 文件路徑,默認爲 $HOME/.sincedb 。
sincedb_write_interval    間隔多久寫一次sincedb文件,默認15s。
stat_interval    每隔多久檢查被監聽文件狀態(是否有更新),默認爲1s。
start_position    logstash從什麼位置開始讀取文件數據。默認爲結束位置,相似 tail -f 的形式。設置爲「beginning」,則從頭讀取,相似 cat ,到最後一行之後變成爲 tail -f 。

複製代碼

  Output配置實例

  如下配置能夠實現對 kafka producer 的基本使用。生產者更多詳細配置請查看 Kafka 官方文檔中生產者部分配置文檔

複製代碼

output {
  kafka {
    bootstrap_servers => "localhost:9092"    #生產者
    topic_id => "nginx-access-log"    #設置寫入kafka的topic
    compression_type => "snappy"    #消息壓縮模式,默認是none,可選gzip、snappy。
  }
}

複製代碼

  logstash-out-kafka 其餘配置詳解:

compression_type    消息壓縮模式,默認是none,有效值爲:none、gzip、snappy。
asks    消息確認模式,默認爲1,有效值爲:0、一、all。設置爲0,生產者不等待 broker 迴應;設置爲1,生產者會收到 leader 寫入以後的迴應;設置爲all, leader 將要等待 in-sync 中全部的 replication 同步確認。
send_buffer_bytes    TCP發送數據時的緩衝區的大小。

  logstash-kafka 插件輸入和輸出默認 codec 爲 json 格式。在輸入和輸出的時候注意下編碼格式。消息傳遞過程當中 logstash 默認會爲消息編碼內加入相應的時間戳和 hostname 等信息。若是不想要以上信息(通常作消息轉發的狀況下),能夠使用如下配置,例如:

複製代碼

output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
    }
}

複製代碼

3、搭建配置Kafka

   搭建配置Kafka能夠參考 Kafka集羣搭建

4、配置Indexer

  是用logstash-input-kafka插件,從kafka集羣中讀取數據。

  Input配置示例:

複製代碼

input {
  kafka {
    zk_connect => "localhost:2181"    #zookeeper地址
    topic_id => "nginx-access-log"    #kafka中topic名稱,記得建立該topic
    group_id => "nginx-access-log"     #默認爲「logstash」
    codec => "plain"    #與Shipper端output配置項一致
    consumer_threads => 1    #消費的線程數
    decorate_events => true    #在輸出消息的時候回輸出自身的信息,包括:消費消息的大小、topic來源以及consumer的group信息。
    type => "nginx-access-log"      
  }
}

複製代碼

  更多 logstash-input-kafka 配置能夠從 logstash 官方文檔 查看。

  Logstash 是一個 input | decode | filter | encode | output 的數據流。上述配置中有 codec => "plain" ,即logstash 採用轉發的形式,不會對原有信息進行編碼轉換。豐富的過濾器插件(Filter)的存在是 logstash 威力強大的重要因素,提供的不僅僅是過濾的功能,能夠進行復雜的邏輯處理,甚至無中生有添加新的logstash事件到後續的流程中去。這裏只列舉 logstash-output-elasticsearch 配置。

  logstash-output-elasticsearch 配置實例:

複製代碼

output {
    elasticsearch {
        hosts => ["localhost:9200"]    //Elasticsearch 地址,多個地址以逗號分隔。
        index => "logstash-%{type}-%{+YYYY.MM.dd}"    //索引命名方式,不支持大寫字母(Logstash除外)
        document_type => "%{type}"    //文檔類型
        workers => 1
        flush_size => 20000    //向Elasticsearch批量發送數據的條數
        idle_flush_time => 10    //向Elasticsearch批量發送數據的時間間隔,即便不知足 flush_size 也會發送
        template_overwrite => true    //設置爲true,將會把自定義的模板覆蓋logstash自帶模板
    }
}

複製代碼

 

到此就已經把Nginx上的日誌轉發到Elasticsearch中。

相關文章
相關標籤/搜索