python-redis-lock簡介
python-redis-lock是一個python的第三方庫,基于Redis,封裝了分布式鎖的邏輯,提供了更高級的API來簡化鎖的獲取、保持和釋放過程。包括自動續期、鎖超時、重入鎖等功能。
相比于直接使用redis的setnx,避免了寫額外代碼來實現鎖的復雜邏輯。
鎖的續期
在使用分布式鎖的情況下,如果某個服務器A的業務還未執行完(可能因為網絡擁塞、數據量突然變大等各種原因),但是鎖過期了,可能引發額外的問題。例如另一臺服務器B發現鎖釋放了,于是加鎖并開始執行業務,從而導致出現問題。
更有可能導致服務器A業務在執行完后去釋放鎖時,意外地釋放了服務器B加的鎖,導致服務器C進入,從而引發問題(在未使用唯一性ID表示加鎖者的情況下)。
所以對鎖的過期時間來續期是很有必要的,它確保了鎖在業務執行完后才釋放。
python-redis-lock實現自動續期的源碼分析
首先在配置python-redis-lock實例時,有兩個參數:expire int 和 auto_renewal boolean。
expire:鎖過期時間,單位為秒。
auto_renewal:是否開啟自動續期鎖。
python-redis-lock實例在初始化時,針對這兩個參數的源碼:
class Lock(object):def __init__(self, redis_client, name, expire=None, auto_renewal=False, ...):...if expire:expire = int(expire)...self._expire = expireself._lock_renewal_interval = float(expire) * 2/3 if auto_renewal else None...
假設我們設置了一個過期時間30秒,開啟自動續期的Lock實例,則self._expire=30秒,self._lock_renewal_interval=20秒。
當在代碼層調用Lock.acquire方法時,判斷self._lock_renewal_interval是否為空,如果不為空,則開啟鎖的自動續期(這里省略了一些內容,因為只關注續期的代碼實現邏輯):
class Lock(object):def acquire(self, blocking=True, timeout=None):...if self._lock_renewal_interval is not None:self._start_lock_renewer()
在self._start_lock_renewer方法中,基于python的threading模塊,開啟了一個鎖自動更新線程self._lock_renewal_thread。這個線程執行self._lock_renewer方法
def _start_lock_renewer(self):...self._lock_renewal_stop = threading.Event() # threading.Event()用于線程間的協調self._lock_renewal_thread = threading.Thread(group=None,target=self._lock_renewer,kwargs={'name': self._name,'lockref': weakref.ref(self), # 對Lock實例的弱引用'interval': self._lock_renewal_interval,'stop': self._lock_renewal_stop,},)self._lock_renewal_thread.demon = Trueself._lock_renewal_thread.start()
weakref.ref(self)創建當前類實例(self)的弱引用,并將其存儲在變量 lockref 中。使用 weakref.ref 創建的弱引用不會阻止對象被垃圾回收。
在self._lock_renewer方法中,首先用了while循環來每次等待20秒(interval=20)再執行循環體:如果弱引用對象(即Lock實例本身)沒有被垃圾回來,則執行續期的方法(lock.extend):
def _lock_renewer(name, lockref, interval, stop):while not stop.wait(timeout=interval):...lock: "Lock" = lockref()if lock is None:breaklock.extend(expire=lock._expire)
在self.extend方法中,執行了一個Lua腳本來更新鎖的過期時間。
在滿足self._name和self._signal的緩存鍵值存在且鎖未過期的情況下,將當前鎖的過期時間重置為expire(30秒)。
def extend(self. expire=None):if expire:expire = int(expire)error = self.extend_script(client=self._client, # 對redis的連接keys=(self._name, self._signal),args=(self._id, expire))...
# self.extend_script的執行邏輯:
EXTEND_SCRIPT = b"""if redis.call("get", KEYS[1]) ~= ARGV[1] thenreturn 1elseif redis.call("ttl", KEYS[1]) < 0 thenreturn 2elseredis.call("expire", KEYS[1], ARGV[2])return 0end
"""
cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
以上就是python-redis-lock實現鎖自動續期的源碼邏輯。
其中用到了多線程threading、弱引用weakref和Lua腳本等相關知識。