Redis上实现分布式锁以提高性能的方案研究
背景: 在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等。大部分是解决方案基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。 项目实践 任务队列用到分布式锁的情况比较多,在将业务逻辑中可以异步处理的操作放入队列,在其他线程中处理后出队,此时队列中使用了分布式锁,保证入队和出队的一致性。关于redis队列这块的逻辑分析,我将在下一次对其进行总结,此处先略过。
1、为避免特殊原因导致锁无法释放,在加锁成功后,锁会被赋予一个生存时间(通过 lock 方法的参数设置或者使用默认值),超出生存时间锁将被自动释放. 2、锁的生存时间默认比较短(秒级,具体见 lock 方法),因此若需要长时间加锁,可以通过 expire 方法延长锁的生存时间为适当的时间. 比如在循环内调用 expire <?php require_once 'RedisFactory.php'; /** * 在 Redis 上实现的分布式锁 */ class RedisLock { //单例模式 private static $_instance = null; public static function instance() { if(self::$_instance == null) { self::$_instance = new RedisLock(); } return self::$_instance; } //redis对象变量 private $redis; //存放被锁的标志名的数组 private $lockedNames = array(); public function __construct() { //获取一个 RedisString 实例 $this->redis = RedisFactory::instance()->getString(); } /** * 加锁 * * @param string 锁的标识名 * @param int 获取锁失败时的等待超时时间(秒),在此时间之内会一直尝试获取锁直到超时. 为 0 表示失败后直接返回不等待 * @param int 当前锁的最大生存时间(秒),必须大于 0 . 如果超过生存时间后锁仍未被释放,则系统会自动将其强制释放 * @param int 获取锁失败后挂起再试的时间间隔(微秒) */ public function lock($name,$timeout = 0,$expire = 15,$waitIntervalUs = 100000) { if(empty($name)) return false; $timeout = (int)$timeout; $expire = max((int)$expire,5); $now = microtime(true); $timeoutAt = $now + $timeout; $expireAt = $now + $expire; $redisKey = "Lock:$name"; while(true) { $result = $this->redis->setnx($redisKey,(string)$expireAt); if($result !== false) { //对$redisKey设置生存时间 $this->redis->expire($redisKey,$expire); //将最大生存时刻记录在一个数组里面 $this->lockedNames[$name] = $expireAt; return true; } //以秒为单位,返回$redisKey 的剩余生存时间 $ttl = $this->redis->ttl($redisKey); // TTL 小于 0 表示 key 上没有设置生存时间(key 不会不存在,因为前面 setnx 会自动创建) // 如果出现这种情况,那就是进程在某个实例 setnx 成功后 crash 导致紧跟着的 expire 没有被调用. 这时可以直接设置 expire 并把锁纳为己用 if($ttl < 0) { $this->redis->set($redisKey,(string)$expireAt,$expire); $this->lockedNames[$name] = $expireAt; return true; } // 设置了不等待或者已超时 if($timeout <= 0 || microtime(true) > $timeoutAt) break; // 挂起一段时间再试 usleep($waitIntervalUs); } return false; } /** * 给当前锁增加指定的生存时间(秒),必须大于 0 * * @param string 锁的标识名 * @param int 生存时间(秒),必须大于 0 */ public function expire($name,$expire) { if($this->isLocking($name)) { if($this->redis->expire("Lock:$name",max($expire,1))) { return true; } } return false; } /** * 判断当前是否拥有指定名称的锁 * * @param mixed $name */ public function isLocking($name) { if(isset($this->lockedNames[$name])) { return (string)$this->lockedNames[$name] == (string)$this->redis->get("Lock:$name"); } return false; } /** * 释放锁 * * @param string 锁的标识名 */ public function unlock($name) { if($this->isLocking($name)) { if($this->redis->deleteKey("Lock:$name")) { unset($this->lockedNames[$name]); return true; } } return false; } /** 释放当前已经获取到的所有锁 */ public function unlockAll() { $allSuccess = true; foreach($this->lockedNames as $name => $item) { if(false === $this->unlock($name)) { $allSuccess = false; } } return $allSuccess; } } 此类很多代码都写上了注释,只要认真理解下,就很容易懂得如何在redis实现分布式锁了。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |