input { redis { batch_count => 1 #返回的事件數量,此屬性僅在list模式下起做用。 data_type => "list" #logstash redis插件工做方式 key => "logstash-test-list" #監聽的鍵值 host => "127.0.0.1" #redis地址 port => 6379 #redis端口號 password => "123qwe" #若是有安全認證,此項爲密碼 db => 0 #redis數據庫的編號 threads => 1 #啓用線程數量 } } output { stdout{} }
圖不夠專業,可是大體就如上圖所示:java
首先是程序的自定義,這裏設置了redis插件須要的參數,默認值,以及校驗等。
redis
而後註冊Redis實例須要的信息,好比key的名字或者url等,能夠看到默認的data_type是list模式。
數據庫
程序運行的主要入口,根據不一樣的data_type,傳遞不一樣的實現方法,而後調用listener_loop執行循環監聽
json
Listner_loop方法傳遞了兩個參數,一個是監聽器實現的方法,一個是處理的數據隊列。循環是每秒鐘執行一次,若是循環標識被設置,則退出。
數組
上面的循環方法能夠看到,是經過一個參數shutdown_requested來判斷是否繼續循環。該參數經過tear_down方法設置爲true,而後根據不一樣的模式,指定不一樣的退出方式。
若是是list模式,則直接退出;若是是channel模式,則發送redis的unsubsribe命令退出;若是是pattern_channel,則發送punsubscribe退出。
安全
在循環內部,判斷是否已經建立了redis實例,若是沒有建立,則調用connect方法建立;不然直接執行。
服務器
這裏前一段是調用Redis的new方法,初始化一個redis實例。緊接着判斷batch_count是否大於1,若是等於1,就什麼也不作,而後返回redis。
若是batch_count大於1,那麼就調用load_batch_script方法,加載Lua腳本,存儲到redis中的lua腳本字典中,供後面使用。代碼以下:
數據結構
上面的代碼應該是這個插件最難理解的部分了。爲了弄清楚這段代碼的工做,須要瞭解下面幾個知識點:less
首先,要想運行上面的腳本,必須是Redis2.6+的版本,才支持EVAL,不然會報錯!EVAL命令與js中的差很少,就是能夠把某一個字符串當作命令解析,其中字符串就包括lua腳本。這樣有什麼好處呢?ide
說白了,就是能一次性進行多個操做。好比咱們能夠在腳本中寫入一連串的操做,這些操做會以原子模式,一次性在服務器執行完,在返回回來。
關於lua腳本,其實沒有詳細研究的必要,可是必定要知道一個local和table的概念。local是建立本地的變量,這樣就不會污染redis的數據。table是lua的一種數據結構,有點相似於json,能夠存儲數據。
另外還要知道EVAL命令的使用方法,看下面這個命令,就好理解了!
EVAL "return KEYS[1] KEYS[2] ARGV[1] ARGV[2];" 2 name:xing age:13
就會返回:
name age xing 13
這段代碼沒有通過真正的操做,可是有助於理解就好!也就是說,EVAL後面跟着一段腳本,腳本後面跟着的就是參數,能夠經過KEYS和ARGV數組得到,可是下標從1開始。
再來講說EVAL命令,它的執行過程以下:
有了這些理論基礎之後,就能夠看看上面的代碼都作了什麼了!
首先是獲取參數,這個參數賦值給i;而後建立了一個對象res;緊接着調用llen命令,得到指定list的長度;若是list的長度大於i,則什麼也不作;若是小於i,那麼i就等於lenth;而後執行命令lpop,取出list中的元素,一共取i次,放入res中,最後返回。
說得通俗點,就是比較一下list元素個數與設置batch_count的值。若是batch_count爲5,列表list中有5條以上的數據,那麼直接取5條,一次性返回;不然取length條返回。
能夠看到這段腳本的做用,就是讓logstash一次請求,最多得到batch_count條事件,減少了服務器處理請求的壓力。
講完這段代碼,能夠看看不一樣的工做模式的實現代碼了:
首先是list的代碼,其實就是執行BLPOP命令,獲取數據。若是在list模式中,還會去判斷batch_count的值,若是是1直接退出;若是大於1,則使用evalsha命令調用以前保存的腳本方法。
至於channel和pattern_channel,就沒啥解釋的了,就是分別調用subscribe和psubsribe命令而已。
其實最難理解的,就是中間那段lua腳本~明白它的用處,redis插件也就不難理解了。
# encoding: utf-8 require "logstash/inputs/base" require "logstash/inputs/threadable" require "logstash/namespace" # This input will read events from a Redis instance; it supports both Redis channels and lists. # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and # the channel commands used by Logstash are found in Redis v1.3.8+. # While you may be able to make these Redis versions work, the best performance # and stability will be found in more recent stable versions. Versions 2.6.0+ # are recommended. # # For more information about Redis, see <http://redis.io/> # # `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or # newer. Anything older does not support the operations used by batching. # class LogStash::Inputs::Redis < LogStash::Inputs::Threadable config_name "redis" default :codec, "json" # The `name` configuration is used for logging in case there are multiple instances. # This feature has no real function and will be removed in future versions. config :name, :validate => :string, :default => "default", :deprecated => true # The hostname of your Redis server. config :host, :validate => :string, :default => "127.0.0.1" # The port to connect on. config :port, :validate => :number, :default => 6379 # The Redis database number. config :db, :validate => :number, :default => 0 # Initial connection timeout in seconds. config :timeout, :validate => :number, :default => 5 # Password to authenticate with. There is no authentication by default. config :password, :validate => :password # The name of the Redis queue (we'll use BLPOP against this). # TODO: remove soon. config :queue, :validate => :string, :deprecated => true # The name of a Redis list or channel. # TODO: change required to true config :key, :validate => :string, :required => false # Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP the # key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key. # If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. # TODO: change required to true config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false # The number of events to return from Redis using EVAL. config :batch_count, :validate => :number, :default => 1 public def register require 'redis' @redis = nil @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}" # TODO remove after setting key and data_type to true if @queue if @key or @data_type raise RuntimeError.new( "Cannot specify queue parameter and key or data_type" ) end @key = @queue @data_type = 'list' end if not @key or not @data_type raise RuntimeError.new( "Must define queue, or key and data_type parameters" ) end # end TODO @logger.info("Registering Redis", :identity => identity) end # def register # A string used to identify a Redis instance in log messages # TODO(sissel): Use instance variables for this once the @name config # option is removed. private def identity @name || "#{@redis_url} #{@data_type}:#{@key}" end private def connect redis = Redis.new( :host => @host, :port => @port, :timeout => @timeout, :db => @db, :password => @password.nil? ? nil : @password.value ) load_batch_script(redis) if @data_type == 'list' && (@batch_count > 1) return redis end # def connect private def load_batch_script(redis) #A Redis Lua EVAL script to fetch a count of keys #in case count is bigger than current items in queue whole queue will be returned without extra nil values redis_script = <<EOF local i = tonumber(ARGV[1]) local res = {} local length = redis.call('llen',KEYS[1]) if length < i then i = length end while (i > 0) do local item = redis.call("lpop", KEYS[1]) if (not item) then break end table.insert(res, item) i = i-1 end return res EOF @redis_script_sha = redis.script(:load, redis_script) end private def queue_event(msg, output_queue) begin @codec.decode(msg) do |event| decorate(event) output_queue << event end rescue LogStash::ShutdownSignal => e # propagate up raise(e) rescue => e # parse or event creation error @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace); end end private def list_listener(redis, output_queue) item = redis.blpop(@key, 0, :timeout => 1) return unless item # from timeout or other conditions # blpop returns the 'key' read from as well as the item result # we only care about the result (2nd item in the list). queue_event(item[1], output_queue) # If @batch_count is 1, there's no need to continue. return if @batch_count == 1 begin redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |item| queue_event(item, output_queue) end # Below is a commented-out implementation of 'batch fetch' # using pipelined LPOP calls. This in practice has been observed to # perform exactly the same in terms of event throughput as # the evalsha method. Given that the EVALSHA implementation uses # one call to Redis instead of N (where N == @batch_count) calls, # I decided to go with the 'evalsha' method of fetching N items # from Redis in bulk. #redis.pipelined do #error, item = redis.lpop(@key) #(@batch_count-1).times { redis.lpop(@key) } #end.each do |item| #queue_event(item, output_queue) if item #end # --- End commented out implementation of 'batch fetch' rescue Redis::CommandError => e if e.to_s =~ /NOSCRIPT/ then @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e); load_batch_script(redis) retry else raise e end end end private def channel_listener(redis, output_queue) redis.subscribe @key do |on| on.subscribe do |channel, count| @logger.info("Subscribed", :channel => channel, :count => count) end on.message do |channel, message| queue_event message, output_queue end on.unsubscribe do |channel, count| @logger.info("Unsubscribed", :channel => channel, :count => count) end end end private def pattern_channel_listener(redis, output_queue) redis.psubscribe @key do |on| on.psubscribe do |channel, count| @logger.info("Subscribed", :channel => channel, :count => count) end on.pmessage do |ch, event, message| queue_event message, output_queue end on.punsubscribe do |channel, count| @logger.info("Unsubscribed", :channel => channel, :count => count) end end end # Since both listeners have the same basic loop, we've abstracted the outer # loop. private def listener_loop(listener, output_queue) while !@shutdown_requested begin @redis ||= connect self.send listener, @redis, output_queue rescue Redis::BaseError => e @logger.warn("Redis connection problem", :exception => e) # Reset the redis variable to trigger reconnect @redis = nil sleep 1 end end end # listener_loop public def run(output_queue) if @data_type == 'list' listener_loop :list_listener, output_queue elsif @data_type == 'channel' listener_loop :channel_listener, output_queue else listener_loop :pattern_channel_listener, output_queue end rescue LogStash::ShutdownSignal # ignore and quit end # def run public def teardown @shutdown_requested = true if @redis if @data_type == 'list' @redis.quit rescue nil elsif @data_type == 'channel' @redis.unsubscribe rescue nil @redis.connection.disconnect elsif @data_type == 'pattern_channel' @redis.punsubscribe rescue nil @redis.connection.disconnect end @redis = nil end end end # class LogStash::Inputs::Redis