背景:
在很多互聯網產品應用中,有些場景需要加鎖處理,比如:秒殺,全局遞增ID,樓層生成等等。大部分是解決方案基於DB實現的,Redis為單進程單線程模式,采用隊列模式將並發訪問變成串行訪問,且多客戶端對Redis的連接並不存在競爭關系。
項目實踐
任務隊列用到分布式鎖的情況比較多,在將業務邏輯中可以異步處理的操作放入隊列,在其他線程中處理後出隊,此時隊列中使用了分布式鎖,保證入隊和出隊的一致性。關於redis隊列這塊的邏輯分析,我將在下一次對其進行總結,此處先略過。
接下來對redis實現的分布式鎖的邏輯代碼進行詳細的分析和理解:
1、為避免特殊原因導致鎖無法釋放, 在加鎖成功後, 鎖會被賦予一個生存時間(通過 lock 方法的參數設置或者使用默認值), 超出生存時間鎖將被自動釋放.
2、鎖的生存時間默認比較短(秒級, 具體見 lock 方法), 因此若需要長時間加鎖, 可以通過 expire 方法延長鎖的生存時間為適當的時間. 比如在循環內調用 expire
3、系統級的鎖當進程無論因為任何原因出現crash,操作系統會自己回收鎖,所以不會出現資源丟失。
4、但分布式鎖不同。若一次性設置很長的時間,一旦由於各種原因進程 crash 或其他異常導致 unlock 未被調用,則該鎖在剩下的時間就變成了垃圾鎖,導致其他進程或進程重啟後無法進入加鎖區域。
<?php require_once 'RedisFactory.php'; /** * 在 Redis 上實現的分布式鎖 */ class RedisLock { //單例模式 private static $_instance = null; public static function instance() { if(self::$_instance == null) { self::$_instance = new RedisLock(); } return self::$_instance; } //redis對象變量 private $redis; //存放被鎖的標志名的數組 private $lockedNames = array(); public function __construct() { //獲取一個 RedisString 實例 $this->redis = RedisFactory::instance()->getString(); } /** * 加鎖 * * @param string 鎖的標識名 * @param int 獲取鎖失敗時的等待超時時間(秒), 在此時間之內會一直嘗試獲取鎖直到超時. 為 0 表示失敗後直接返回不等待 * @param int 當前鎖的最大生存時間(秒), 必須大於 0 . 如果超過生存時間後鎖仍未被釋放, 則系統會自動將其強制釋放 * @param int 獲取鎖失敗後掛起再試的時間間隔(微秒) */ public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) { if(empty($name)) return false; $timeout = (int)$timeout; $expire = max((int)$expire, 5); $now = microtime(true); $timeoutAt = $now + $timeout; $expireAt = $now + $expire; $redisKey = "Lock:$name"; while(true) { $result = $this->redis->setnx($redisKey, (string)$expireAt); if($result !== false) { //對$redisKey設置生存時間 $this->redis->expire($redisKey, $expire); //將最大生存時刻記錄在一個數組裡面 $this->lockedNames[$name] = $expireAt; return true; } //以秒為單位,返回$redisKey 的剩余生存時間 $ttl = $this->redis->ttl($redisKey); // TTL 小於 0 表示 key 上沒有設置生存時間(key 不會不存在, 因為前面 setnx 會自動創建) // 如果出現這種情況, 那就是進程在某個實例 setnx 成功後 crash 導致緊跟著的 expire 沒有被調用. 這時可以直接設置 expire 並把鎖納為己用 if($ttl < 0) { $this->redis->set($redisKey, (string)$expireAt, $expire); $this->lockedNames[$name] = $expireAt; return true; } // 設置了不等待或者已超時 if($timeout <= 0 || microtime(true) > $timeoutAt) break; // 掛起一段時間再試 usleep($waitIntervalUs); } return false; } /** * 給當前鎖增加指定的生存時間(秒), 必須大於 0 * * @param string 鎖的標識名 * @param int 生存時間(秒), 必須大於 0 */ public function expire($name, $expire) { if($this->isLocking($name)) { if($this->redis->expire("Lock:$name", max($expire, 1))) { return true; } } return false; } /** * 判斷當前是否擁有指定名稱的鎖 * * @param mixed $name */ public function isLocking($name) { if(isset($this->lockedNames[$name])) { return (string)$this->lockedNames[$name] == (string)$this->redis->get("Lock:$name"); } return false; } /** * 釋放鎖 * * @param string 鎖的標識名 */ public function unlock($name) { if($this->isLocking($name)) { if($this->redis->deleteKey("Lock:$name")) { unset($this->lockedNames[$name]); return true; } } return false; } /** 釋放當前已經獲取到的所有鎖 */ public function unlockAll() { $allSuccess = true; foreach($this->lockedNames as $name => $item) { if(false === $this->unlock($name)) { $allSuccess = false; } } return $allSuccess; } }
此類很多代碼都寫上了注釋,只要認真理解下,就很容易懂得如何在redis實現分布式鎖了。