基于redis的分布式锁实现
1.分布式锁介绍在计算机系统中,锁作为一种控制并发的机制无处不在。 单机环境下,操作系统能够在进程或线程之间通过本地的锁来控制并发程序的行为。而在如今的大型复杂系统中,通常采用的是分布式架构提供服务。 分布式环境下,基于本地单机的锁无法控制分布式系统中分开部署客户端的并发行为,此时分布式锁就应运而生了。 一个可靠的分布式锁应该具备以下特性: 1.互斥性:作为锁,需要保证任何时刻只能有一个客户端(用户)持有锁 2.可重入:?同一个客户端在获得锁后,可以再次进行加锁 3.高可用:获取锁和释放锁的效率较高,不会出现单点故障 4.自动重试机制:当客户端加锁失败时,能够提供一种机制让客户端自动重试 2.分布式锁api接口/** * 分布式锁 api接口 */ public interface DistributeLock { * 尝试加锁 * @param lockKey 锁的key * @return 加锁成功 返回uuid * 加锁失败 返回null * */ String lock(String lockKey); * 尝试加锁 (requestID相等 可重入) * expireTime 过期时间 单位:秒 * String lock(String lockKey,int expireTime); requestID 用户ID * * 尝试加锁,失败自动重试 会阻塞当前线程 * String lockAndRetry(String lockKey); * 尝试加锁,失败自动重试 会阻塞当前线程 (requestID相等 可重入) * String lockAndRetry(String lockKey,1)"> retryCount 重试次数 * int expireTime, retryCount); * 释放锁 * true 释放自己所持有的锁 成功 * false 释放自己所持有的锁 失败 * */ boolean unLock(String lockKey,String requestID); } 3.基于redis的分布式锁的简单实现3.1?基础代码?当前实现版本的分布式锁基于redis实现,使用的是jedis连接池来和redis进行交互,并将其封装为redisClient工具类(仅封装了demo所需的少数接口) redisClient工具类:class RedisClient { private static final Logger LOGGER = LoggerFactory.getLogger(RedisClient.); private JedisPool pool; static RedisClient instance = new RedisClient(); RedisClient() { init(); } static RedisClient getInstance(){ return instance; } public Object eval(String script,List<String> keys,List<String> args) { Jedis jedis = getJedis(); Object result = jedis.eval(script,keys,args); jedis.close(); result; } public String get(final String key){ Jedis jedis = getJedis(); String result = jedis.get(key); jedis.close(); public String set(final String key,1)">final String value,1)">final String nxxx,1)">final String expx,1)">final time) { Jedis jedis = jedis.set(key,value,nxxx,expx,time); jedis.close(); void init(){ Properties redisConfig = PropsUtil.loadProps("redis.properties"); int maxTotal = PropsUtil.getInt(redisConfig,"maxTotal",10); String ip = PropsUtil.getString(redisConfig,"ip","127.0.0.1"int port = PropsUtil.getInt(redisConfig,"port",6379); JedisPoolConfig jedisPoolConfig = JedisPoolConfig(); jedisPoolConfig.setMaxTotal(maxTotal); pool = JedisPool(jedisPoolConfig,ip,port); LOGGER.info("连接池初始化成功 ip={},port={},maxTotal={}",port,maxTotal); } Jedis getJedis(){ pool.getResource(); } } 所依赖的工具类:package util; import org.slf4j.Logger; org.slf4j.LoggerFactory; java.io.FileNotFoundException; java.io.IOException; java.io.InputStream; java.util.Properties; * @Author xiongyx * @Create 2018/4/11. PropsUtil { final Logger LOGGER = LoggerFactory.getLogger(PropsUtil. * 读取配置文件 * Properties loadProps(String fileName){ Properties props = null; InputStream is = ; try{ //:::绝对路径获得输入流 is = Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName); if(is == ){ :::没找到文件,抛出异常 throw new FileNotFoundException(fileName + " is not found"); } props = Properties(); props.load(is); }catch(IOException e){ LOGGER.error("load propertis file fail"finally { if(is != { :::关闭io流 is.close(); } (IOException e) { LOGGER.error("close input Stream fail" props; } * 获取字符串属性(默认为空字符串) * String getString(Properties properties,String key){ :::调用重载函数 默认值为:空字符串 return getString(properties,key,""); } * 获取字符串属性 * :::key对应的value数据是否存在 if(properties.containsKey(key)){ properties.getProperty(key); }else defaultValue; } } * 获取int属性 默认值为0 * getInt(Properties properties,1)">:::调用重载函数,默认为:0 return getInt(properties,0 * 获取int属性 * int getInt(Properties properties,1)"> defaultValue){ CastUtil.castToInt(properties.getProperty(key)); } * 获取boolean属性,默认值为false getBoolean(Properties properties,1)">return getBoolean(properties,1)">false * 获取boolean属性 boolean getBoolean(Properties properties,1)"> CastUtil.castToBoolean(properties.getProperty(key)); } defaultValue; } } } CastUtil { * 转为 string * String castToString(Object obj){ return castToString(obj,1)"> * 转为 string 提供默认值 * String castToString(Object obj,1)">if(obj == ){ defaultValue; } obj.toString(); } } * 转为 int * castToInt(Object obj){ return castToInt(obj,1)"> * 转为 int 提供默认值 * int castToInt(Object obj,1)"> Integer.parseInt(obj.toString()); } } * 转为 double * double castToDouble(Object obj){ return castToDouble(obj,1)"> * 转为 double 提供默认值 * double castToDouble(Object obj,1)"> Double.parseDouble(obj.toString()); } } * 转为 long * long castToLong(Object obj){ return castToLong(obj,1)"> * 转为 long 提供默认值 * long castToLong(Object obj,1)"> Long.parseLong(obj.toString()); } } * 转为 boolean * castToBoolean(Object obj){ return castToBoolean(obj,1)"> * 转为 boolean 提供默认值 * boolean castToBoolean(Object obj,1)"> Boolean.parseBoolean(obj.toString()); } } } 初始化lua脚本 LuaScript.java:在分布式锁初始化时,使用init方法读取lua脚本 LuaScript { * 加锁脚本 lock.lua * static String LOCK_SCRIPT = ""; * 解锁脚本 unlock.lua * static String UN_LOCK_SCRIPT = "" init(){ { initLockScript(); initUnLockScript(); } (IOException e) { RuntimeException(e); } } void initLockScript() throws IOException { String filePath = Objects.requireNonNull(LuaScript.class.getClassLoader().getResource("lock.lua")).getPath(); LOCK_SCRIPT = readFile(filePath); } void initUnLockScript() class.getClassLoader().getResource("unlock.lua")).getPath(); UN_LOCK_SCRIPT =static String readFile(String filePath) IOException { ( FileReader reader = FileReader(filePath); BufferedReader br = BufferedReader(reader) ) { String line; StringBuilder stringBuilder = StringBuilder(); while ((line = br.readLine()) != ) { stringBuilder.append(line).append(System.lineSeparator()); } stringBuilder.toString(); } } } 单例的RedisDistributeLock基础属性class RedisDistributeLock implements * 无限重试 * int UN_LIMIT_RETRY = -1 RedisDistributeLock() { LuaScript.init(); } static DistributeLock instance = RedisDistributeLock(); * 持有锁 成功标识 * final Long ADD_LOCK_SUCCESS = 1L; * 释放锁 失败标识 * final Integer RELEASE_LOCK_SUCCESS = 1 * 默认过期时间 单位:秒 * int DEFAULT_EXPIRE_TIME_SECOND = 300 * 默认加锁重试时间 单位:毫秒 * int DEFAULT_RETRY_FIXED_TIME = 3000 * 默认的加锁浮动时间区间 单位:毫秒 * int DEFAULT_RETRY_TIME_RANGE = 1000 * 默认的加锁重试次数 * int DEFAULT_RETRY_COUNT = 30 * lockCount Key前缀 * final String LOCK_COUNT_KEY_PREFIX = "lock_count:" DistributeLock getInstance(){ instance; } } 3.2?加锁实现使用redis实现分布式锁时,加锁操作必须是原子操作,否则多客户端并发操作时会导致各种各样的问题。详情请见:Redis分布式锁的正确实现方式。 由于我们实现的是可重入锁,加锁过程中需要判断客户端ID的正确与否。而redis原生的简单接口没法保证一系列逻辑的原子性执行,因此采用了lua脚本来实现加锁操作。lua脚本可以让redis在执行时将一连串的操作以原子化的方式执行。 加锁lua脚本?lock.lua-- 获取参数 local requestIDKey = KEYS[1] local currentRequestID = ARGV[] local expireTimeTTL = ARGV[2 setnx 尝试加锁 local lockSet = redis.call('hsetnx',KEYS[1],lockKey'if lockSet == 1 then 加锁成功 设置过期时间和重入次数=1 redis.call(expire],expireTimeTTL) redis.call(hsetlockCount) return else 判断是否是重入加锁 local oldRequestID = redis.call(hgetif currentRequestID == oldRequestID then 是重入加锁 redis.call(hincrby) 重置过期时间 redis.call(1 else requestID不一致,加锁失败 0 end end 加锁方法实现:加锁时,通过判断eval的返回值来判断加锁是否成功。 @Override public String lock(String lockKey) { String uuid = UUID.randomUUID().toString(); lock(lockKey,uuid); } @Override public String lock(String lockKey,1)"> expireTime) { String uuid = String lock(String lockKey,String requestID) { expireTime) { RedisClient redisClient = RedisClient.getInstance(); List<String> keyList = Arrays.asList( lockKey ); List<String> argsList = Arrays.asList( requestID,expireTime + "" ); Long result = (Long)redisClient.eval(LuaScript.LOCK_SCRIPT,keyList,argsList); (result.equals(ADD_LOCK_SUCCESS)){ requestID; }return ; } } 3.3?解锁实现解锁操作同样需要一连串的操作,由于原子化操作的需求,因此同样使用lua脚本实现解锁功能。 解锁lua脚本?unlock.lua判断requestID一致性 if redis.call(') == currentRequestID requestID相同,重入次数自减 local currentCount = redis.call(if currentCount == 重入次数为0,删除锁 redis.call(del]) 0 else end ?解锁方法实现:Collections.singletonList(requestID); Object result = RedisClient.getInstance().eval(LuaScript.UN_LOCK_SCRIPT,1)"> 释放锁成功 RELEASE_LOCK_SUCCESS.equals(result); } 3.4?自动重试机制实现调用lockAndRetry方法进行加锁时,如果加锁失败,则当前客户端线程会短暂的休眠一段时间,并进行重试。在重试了一定的次数后,会终止重试加锁操作,从而加锁失败。 需要注意的是,加锁失败之后的线程休眠时长是"固定值 + 随机值",引入随机值的主要目的是防止高并发时大量的客户端在几乎同一时间被唤醒并进行加锁重试,给redis服务器带来周期性的、不必要的瞬时压力。 @Override String lockAndRetry(String lockKey) { String uuid = lockAndRetry(lockKey,1)"> String lockAndRetry(String lockKey,1)">public String lockAndRetry(String lockKey,1)"> retryCount) { String uuid = expireTime) { retryCount) { if(retryCount <= 0 retryCount小于等于0 无限循环,一直尝试加锁 while(true){ String result =if(result != ){ result; } 休眠一会 sleepSomeTime(); } } retryCount大于0 尝试指定次数后,退出 for(int i=0; i<retryCount; i++ sleepSomeTime(); } ; } } 4.使用注解切面简化redis分布式锁的使用通过在方法上引入RedisLock注解切面,让对应方法被redis分布式锁管理起来,可以简化redis分布式锁的使用。 切面注解?RedisLock?@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @ RedisLock { int UN_LIMIT_RETRY = RedisDistributeLock.UN_LIMIT_RETRY; String lockKey(); expireTime(); retryCount(); } RedisLock 切面实现@Component @Aspect RedisLockAspect { final Logger LOGGER = LoggerFactory.getLogger(RedisLockAspect.final ThreadLocal<String> REQUEST_ID_MAP = new ThreadLocal<>(); @Pointcut("@annotation(annotation.RedisLock)" annotationPointcut() { } @Around("annotationPointcut()"public Object around(ProceedingJoinPoint joinPoint) Throwable { MethodSignature methodSignature = (MethodSignature)joinPoint.getSignature(); Method method = methodSignature.getMethod(); RedisLock annotation = method.getAnnotation(RedisLock.); boolean lockSuccess = lock(annotation); (lockSuccess){ Object result = joinPoint.proceed(); unlock(annotation); result; } ; } * 加锁 * lock(RedisLock annotation){ DistributeLock distributeLock = RedisDistributeLock.getInstance(); int retryCount = annotation.retryCount(); String requestID = REQUEST_ID_MAP.get(); if(requestID != 当前线程 已经存在requestID distributeLock.lockAndRetry(annotation.lockKey(),annotation.expireTime(),retryCount); LOGGER.info("重入加锁成功 requestID=" + requestID); ; } 当前线程 不存在requestID String newRequestID = distributeLock.lockAndRetry(annotation.lockKey(),retryCount); if(newRequestID != 加锁成功,设置新的requestID REQUEST_ID_MAP.set(newRequestID); LOGGER.info("加锁成功 newRequestID=" + newRequestID); ; }{ LOGGER.info("加锁失败,超过重试次数,直接返回 retryCount={}"; } } } * 解锁 * unlock(RedisLock annotation){ DistributeLock distributeLock = RedisDistributeLock.getInstance(); String requestID = 解锁成功 boolean unLockSuccess = distributeLock.unLock(annotation.lockKey(),requestID); (unLockSuccess){ 移除 ThreadLocal中的数据 REQUEST_ID_MAP.remove(); LOGGER.info("解锁成功 requestID=" + requestID); } } } } 使用例子@Service("testService") class TestServiceImpl TestService { @Override @RedisLock(lockKey = "lockKey",expireTime = 100,retryCount = RedisLock.UN_LIMIT_RETRY) String method1() { return "method1"; } @Override @RedisLock(lockKey = "lockKey",retryCount = 3 String method2() { return "method2"; } } 5.总结5.1 当前版本缺陷主从同步可能导致锁的互斥性失效 在redis主从结构下,出于性能的考虑,redis采用的是主从异步复制的策略,这会导致短时间内主库和从库数据短暂的不一致。 试想,当某一客户端刚刚加锁完毕,redis主库还没有来得及和从库同步就挂了,之后从库中新选拔出的主库是没有对应锁记录的,这就可能导致多个客户端加锁成功,破坏了锁的互斥性。 休眠并反复尝试加锁效率较低 lockAndRetry方法在客户端线程加锁失败后,会休眠一段时间之后再进行重试。当锁的持有者持有锁的时间很长时,其它客户端会有大量无效的重试操作,造成系统资源的浪费。 进一步优化时,可以使用发布订阅的方式。这时加锁失败的客户端会监听锁被释放的信号,在锁真正被释放时才会进行新的加锁操作,从而避免不必要的轮询操作,以提高效率。 不是一个公平的锁 当前实现版本中,多个客户端同时对锁进行抢占时,是完全随机的,既不遵循先来后到的顺序,客户端之间也没有加锁的优先级区别。 后续优化时可以提供一个创建公平锁的接口,能指定加锁的优先级,内部使用一个优先级队列维护加锁客户端的顺序。公平锁虽然效率稍低,但在一些场景能更好的控制并发行为。 5.2 经验总结前段时间看了一篇关于redis分布式锁的技术文章,发现自己对于分布式锁的了解还很有限。纸上得来终觉浅,为了更好的掌握相关知识,决定尝试着自己实现一个demo级别的redis分布式锁,通过这次实践,更进一步的学习了lua语言和redis相关内容。 这篇博客的完整代码在我的github上:https://github.com/1399852153/RedisDistributedLock,存在许多不足之处,请多多指教。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- angular – 共享混合移动应用程序和桌面Web应用程序的代码
- twitter-bootstrap – Bootstrap 3.3 Internet Explorer 10
- angularjs – 量角器 – 在转发器中计数元素并打印
- vim – 更改折叠中的文本
- angularjs – $q.defer()真的有用吗?
- 如何在Vim中取消设置变量?
- 为什么Iterator有一个包含方法,但是在Scala 2.8中,Iterable
- scala – 对无形HList内容的类型推断
- Shell简介:1分钟理解什么是Shell 脚本语言 解释器 以及编译
- Angular 5 Material 5 MatTableDataSource …“’mat-card-