logstash-input-redis源碼解析

首先是程序的自定義,這里設置了redis插件需要的參數,默認值,以及校驗等。

然后注冊Redis實例需要的信息,比如key的名字或者url等,可以看到默認的data_type是list模式。

程序運行的主要入口,根據不同的data_type,傳遞不同的實現方法,然后調用listener_loop執行循環監聽

Listner_loop方法傳遞了兩個參數,一個是監聽器實現的方法,一個是處理的數據隊列。循環是每秒鐘執行一次,如果循環標識被設置,則退出。

上面的循環方法可以看到,是通過一個參數shutdown_requested來判斷是否繼續循環。該參數通過tear_down方法設置為true,然后根據不同的模式,指定不同的退出方式。
如果是list模式,則直接退出;如果是channel模式,則發送redis的unsubsribe命令退出;如果是pattern_channel,則發送punsubscribe退出。

在循環內部,判斷是否已經創建了redis實例,如果沒有創建,則調用connect方法創建;否則直接執行。

這里前一段是調用Redis的new方法,初始化一個redis實例。緊接著判斷batch_count是否大于1,如果等于1,就什么也不做,然后返回redis。
如果batch_count大于1,那么就調用load_batch_script方法,加載Lua腳本,存儲到redis中的lua腳本字典中,供后面使用。代碼如下:

上面的代碼應該是這個插件最難理解的部分了。為了弄清楚這段代碼的工作,需要了解下面幾個知識點:

  • lua腳本基本概念
  • Redis中的EVAL命令如何使用
  • 理解上面腳本的工作

首先,要想運行上面的腳本,必須是Redis2.6+的版本,才支持EVAL,否則會報錯!EVAL命令與js中的差不多,就是可以把某一個字符串當做命令解析,其中字符串就包括lua腳本。這樣有什么好處呢?

說白了,就是能一次性進行多個操作。比如我們可以在腳本中寫入一連串的操作,這些操作會以原子模式,一次性在服務器執行完,在返回回來。

Lua腳本

關于lua腳本,其實沒有詳細研究的必要,但是一定要知道一個local和table的概念。local是創建本地的變量,這樣就不會污染redis的數據。table是lua的一種數據結構,有點類似于json,可以存儲數據。

EVAL命令

另外還要知道EVAL命令的使用方法,看下面這個命令,就好理解了!
EVAL "return KEYS[1] KEYS[2] ARGV[1] ARGV[2];" 2 name:xing age:13
就會返回:

name
age
xing
13

這段代碼沒有經過真正的操作,但是有助于理解就好!也就是說,EVAL后面跟著一段腳本,腳本后面跟著的就是參數,可以通過KEYS和ARGV數組獲得,但是下標從1開始。

再來說說EVAL命令,它的執行過程如下:

  • 解析字符串腳本,根據校驗和生成lua的方法
  • 把校驗和和函數放入一個lua_script字典里面,之后就可以通過EVALSHA命令直接使用校驗和執行函數。

有了這些理論基礎以后,就可以看看上面的代碼都做了什么了!
首先是獲取參數,這個參數賦值給i;然后創建了一個對象res;緊接著調用llen命令,獲得指定list的長度;如果list的長度大于i,則什么也不做;如果小于i,那么i就等于lenth;然后執行命令lpop,取出list中的元素,一共取i次,放入res中,最后返回。

說得通俗點,就是比較一下list元素個數與設置batch_count的值。如果batch_count為5,列表list中有5條以上的數據,那么直接取5條,一次性返回;否則取length條返回。

可以看到這段腳本的作用,就是讓logstash一次請求,最多獲得batch_count條事件,減小了服務器處理請求的壓力。

講完這段代碼,可以看看不同的工作模式的實現代碼了:

首先是list的代碼,其實就是執行BLPOP命令,獲取數據。如果在list模式中,還會去判斷batch_count的值,如果是1直接退出;如果大于1,則使用evalsha命令調用之前保存的腳本方法。

至于channel和pattern_channel,就沒啥解釋的了,就是分別調用subscribe和psubsribe命令而已。

其實最難理解的,就是中間那段lua腳本~明白它的用處,redis插件也就不難理解了。

代碼

# encoding: utf-8
require "logstash/inputs/base"
require "logstash/inputs/threadable"
require "logstash/namespace"# This input will read events from a Redis instance; it supports both Redis channels and lists.
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
# the channel commands used by Logstash are found in Redis v1.3.8+.
# While you may be able to make these Redis versions work, the best performance
# and stability will be found in more recent stable versions.  Versions 2.6.0+
# are recommended.
#
# For more information about Redis, see <http://redis.io/>
#
# `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or
# newer. Anything older does not support the operations used by batching.
#
class LogStash::Inputs::Redis < LogStash::Inputs::Threadableconfig_name "redis"default :codec, "json"# The `name` configuration is used for logging in case there are multiple instances.# This feature has no real function and will be removed in future versions.config :name, :validate => :string, :default => "default", :deprecated => true# The hostname of your Redis server.config :host, :validate => :string, :default => "127.0.0.1"# The port to connect on.config :port, :validate => :number, :default => 6379# The Redis database number.config :db, :validate => :number, :default => 0# Initial connection timeout in seconds.config :timeout, :validate => :number, :default => 5# Password to authenticate with. There is no authentication by default.config :password, :validate => :password# The name of the Redis queue (we'll use BLPOP against this).# TODO: remove soon.config :queue, :validate => :string, :deprecated => true# The name of a Redis list or channel.# TODO: change required to trueconfig :key, :validate => :string, :required => false# Specify either list or channel.  If `redis\_type` is `list`, then we will BLPOP the# key.  If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.# If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.# TODO: change required to trueconfig :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false# The number of events to return from Redis using EVAL.config :batch_count, :validate => :number, :default => 1publicdef registerrequire 'redis'@redis = nil@redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"# TODO remove after setting key and data_type to trueif @queueif @key or @data_typeraise RuntimeError.new("Cannot specify queue parameter and key or data_type")end@key = @queue@data_type = 'list'endif not @key or not @data_typeraise RuntimeError.new("Must define queue, or key and data_type parameters")end# end TODO@logger.info("Registering Redis", :identity => identity)end # def register# A string used to identify a Redis instance in log messages# TODO(sissel): Use instance variables for this once the @name config# option is removed.privatedef identity@name || "#{@redis_url} #{@data_type}:#{@key}"endprivatedef connectredis = Redis.new(:host => @host,:port => @port,:timeout => @timeout,:db => @db,:password => @password.nil? ? nil : @password.value)load_batch_script(redis) if @data_type == 'list' && (@batch_count > 1)return redisend # def connectprivatedef load_batch_script(redis)#A Redis Lua EVAL script to fetch a count of keys#in case count is bigger than current items in queue whole queue will be returned without extra nil valuesredis_script = <<EOFlocal i = tonumber(ARGV[1])local res = {}local length = redis.call('llen',KEYS[1])if length < i then i = length endwhile (i > 0) dolocal item = redis.call("lpop", KEYS[1])if (not item) thenbreakendtable.insert(res, item)i = i-1endreturn res
EOF@redis_script_sha = redis.script(:load, redis_script)endprivatedef queue_event(msg, output_queue)begin@codec.decode(msg) do |event|decorate(event)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               output_queue << eventendrescue LogStash::ShutdownSignal => e# propagate upraise(e)rescue => e # parse or event creation error@logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace);endendprivatedef list_listener(redis, output_queue)item = redis.blpop(@key, 0, :timeout => 1)return unless item # from timeout or other conditions# blpop returns the 'key' read from as well as the item result# we only care about the result (2nd item in the list).queue_event(item[1], output_queue)# If @batch_count is 1, there's no need to continue.return if @batch_count == 1beginredis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |item|queue_event(item, output_queue)end# Below is a commented-out implementation of 'batch fetch'# using pipelined LPOP calls. This in practice has been observed to# perform exactly the same in terms of event throughput as# the evalsha method. Given that the EVALSHA implementation uses# one call to Redis instead of N (where N == @batch_count) calls,# I decided to go with the 'evalsha' method of fetching N items# from Redis in bulk.#redis.pipelined do#error, item = redis.lpop(@key)#(@batch_count-1).times { redis.lpop(@key) }#end.each do |item|#queue_event(item, output_queue) if item#end# --- End commented out implementation of 'batch fetch'rescue Redis::CommandError => eif e.to_s =~ /NOSCRIPT/ then@logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);load_batch_script(redis)retryelseraise eendendendprivatedef channel_listener(redis, output_queue)redis.subscribe @key do |on|on.subscribe do |channel, count|@logger.info("Subscribed", :channel => channel, :count => count)endon.message do |channel, message|queue_event message, output_queueendon.unsubscribe do |channel, count|@logger.info("Unsubscribed", :channel => channel, :count => count)endendendprivatedef pattern_channel_listener(redis, output_queue)redis.psubscribe @key do |on|on.psubscribe do |channel, count|@logger.info("Subscribed", :channel => channel, :count => count)endon.pmessage do |ch, event, message|queue_event message, output_queueendon.punsubscribe do |channel, count|@logger.info("Unsubscribed", :channel => channel, :count => count)endendend# Since both listeners have the same basic loop, we've abstracted the outer# loop.privatedef listener_loop(listener, output_queue)while !@shutdown_requestedbegin@redis ||= connectself.send listener, @redis, output_queuerescue Redis::BaseError => e@logger.warn("Redis connection problem", :exception => e)# Reset the redis variable to trigger reconnect@redis = nilsleep 1endendend # listener_looppublicdef run(output_queue)if @data_type == 'list'listener_loop :list_listener, output_queueelsif @data_type == 'channel'listener_loop :channel_listener, output_queueelselistener_loop :pattern_channel_listener, output_queueendrescue LogStash::ShutdownSignal# ignore and quitend # def runpublicdef teardown@shutdown_requested = trueif @redisif @data_type == 'list'@redis.quit rescue nilelsif @data_type == 'channel'@redis.unsubscribe rescue nil@redis.connection.disconnectelsif @data_type == 'pattern_channel'@redis.punsubscribe rescue nil@redis.connection.disconnectend@redis = nilendend
end # class LogStash::Inputs::Redis

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/535376.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/535376.shtml
英文地址,請注明出處:http://en.pswp.cn/news/535376.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

logstash-filter模塊

Fillters 在Logstash處理鏈中擔任中間處理組件。他們經常被組合起來實現一些特定的行為來&#xff0c;處理匹配特定規則的事件流。常見的filters如下&#xff1a; grok&#xff1a;解析無規則的文字并轉化為有結構的格式。Grok 是目前最好的方式來將無結構的數據轉換為有結構可…

weblogic啟動慢

1.最差的解決辦法 執行命令 mv /dev/random /dev/random.ORIG ln /dev/urandom /dev/random   將/dev/random 指向/dev/urandom 2. 較好的解決辦法&#xff1a; 在weblogic啟動腳本里setDomainEnv.sh: 加入以下內容 JAVA_OPTIONS"${JAVA_OPTIONS}" -Dja…

SSL雙向認證和SSL單向認證的區別

雙向認證 SSL 協議要求服務器和用戶雙方都有證書。單向認證 SSL 協議不需要客戶擁有CA證書&#xff0c;具體的過程相對于上面的步驟&#xff0c;只需將服務器端驗證客戶證書的過程去掉&#xff0c;以及在協商對稱密碼方案&#xff0c;對稱通話密鑰時&#xff0c;服務器發送給客…

雙向認證SSL原理

文中首先解釋了加密解密的一些基礎知識和概念&#xff0c;然后通過一個加密通信過程的例子說明了加密算法的作用&#xff0c;以及數字證書的出現所起的作用。接著對數字證書做一個詳細的解釋&#xff0c;并討論一下windows中數字證書的管理&#xff0c;最后演示使用makecert生成…

Xtrabackup備份與恢復

一、Xtrabackup介紹 Percona-xtrabackup是 Percona公司開發的一個用于MySQL數據庫物理熱備的備份工具&#xff0c;支持MySQL、Percona server和MariaDB&#xff0c;開源免費&#xff0c;是目前較為受歡迎的主流備份工具。xtrabackup只能備份innoDB和xtraDB兩種數據引擎的表&…

實時備份工具之inotify+rsync

1.inotify簡介 inotify 是一個從 2.6.13 內核開始&#xff0c;對 Linux 文件系統進行高效率、細粒度、異步地監控機制&#xff0c; 用于通知用戶空間程序的文件系統變化。可利用它對用戶空間進行安全、性能、以及其他方面的監控。Inotify 反應靈敏&#xff0c;用法非常簡單&…

nginx proxy_cache緩存詳解

目錄 1. 關于緩沖區指令 1.1 proxy_buffer_size1.2 proxy_buffering1.3 proxy_buffers1.4 proxy_busy_buffers_size1.5 proxy_max_temp_file_size1.6 proxy_temp_file_write_size1.7 緩沖區配置實例2. 常用配置項 2.1 proxy_cache_path2.2 proxy_temp_path2.3 proxy_cache2.4 …

mysql主從延遲

在實際的生產環境中&#xff0c;由單臺MySQL作為獨立的數據庫是完全不能滿足實際需求的&#xff0c;無論是在安全性&#xff0c;高可用性以及高并發等各個方面 因此&#xff0c;一般來說都是通過集群主從復制&#xff08;Master-Slave&#xff09;的方式來同步數據&#xff0c…

16張圖帶你吃透高性能 Redis 集群

現如今 Redis 變得越來越流行&#xff0c;幾乎在很多項目中都要被用到&#xff0c;不知道你在使用 Redis 時&#xff0c;有沒有思考過&#xff0c;Redis 到底是如何穩定、高性能地提供服務的&#xff1f; 你也可以嘗試回答一下以下這些問題&#xff1a; 我使用 Redis 的場景很…

Redis與MySQL雙寫一致性如何保證

談談一致性 一致性就是數據保持一致&#xff0c;在分布式系統中&#xff0c;可以理解為多個節點中數據的值是一致的。 強一致性&#xff1a;這種一致性級別是最符合用戶直覺的&#xff0c;它要求系統寫入什么&#xff0c;讀出來的也會是什么&#xff0c;用戶體驗好&#xff0c;…

weblogic忘記console密碼

進入 cd /sotware/oracle_ldap/Middleware/user_projects/domains/base_domain/security/ 目錄 執行 java -classpath /sotware/oracle_ldap/Middleware/wlserver_10.3/server/lib/weblogic.jar weblogic.security.utils.AdminAccount weblogic(賬號) weblogic123(密碼) . …

Mysql高性能優化技能總結

數據庫命令規范 所有數據庫對象名稱必須使用小寫字母并用下劃線分割 所有數據庫對象名稱禁止使用mysql保留關鍵字&#xff08;如果表名中包含關鍵字查詢時&#xff0c;需要將其用單引號括起來&#xff09; 數據庫對象的命名要能做到見名識意&#xff0c;并且最后不要超過32個…

Redis的AOF日志

如果 Redis 每執行一條寫操作命令&#xff0c;就把該命令以追加的方式寫入到一個文件里&#xff0c;然后重啟 Redis 的時候&#xff0c;先去讀取這個文件里的命令&#xff0c;并且執行它&#xff0c;這不就相當于恢復了緩存數據了嗎&#xff1f; 這種保存寫操作命令到日志的持久…

Redis 核心技術與實戰

目錄 開篇詞 | 這樣學 Redis&#xff0c;才能技高一籌 01 | 基本架構&#xff1a;一個鍵值數據庫包含什么&#xff1f; 02 | 數據結構&#xff1a;快速的Redis有哪些慢操作&#xff1f; 鍵和值用什么結構組織&#xff1f; 為什么哈希表操作變慢了&#xff1f; 有哪些底層數…

redis核心技術與實戰(二)緩存應用篇

1.《旁路緩存&#xff1a;redis 在緩存中工作原理》 1.緩存的兩個特征 1.什么是緩存&#xff0c;有什么特征&#xff1f; 磁盤->內存->cpu 之間讀寫速度差異巨大&#xff0c;為了平衡他們之間的差異&#xff0c;操作系統默認使用了兩種緩存&#xff1b; CPU 里面的末級…

redis核心技術與實戰(三) 性能篇

影響redis性能主要有以下部分&#xff1a; Redis 內部的阻塞式操作&#xff1b; CPU核和NUMA架構 Redis關鍵系統配置 Redis內存碎片 Redis緩沖區 下面一個個來介紹這些地方 1.《redis 有哪些阻塞點&#xff1f;》 redis實例主要交互的對象有以下幾點&#xff0c;我們依據下面這…

redis核心與實戰(一)數據結構篇

1.《redis數據結構概覽》 1.數據結構概覽 數據模型&#xff1a;一共5種&#xff0c;String&#xff08;字符串&#xff09;、List&#xff08;列表&#xff09;、Hash&#xff08;哈希&#xff09;、Set&#xff08;集合&#xff09;和 Sorted Set&#xff08;有序集合&#xf…

redis核心技術與實戰(四)高可用高擴展篇

1.《redis架構組成》 1.redis學習維度 2.一個基本的鍵值型數據庫包括什么&#xff1f; 1.訪問框架 redis通過網絡框架進行訪問&#xff0c;使得 Redis 可以作為一個基礎性的網絡服務進行訪問&#xff0c;擴大了redis應用范圍&#xff1b; 過程&#xff1a;如果客戶端發送“pu…

tomcat監控腳本

#!/bin/sh# func:自動監控tomcat腳本并且執行重啟操作# 獲取tomcat進程ID&#xff08;其中[grep -w .....]中的.....需要替換為實際部署的tomcat文件夾名&#xff0c;如下&#xff09; TomcatID$(ps -ef |grep tomcat |grep -w /usr/local/tomcat/apache-tomcat-8.5.31|grep -v…

weblogic命令行操作

啟動和停止子節點&#xff1a; [rootoud bin]# cd /sotware/oracle_ldap/Middleware/user_projects/domains/base_domain/bin/ [rootoud bin]# ./startManagedWebLogic.sh Server-0 http://192.168.63.129:7001 -Dweblogic.management.usernameweblogic -Dweblogic.management…