目录 在分布式系统中,同一时间只允许一个线程/进程对共享资源进行操作。例如:秒杀、积分扣减、抢红包、定时任务执行等等。 setnx和expire两个操作非原子性,expire操作之前程序崩溃,会发生死锁。 这种加锁方式解决的问题是程序崩溃/超时,未能释放导致死锁,使用该方案的前提是:各个服务器时间必须同步,在cache.getSet(lockKey, stringOfLockExpireTime)时会出现时间覆盖问题,只要各个服务器时间同步,时间覆盖也不影响加锁效果,不应该属于错误案例,因为出现时间覆盖了,严格来说就是错误的,主要看怎么定义了。我们原来一直用的是这种方式。 不分是不是自己持有的锁,上来就删除,导致锁误删除。 如代码注释,问题在于如果调用cache.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行cache.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了,这也是锁误删除的例子。 加锁:使用set扩展命令,key:锁标识,value:持有当前锁线程标识,PX:超时时间(毫秒)。 解锁:只有当前锁的持有者才可以执行删除操作,通过lua脚本保证了get和del命令执行的原子性操作。 A成功获取锁后并设置超时时间5秒,但是A业务执行超过了5秒,A持有锁过期自动释放,B获取到锁,A和B并发执行。 A和B并发执行显然是不允许的,一般两种解决方式: 守护线程自动续期代码实现: 当不同的客户端连接不同的 master 节点时,两个客户端可以同时拥有同一把锁。如下: 参考RedLock算法学习目标
分布式锁概念
分布式锁4种雷区
分布式锁特性
基于redis的分布式锁
错误案例集
加锁-错误案例1
public void lock_error1(String lockKey, String requestId, int expireTime) {
RedisCache cache = redisFactory.getRedisCacheInstance(name);
Long result = cache.setnx(lockKey, requestId);
if (result == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
cache.expireSeconds(lockKey, expireTime);
}
}
加锁-错误案例2(严格意义属于错误案例)
public String lock_error2(String lockKey, int expireTime) {
RedisCache cache = redisFactory.getRedisCacheInstance(name);
long lockExpireTime = System.currentTimeMillis() + expireTime * 1000 + 1;//锁超时时间
String stringOfLockExpireTime = String.valueOf(lockExpireTime);
if (cache.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁
//成功获取到锁, 设置相关标识
return stringOfLockExpireTime;
}
String value = cache.get(lockKey);
if (value != null && isTimeExpired(value)) { // lock is expired
// 假设多个线程(非单jvm)同时走到这里
String oldValue = cache.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
// 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
// 假如拿到的oldValue依然是expired的,那么就说明拿到锁了
if (oldValue != null && isTimeExpired(oldValue)) {
//成功获取到锁, 设置相关标识
return stringOfLockExpireTime;
}
}
return null;
}
解锁-错误案例1
public void unLock_error1(String lockKey) {
RedisCache cache = redisFactory.getRedisCacheInstance(name);
cache.del(lockKey);
}
解锁-错误案例2
public void unLock_error2(String lockKey, String requestId) {
RedisCache cache = redisFactory.getRedisCacheInstance(name);
// 判断加锁与解锁是不是同一个客户端
if (requestId.equals(cache.get(lockKey))) {
// 若在此时,这把锁突然不是这个客户端的,则会误解锁
cache.del(lockKey);
}
}
正确实现
实现原理
# 加锁命令
set key value NX PX milliseconds
# 解锁命令
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
实现代码
// 加锁
public boolean lockByLua(String lockKey, String requestId, int expireTime) {
RedisCache cache = redisFactory.getRedisCacheInstance(name);
String result = cache.set(lockKey,requestId, "NX", "PX", expireTime);
if ("OK".equals(result)) {
return true;
}
return false;
}
// 解锁
public boolean unLockByLua(String lockKey, String requestId) {
Long success = 1L;
RedisCache cache = redisFactory.getRedisCacheInstance(name);
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = cache.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (success.equals(result)) {
return true;
}
return false;
}
不足之处
锁超时并发执行 解决方案
问题现象
实现方案
@Slf4j
public class ExpireDelayThread implements Runnable {
/**
* 锁
*/
private String lockKey;
/**
* 持锁线程标识id
*/
private String requestId;
/**
* 过期时间(单位:毫秒)
*/
private Integer expireTime;
private RedisClient redisClient;
private volatile boolean isRun = true;
public ExpireDelayThread(String lockKey, String requestId, Integer expireTime, RedisClient redisClient){
this.lockKey = lockKey;
this.requestId = requestId;
this.expireTime = expireTime;
this.redisClient = redisClient;
}
public void stop(){
isRun = false;
}
@Override
public void run() {
int waitTime = Math.max(1, expireTime * 2 /3);
while (isRun) {
try {
Thread.sleep(waitTime);
if (redisClient.exprieDelayByLua(lockKey,requestId)){
log.info("lock key:{}, thread requestId:{}, waitTime:{}, exprie delay time:{}",lockKey,requestId,waitTime,expireTime);
} else {
log.info("lock key:{}, thread requestId:{}, waitTime:{}, exprie delay time failed!",lockKey,requestId,waitTime);
this.stop();
}
} catch (InterruptedException e) {
log.error("lock key:{}, thread requestId:{}, waitTime:{}, InterruptedException!",lockKey,requestId,waitTime);
} catch (Exception ex) {
log.error("lock key:"+ lockKey +", thread requestId:"+ expireTime +", waitTime:"+ waitTime +", error!",ex);
}
}
}
}
集群容错 解决方案
问题现象
实现方案