[Logstash Redis Plugin] · Source Code Analysis

Complete configuration of the redis plugin

input {
    redis {
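        batch_count => 1 # number of events returned per fetch; only takes effect in list mode
        data_type => "list" # how the plugin reads from Redis: list, channel or pattern_channel
        key => "logstash-test-list" # the list or channel name to listen on
        host => "127.0.0.1" # Redis host
        port => 6379 # Redis port
        password => "123qwe" # password, if Redis requires authentication
        db => 0 # Redis database number
        threads => 1 # number of input threads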
    }
}
output {
    stdout {}
}

Workflow

The diagram isn't very polished, but the workflow is roughly as follows:

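  • Logstash starts the redis plugin.
  • The plugin reads its parameters and validates them.
  • It determines the listening mode (list, channel, pattern_channel) and creates the matching listening task.
  • It creates a Redis client (in list mode with batch_count > 1 it also loads the Lua script later called with EVALSHA), then sends requests in the configured mode and listens for data.
  • Redis returns the requested data (items from a list, or messages from a particular channel).
  • The plugin processes (decodes) the data and hands it to Logstash.
  • When a stop signal arrives, the plugin sends a mode-specific command to leave Redis.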

Source code analysis

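First come the plugin's configuration definitions, which declare the parameters the redis plugin needs, together with their default values and validation rules (see the config declarations in the complete code at the end of this article).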

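Next, register prepares the information needed for the Redis instance, such as the key name and the redis_url used in log messages. Note that data_type has no default value. It is forced to list only when the deprecated queue option is used. Otherwise both key and data_type must be configured, or register raises an error.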
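The checks in register can be reproduced outside Logstash. The following is a minimal sketch in which resolve_source is a hypothetical helper, not part of the plugin. It mirrors the queue/key/data_type logic of register and returns the resulting key and mode.

```ruby
# Hypothetical helper mirroring the checks in the plugin's register method.
# Returns [key, data_type] or raises, as register does.
def resolve_source(queue: nil, key: nil, data_type: nil)
  if queue
    # The deprecated `queue` option cannot be combined with key/data_type...
    if key || data_type
      raise RuntimeError, "Cannot specify queue parameter and key or data_type"
    end
    # ...and it implies list mode on that key.
    key = queue
    data_type = "list"
  end

  # Without `queue`, both key and data_type must be set explicitly,
  # because data_type has no default value.
  unless key && data_type
    raise RuntimeError, "Must define queue, or key and data_type parameters"
  end

  [key, data_type]
end

p resolve_source(queue: "legacy-queue")   # prints ["legacy-queue", "list"]
```

Setting only key, without data_type, ends in the "Must define" error. It does not silently fall back to list.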

The main entry point is run. Depending on data_type, it passes a different listener method to listener_loop, which then listens in a loop.
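The dispatch in run comes down to choosing a method name (a Symbol) and invoking it with send. This is a toy sketch. ToyInput and its listener bodies are hypothetical stand-ins that only record which listener ran, and the loop runs once instead of until shutdown.

```ruby
# Toy stand-in for the plugin: each "listener" only records its own name.
class ToyInput
  attr_reader :calls

  def initialize(data_type)
    @data_type = data_type
    @calls = []
  end

  # Same branching as the plugin's run: list, channel, anything else.
  def run(output_queue)
    if @data_type == "list"
      listener_loop :list_listener, output_queue
    elsif @data_type == "channel"
      listener_loop :channel_listener, output_queue
    else
      listener_loop :pattern_channel_listener, output_queue
    end
  end

  private

  # A single pass; the real loop repeats until shutdown is requested.
  def listener_loop(listener, output_queue)
    send(listener, output_queue)
  end

  def list_listener(_output_queue)
    @calls << :list_listener
  end

  def channel_listener(_output_queue)
    @calls << :channel_listener
  end

  def pattern_channel_listener(_output_queue)
    @calls << :pattern_channel_listener
  end
end

input = ToyInput.new("channel")
input.run([])
p input.calls   # prints [:channel_listener]
```

The final else branch is safe only because the config validation already restricts data_type to the three allowed values.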

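listener_loop takes two arguments: the listener method to call and the output queue that decoded events are pushed into. It keeps calling the listener until the @shutdown_requested flag is set. If a Redis error occurs, it discards the connection and sleeps for 1 second before trying again.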
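The control flow of this loop can be reproduced with plain Ruby objects. In this sketch LoopSketch, ConnectionError and the retry_delay parameter are hypothetical. ConnectionError plays the role of Redis::BaseError, and the real plugin always sleeps 1 second.

```ruby
# Plays the role of Redis::BaseError in this sketch.
class ConnectionError < StandardError; end

class LoopSketch
  attr_reader :connects

  # `script` lists what successive listener calls do:
  # :ok delivers one event, :fail raises ConnectionError.
  def initialize(script, retry_delay: 0)
    @script = script.dup
    @retry_delay = retry_delay
    @connects = 0
    @client = nil
    @shutdown_requested = false
  end

  def listener_loop(output_queue)
    until @shutdown_requested
      begin
        @client ||= connect        # lazy (re)connect, like `@redis ||= connect`
        listen(@client, output_queue)
      rescue ConnectionError
        @client = nil              # drop the client so the next pass reconnects
        sleep @retry_delay
      end
    end
  end

  private

  def connect
    @connects += 1
    Object.new                     # stands in for a Redis client
  end

  def listen(_client, output_queue)
    action = @script.shift
    if action.nil?
      @shutdown_requested = true   # in the plugin, teardown sets this flag
    elsif action == :fail
      raise ConnectionError, "connection lost"
    else
      output_queue << "event"
    end
  end
end
```

For example, LoopSketch.new([:ok, :fail, :ok]) delivers two events and connects twice, because the failure forces a reconnect.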

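As the loop shows, the @shutdown_requested flag decides whether it continues. The teardown method sets this flag to true and then shuts down in a mode-specific way. In list mode it sends QUIT. In channel mode it sends UNSUBSCRIBE and disconnects. In pattern_channel mode it sends PUNSUBSCRIBE and disconnects.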
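The mode-specific shutdown can be summarized as a lookup table. This sketch does not talk to Redis. It only returns the names of the client calls that teardown makes after setting @shutdown_requested, and shutdown_steps is a hypothetical helper.

```ruby
# Client calls made by the plugin's teardown, per data_type.
TEARDOWN_STEPS = {
  "list"            => [:quit],
  "channel"         => [:unsubscribe, :disconnect],
  "pattern_channel" => [:punsubscribe, :disconnect],
}.freeze

# Hypothetical helper: which calls teardown would make for a given mode.
def shutdown_steps(data_type, connected: true)
  return [] unless connected   # teardown does nothing more when @redis is nil
  TEARDOWN_STEPS.fetch(data_type, [])
end

p shutdown_steps("channel")   # prints [:unsubscribe, :disconnect]
```

The subscribe modes need the extra unsubscribe step because a subscribed connection stays blocked inside the subscribe call until the subscription ends.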

Inside the loop, if no Redis client has been created yet, connect is called to create one (@redis ||= connect). Otherwise the existing client is reused.

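connect first calls Redis.new to initialize a Redis client. It then checks the mode and batch_count. If batch_count is 1, or data_type is not list, nothing else happens and the client is returned.
If data_type is list and batch_count is greater than 1, it calls load_batch_script. That method loads a Lua script into Redis's script cache (SCRIPT LOAD) and keeps the returned SHA1 for later use. The script is defined in load_batch_script in the complete code at the end of this article.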
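In isolation, the decision connect makes about the batch script is a single condition. needs_batch_script? is a hypothetical name for it in this minimal sketch.

```ruby
# Mirrors the condition in the plugin's connect method:
# only list mode with batch_count > 1 needs the Lua batch script.
def needs_batch_script?(data_type, batch_count)
  data_type == "list" && batch_count > 1
end

[["list", 1], ["list", 5], ["channel", 5]].each do |type, count|
  puts "#{type}, batch_count=#{count}: load script? #{needs_batch_script?(type, count)}"
end
```

A channel input with a large batch_count never loads the script, because batch_count has no effect outside list mode.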

The Lua script in load_batch_script is probably the hardest part of the plugin to understand. To see how it works, you need to know a few things:

  • Basic Lua concepts
  • How the Redis EVAL command is used
  • What the script itself does

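First, the script requires Redis 2.6 or later, because older versions do not support EVAL and return an error. EVAL works much like eval in JavaScript: it takes a string and executes it as code, in this case a Lua script. What is the benefit?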

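Put simply, it lets you perform several operations in one go. You can write a sequence of operations into a script, and the server executes them all atomically in a single pass before returning the result.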

Lua scripts

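There is no need to study Lua in depth, but you do need two concepts: local and table. local declares a local variable, so the script does not leak global variables into Redis's Lua environment (Redis refuses to let scripts create globals). A table is Lua's general-purpose data structure, somewhat like JSON. The script uses one as an array to collect its results.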

The EVAL command

You also need to know how EVAL is called. This command makes it clear:
EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 name age xing 13
It returns:

name
age
xing
13

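In other words, EVAL is followed by the script, then numkeys (how many of the following arguments are key names), then the arguments themselves. Inside the script, the first numkeys arguments are available in the KEYS array and the rest in ARGV. Both arrays are 1-indexed.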
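The split between KEYS and ARGV is purely positional, which a small Ruby sketch can show. split_eval_args is a hypothetical helper, not a Redis or redis-rb API. The plugin's call evalsha(@redis_script_sha, [@key], [@batch_count-1]) has the same shape: there is one key, so KEYS[1] is the list name and ARGV[1] is the count. Redis hands every argument to the script as a string, which is why the Lua script converts ARGV[1] with tonumber.

```ruby
# Hypothetical helper: how EVAL's trailing arguments are divided.
# The first `numkeys` arguments become KEYS; the rest become ARGV.
def split_eval_args(numkeys, *args)
  raise ArgumentError, "numkeys larger than argument count" if numkeys > args.size
  [args.first(numkeys), args.drop(numkeys)]
end

keys, argv = split_eval_args(2, "name", "age", "xing", "13")

# Lua arrays are 1-based, so KEYS[1] corresponds to keys[0] in Ruby.
lua_index = ->(arr, i) { arr[i - 1] }
puts lua_index.(keys, 1)   # name
puts lua_index.(argv, 2)   # 13
```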

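Next, here is what EVAL does internally:

  • It computes the SHA1 checksum of the script string and compiles the script into a Lua function named after that checksum.
  • It stores the checksum and the function in the server's lua_scripts dictionary. From then on, EVALSHA can run the function directly by its checksum. SCRIPT LOAD, which the plugin uses, performs the same caching without executing the script.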
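The caching behaviour can be modelled in a few lines. This is a toy. ToyScriptCache is hypothetical, and Lua execution is replaced by a Ruby block because Ruby cannot run the Lua source. The part that matches Redis is the key: a script is identified by the SHA1 hex digest of its source, which Digest::SHA1 computes locally.

```ruby
require "digest/sha1"

# Toy model of Redis's script cache (not Redis itself): scripts are stored
# under the SHA1 hex digest of their source and looked up by that digest.
class ToyScriptCache
  class NoScriptError < StandardError; end

  def initialize
    @scripts = {}
  end

  # Like SCRIPT LOAD: cache the script and return its SHA1.
  # The block stands in for the compiled Lua function.
  def load(source, &body)
    sha = Digest::SHA1.hexdigest(source)
    @scripts[sha] = body
    sha
  end

  # Like EVALSHA: run a cached script, or fail with a NOSCRIPT error.
  def evalsha(sha, keys, argv)
    body = @scripts.fetch(sha) { raise NoScriptError, "NOSCRIPT No matching script" }
    body.call(keys, argv)
  end

  # Like a Redis restart (or SCRIPT FLUSH): the cache is emptied.
  def flush
    @scripts.clear
  end
end
```

After flush, evalsha raises a NOSCRIPT error. The plugin's list_listener handles exactly that case by matching /NOSCRIPT/, reloading the script and retrying.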

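With this background, we can look at what the script does.
It first reads ARGV[1] into i and creates an empty table res. It then calls LLEN to get the length of the list. If the length is at least i, i stays as it is; if the length is smaller, i is set to length. Next it runs LPOP up to i times, stopping early if LPOP returns nothing, and inserts each element into res. Finally it returns res.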

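Put plainly, the script compares the number of elements in the list with the requested count. If the requested count is 5 and the list holds 5 or more elements, it takes 5 and returns them at once; otherwise it takes all the remaining elements. Note that list_listener passes @batch_count - 1 to the script, because BLPOP has already taken one event. With batch_count set to 5, each round therefore returns at most 5 events in total.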
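Porting the Lua script to Ruby makes its logic easy to try out. The port below is for illustration only. batch_pop is a hypothetical name, and a plain Array stands in for the Redis list, with index 0 as the head that LPOP takes from. In Redis the whole script runs atomically on the server.

```ruby
# Ruby port of the plugin's Lua batch script, run against an Array
# that stands in for the Redis list (index 0 = head, where LPOP takes from).
def batch_pop(list, count)
  i = Integer(count)        # tonumber(ARGV[1]); ARGV values arrive as strings
  res = []
  length = list.length      # redis.call('llen', KEYS[1])
  i = length if length < i
  while i > 0
    item = list.shift       # redis.call('lpop', KEYS[1])
    break if item.nil?
    res << item             # table.insert(res, item)
    i -= 1
  end
  res
end

queue = %w[e1 e2 e3 e4 e5 e6 e7]
p batch_pop(queue, "5")   # prints ["e1", "e2", "e3", "e4", "e5"]
p batch_pop(queue, "5")   # prints ["e6", "e7"]
p batch_pop(queue, "5")   # prints []
```

Capping i at the list length means a short list never yields nil padding. This matches the comment in the plugin source: "whole queue will be returned without extra nil values".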

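So the purpose of the script is to let Logstash fetch up to batch_count events per round instead of issuing one request per event, which reduces the number of requests the server has to handle.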

With the script explained, we can look at how each mode is implemented:

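List mode simply runs BLPOP (with a 1-second timeout) to fetch one item. It then checks batch_count. If it is 1, the method returns. If it is greater than 1, the method uses EVALSHA to call the previously loaded script and asks for batch_count - 1 more items. If Redis replies with a NOSCRIPT error, for example because Redis was restarted and its script cache was cleared, the plugin reloads the script and retries.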
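One list_listener round can be simulated against an in-memory list. In this sketch list_round is a hypothetical helper. BLPOP is modelled as a non-blocking shift that returns nil on "timeout", and the batch step uses the same cap-at-length logic as the Lua script.

```ruby
# Sketch of one list_listener round against an Array standing in for the list.
def list_round(list, batch_count)
  first = list.shift                  # stands in for BLPOP (nil = timed out)
  return [] if first.nil?
  events = [first]
  return events if batch_count == 1   # nothing more to fetch
  extra = [batch_count - 1, list.length].min
  events.concat(list.shift(extra))    # what EVALSHA on the batch script returns
end

backlog = (1..12).to_a
3.times { p list_round(backlog, 5) }
```

Each round moves at most batch_count events into the pipeline with two requests (BLPOP plus EVALSHA) instead of batch_count separate BLPOP calls.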

channel and pattern_channel need little explanation: they call SUBSCRIBE and PSUBSCRIBE respectively.
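The two modes differ mainly in what their callbacks receive. A channel message carries (channel, message), while a pattern message carries (pattern, channel, message). In the plugin's `on.pmessage do |ch, event, message|` block, the first two arguments are therefore the pattern and the channel, and only message is passed to queue_event. The toy bus below is hypothetical and purely in-memory. It uses File.fnmatch only as an approximation of Redis's glob-style pattern matching.

```ruby
# Toy in-memory pub/sub (not Redis) showing what each kind of subscriber receives.
class ToyPubSub
  def initialize
    @channel_subs = Hash.new { |h, k| h[k] = [] }
    @pattern_subs = Hash.new { |h, k| h[k] = [] }
  end

  # on_message receives (channel, message), like on.message in the plugin.
  def subscribe(channel, &on_message)
    @channel_subs[channel] << on_message
  end

  # on_pmessage receives (pattern, channel, message), like on.pmessage.
  def psubscribe(pattern, &on_pmessage)
    @pattern_subs[pattern] << on_pmessage
  end

  # Delivers a message and returns how many callbacks received it.
  def publish(channel, message)
    receivers = 0
    @channel_subs.fetch(channel, []).each do |cb|
      cb.call(channel, message)
      receivers += 1
    end
    @pattern_subs.each do |pattern, cbs|
      next unless File.fnmatch(pattern, channel)   # approximates Redis globbing
      cbs.each do |cb|
        cb.call(pattern, channel, message)
        receivers += 1
      end
    end
    receivers
  end
end
```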

The hardest part really is the Lua script in the middle. Once its purpose is clear, the redis plugin is easy to understand.

The complete code:

# encoding: utf-8
require "logstash/inputs/base"
require "logstash/inputs/threadable"
require "logstash/namespace"

# This input will read events from a Redis instance; it supports both Redis channels and lists.
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
# the channel commands used by Logstash are found in Redis v1.3.8+.
# While you may be able to make these Redis versions work, the best performance
# and stability will be found in more recent stable versions.  Versions 2.6.0+
# are recommended.
#
# For more information about Redis, see <http://redis.io/>
#
# `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or
# newer. Anything older does not support the operations used by batching.
#
class LogStash::Inputs::Redis < LogStash::Inputs::Threadable
  config_name "redis"

  default :codec, "json"

  # The `name` configuration is used for logging in case there are multiple instances.
  # This feature has no real function and will be removed in future versions.
  config :name, :validate => :string, :default => "default", :deprecated => true

  # The hostname of your Redis server.
  config :host, :validate => :string, :default => "127.0.0.1"

  # The port to connect on.
  config :port, :validate => :number, :default => 6379

  # The Redis database number.
  config :db, :validate => :number, :default => 0

  # Initial connection timeout in seconds.
  config :timeout, :validate => :number, :default => 5

  # Password to authenticate with. There is no authentication by default.
  config :password, :validate => :password

  # The name of the Redis queue (we'll use BLPOP against this).
  # TODO: remove soon.
  config :queue, :validate => :string, :deprecated => true

  # The name of a Redis list or channel.
  # TODO: change required to true
  config :key, :validate => :string, :required => false

  # Specify either list or channel.  If `redis\_type` is `list`, then we will BLPOP the
  # key.  If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
  # If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
  # TODO: change required to true
  config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false

  # The number of events to return from Redis using EVAL.
  config :batch_count, :validate => :number, :default => 1

  public
  def register
    require 'redis'
    @redis = nil
    @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"

    # TODO remove after setting key and data_type to true
    if @queue
      if @key or @data_type
        raise RuntimeError.new(
          "Cannot specify queue parameter and key or data_type"
        )
      end
      @key = @queue
      @data_type = 'list'
    end

    if not @key or not @data_type
      raise RuntimeError.new(
        "Must define queue, or key and data_type parameters"
      )
    end
    # end TODO

    @logger.info("Registering Redis", :identity => identity)
  end # def register

  # A string used to identify a Redis instance in log messages
  # TODO(sissel): Use instance variables for this once the @name config
  # option is removed.
  private
  def identity
    @name || "#{@redis_url} #{@data_type}:#{@key}"
  end

  private
  def connect
    redis = Redis.new(
      :host => @host,
      :port => @port,
      :timeout => @timeout,
      :db => @db,
      :password => @password.nil? ? nil : @password.value
    )
    load_batch_script(redis) if @data_type == 'list' && (@batch_count > 1)
    return redis
  end # def connect

  private
  def load_batch_script(redis)
    #A Redis Lua EVAL script to fetch a count of keys
    #in case count is bigger than current items in queue whole queue will be returned without extra nil values
    redis_script = <<EOF
          local i = tonumber(ARGV[1])
          local res = {}
          local length = redis.call('llen',KEYS[1])
          if length < i then i = length end
          while (i > 0) do
            local item = redis.call("lpop", KEYS[1])
            if (not item) then
              break
            end
            table.insert(res, item)
            i = i-1
          end
          return res
EOF
    @redis_script_sha = redis.script(:load, redis_script)
  end

  private
  def queue_event(msg, output_queue)
    begin
      @codec.decode(msg) do |event|
        decorate(event)
        output_queue << event
      end
    rescue LogStash::ShutdownSignal => e
      # propagate up
      raise(e)
    rescue => e # parse or event creation error
      @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace);
    end
  end

  private
  def list_listener(redis, output_queue)

    item = redis.blpop(@key, 0, :timeout => 1)
    return unless item # from timeout or other conditions

    # blpop returns the 'key' read from as well as the item result
    # we only care about the result (2nd item in the list).
    queue_event(item[1], output_queue)

    # If @batch_count is 1, there's no need to continue.
    
    return if @batch_count == 1
    
    begin
      redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |item|
        queue_event(item, output_queue)
      end

      # Below is a commented-out implementation of 'batch fetch'
      # using pipelined LPOP calls. This in practice has been observed to
      # perform exactly the same in terms of event throughput as
      # the evalsha method. Given that the EVALSHA implementation uses
      # one call to Redis instead of N (where N == @batch_count) calls,
      # I decided to go with the 'evalsha' method of fetching N items
      # from Redis in bulk.
      #redis.pipelined do
        #error, item = redis.lpop(@key)
        #(@batch_count-1).times { redis.lpop(@key) }
      #end.each do |item|
        #queue_event(item, output_queue) if item
      #end
      # --- End commented out implementation of 'batch fetch'
    rescue Redis::CommandError => e
      if e.to_s =~ /NOSCRIPT/ then
        @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
        load_batch_script(redis)
        retry
      else
        raise e
      end
    end
  end

  private
  def channel_listener(redis, output_queue)
    redis.subscribe @key do |on|
      on.subscribe do |channel, count|
        @logger.info("Subscribed", :channel => channel, :count => count)
      end

      on.message do |channel, message|
        queue_event message, output_queue
      end

      on.unsubscribe do |channel, count|
        @logger.info("Unsubscribed", :channel => channel, :count => count)
      end
    end
  end

  private
  def pattern_channel_listener(redis, output_queue)
    redis.psubscribe @key do |on|
      on.psubscribe do |channel, count|
        @logger.info("Subscribed", :channel => channel, :count => count)
      end

      on.pmessage do |ch, event, message|
        queue_event message, output_queue
      end

      on.punsubscribe do |channel, count|
        @logger.info("Unsubscribed", :channel => channel, :count => count)
      end
    end
  end

  # Since both listeners have the same basic loop, we've abstracted the outer
  # loop.
  private
  def listener_loop(listener, output_queue)
    while !@shutdown_requested
      begin
        @redis ||= connect
        self.send listener, @redis, output_queue
      rescue Redis::BaseError => e
        @logger.warn("Redis connection problem", :exception => e)
        # Reset the redis variable to trigger reconnect
        @redis = nil
        sleep 1
      end
    end
  end # listener_loop

  public
  def run(output_queue)
    if @data_type == 'list'
      listener_loop :list_listener, output_queue
    elsif @data_type == 'channel'
      listener_loop :channel_listener, output_queue
    else
      listener_loop :pattern_channel_listener, output_queue
    end
  rescue LogStash::ShutdownSignal
    # ignore and quit
  end # def run

  public
  def teardown
    @shutdown_requested = true

    if @redis
      if @data_type == 'list'
        @redis.quit rescue nil
      elsif @data_type == 'channel'
        @redis.unsubscribe rescue nil
        @redis.connection.disconnect
      elsif @data_type == 'pattern_channel'
        @redis.punsubscribe rescue nil
        @redis.connection.disconnect
      end
      @redis = nil
    end
  end
end # class LogStash::Inputs::Redis