玖叶教程网

前端编程开发入门

一文搞懂分布式可重入锁(分布式重试)

前言

Java 中为线程资源访问的问题,提供非丰富的锁,来满足的不同的场景。本文主要讲解可重锁的实现,以及具体应用场景。先放一张图对 Java 中的锁有一个初步的了解,后面如果有机会我们分篇深入讲解各种锁的使用。

什么是可重入锁

可重入锁,又叫递归锁,从名字大概就可以理解其含义,意思是同一个线程,外层的方法获取到了锁,再进入到这个线程内层的方法会自动获取到锁,其中有一个前提锁对象是同一个对象或者 class。

Java 中 ReentrantLock 和 synchronized 都是可重入锁,可重入的主要好处是可以一定程度的避免死锁的问题。

先看一段代码实例:

public class LockTest {
   //  可重入
    static ReentrantLock lock = new ReentrantLock();
   // 不可重入
   // static Lock lock = new Lock();

    public void m1(){
        lock.lock();
       System.out.println("方法 1");
        try {
            m2(1);
        }finally {
            lock.unlock();
        }
    }
    public void m2(int i) {
        lock.lock();
        try {
           i = i +1;
           System.out.println("方法 2"+i);
        }finally {
            lock.unlock();
        }
    }
}

这段代码,同时给出了可重入和不可重入的两种实例。

我们具体分析几个场景:

  • 假如 m1 加锁,m2 不加锁:由于在调用 m1 的时候,由于 m2 也是 public,也可以单独被调用,就会出现“线程安全”的问题,假如我们并发调用 m1、m2 方法,是不是就会出现 i 值超出我们预期值的问题。线程安全问题由两个原因引发,一个是线程之间存在共享数据(临界资源),一个是线程之间会同时操作共享数据。
  • 假如使用的是可重入锁(ReentrantLock),我们在调用 m1 的时候,由于 m2 也是有锁的,就会避免死锁,因为同一个线程可以重复获取到自己加锁的锁。

小结:通过这一节想必大家对可重入锁和不可重入锁,以及线程安全这几个概念有了初步的了解。

深入解析 Synchronized 和 ReentrantLock

本文核心是讲解分布式可重入锁的使用,在此处对 Synchronized 和 ReentrantLock 不做过度展开,主要讲一下使用方法和基本实现原理。

Synchronized

Synchronized 是 Java 中经常用于解决方法问题的一个方法,也是比较简单的一个方法,它可以锁住任何非 null 的对象。

Synchronized 有哪些特性:

  • 原子性:确实只有一个线程可以访问同步代码,线程之间是互斥的。
  • 有序性:可解决重排序问题,就是一个 unlock 操作后,紧接着就会一个 lock 的操作。
  • 可见性:说到可见性,大家首先会想到谁?Volatitle!!对就是它,他也具有可见性,但它不具有原子性。可见性的含义的是什么?就是有一个线程修改了共享变量,其他线程都能看得到,拿到最新的值。官方定义如下:

软件工程中,是指对象间的可见性,含义是一个对象能够看到或者能够引用另一个对象的能力。

很难理解可见性的含义,如果深入到内存模型,如何理解可见性,就是一个变量,在 unlock 之前,必须写入到内存中,让其他线程也能拿到这个变量的最新值。

Synchronized 的用法:

  • 修饰实例方法,当前方法加锁;
  • 修饰静态方法,当前方法所在的类对象加锁;
  • 修饰代码块,可以指定加锁的对象,给对象加锁,作用于当前的代码块。

用法比较简单,就不提供具体的实例了,能够理解上面三点就行。

Synchronized 的实现原理

Synchronized 加在代码块、方法上实现的方式是不一样的,加在代码块上主要用 monitorenter 和 monitorexit 来实现,加载方法上用 ACC_SYNCHRONIZED 来实现。核心原理就是进入方法和代码块时,要进行加锁,在退出时或者有异常时释放锁。

我们以加在代码块上来分析一下 Synchronized 的具体实现:

public class SynchronizedTest {
    public void method() {
        synchronized (this) {
            System.out.println("进入方法");
        }
    }
}

反编译的结果:

结果解析

monitorenter

每一个对象都是一个监视器锁(monitor),线程执行 monitorenter 来获取监视器锁的所有权,大概分为如下三个步骤:

  1. 初始化时 monitor 的进入数时 0,如果有一个线程进入,则为 monitor 的所有者,把进入数设置 1。
  2. 后面的线程在进入时,由于 monitor 是被占用状态,该线程就处于阻塞状态,并把 monitor 的进入数加 1。
  3. 当 monitor 的进入数为 0 时,从新尝试获取所有权。

monitorexit

和 monitorenter 的作用相反,执行 monitorenter 的线程必须时 objectref 的的拥有者。执行 monitorexit 就会把 monitor 的进入数减 1,如果减 1 之后,进入数是 0,线程就失去 monitor 的所有权,其他被阻塞的线程就可以尝试获取 monitor 的所有权。

注意事项:monitorexit 执行了两次,第一次是正常执行完成释放锁,第二次为异常释放锁。

小结

从上面的讲述我们大概知道了 Synchronized 的实现原理,主要通过 monitor 的对象的进入数来控制获得锁和释放锁,扩展一下我们想想 wait/notify 等方法是不是要求必须在同步代码块中执行,否则会报 IllegalMonitorStateException,原因就是 wait/notify 等也是依赖 monitor 的状态控制的。

ReentrantLock

要想深入了解 ReentrantLock 绕不开 AbstractQueuedSynchronizer(AQS)和 Compare and Swap(CAS),它是在 JDK1.5 之后添加的类,实现 Lock 接口,功能和 Synchronized 差不多。

ReentrantLock 是一个可重入锁,支持公平锁和非公平锁。

本节会介绍 CAS、AQS、以及 ReentrantLock 的实现原理,在第一节已经以 ReentrantLock 为例介绍了可重入的具体含义。

CAS

Compare and Swap 的缩写,中文翻译过来是比较并交换。CAS 是 java.util.concurrent 的核心基础,可以说没有 CAS 就不会有 java.util.concurrent 并发包。CAS 有三个操作数:内存值 V、旧的期望值 A 以及要修改的值 B,有且仅当 A=V,并将内存值修改为 B 时才返回 true,否则什么都做做,返回 false。这个操作是具有原子性的,主要由 Unsafe 这个类 JNI 调用 CPU 底层指令实现的。

AQS

翻译过来名字叫:抽象的队列同步器,它定义了一套多线程访问共享数据的同步器框架,需要同步类都是基于它实现的,比如:ReentrantLock/Semaphore/CountDownLatch/FutureTask 等等

从上图可知,AQS 维护了一个 FIFO 队列和一个 volatile 的 state,资源的获取和释放主要使用如下方式;

  • tryAcquire(int):独占方式。尝试获取资源,成功 true,失败 false。
  • tryRelease(int):独占方式。尝试释放资源,成功 true,失败 false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数为失败;0 为成功,但是没有剩余资源;正数表示成功,而且有资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,释放后允许唤醒等待的线程返回 true,否则返回 false。

其获取和释放锁的过程和 Synchronized 类似,以 ReentrantLock 为例:

  • state 在没有任何线程访问时是 0;
  • 当一个线程 tryAcquire 尝试获取独占锁的时候,state 会加 1;
  • 当占用该锁的线程,再次获取锁的时候,state 也会加 1,代表可以重入;
  • 其他线程如果 tryAcquire,发现 state != 0,只有等占用的锁的 state 重入几次释放几次才能获取到锁。

注意事项:state 必须采用 CAS 的方式进行添加和减少,才能保证整个操作的原子性。

公平锁和非公平锁

ReentrantLock 默认是非公平锁,也可以设置为公平锁

  • 公平锁:先等待的先拿到锁,后等待的后拿到锁,用内部类 FairSync 实现
  • 非公平锁:获取到锁就行,谁得到就是谁的,不管队列里的等待顺序,用内部类 NonfairSync 实现

查看源码里面的一个主要区别是否有 !hasQueuedPredecessors() 这个判断,就是用来判断是不是在队列头。

小结:本节主要介绍 ReentrantLock 的基本是基于 CAS 和 AQS 实现,以及公平锁和非公平锁的异同。

什么是分布式锁?为什么要可重入?

什么是分布式锁

在单体应用的时代,是不需要分布式锁的,分布式环境中,各台机器中间的共享资源问题,使用 java 自带的 Synchronized、ReentrantLock 变无法解决多台机器之间的资源访问问题,因为各个机器中间的内存是无法相互访问。为此分布式锁诞生了。

实现方式:

  • 基于数据库
  • 基于缓存 Redis、Memcached 等
  • 基于 ZooKeeper

本文主要讲解基于 Redis 的形式。

为什么要可重入?

假如不可重入,会大大增加死锁的概率,我们无法避免一个线程会同时操作多次共享数据,如果避免永远不死锁,分布式锁一般都会设置一个超时锁,保证不管发生什么情况,锁都能被释放。

Redis 实现分布式锁各种错误方式

由于摘要里把方式和利弊写重复了,所以这一节汇总在一起讲解各种实现形式以及利弊。

错误方式 1

直接用 setnx() 加锁和 expire() 释放锁:

  • setnx():作用是是否可以往 redis 中设置值,如果可以返回 1,不可以返回 0。
  • expire():给要加锁的 key,设置超时间,防止死锁。

代码实例如下:

public static void lock(Jedis jedis, String lockKey, String id, int expireTime) {
    Long result = jedis.setnx(lockKey, id);
    if (result == 1) {
        // 这里会有问题,如果程序突然在这个之前崩溃,或者有异常,还没设置超时时间,就会发生永久死锁
        jedis.expire(lockKey, expireTime);
    }
}

这个方式的核心问题是 setnx()和 expire()不具有原子性,一旦两步在中间过程出现问题,就会出现死锁的情况。

错误方式 2

直接上代码实例在进行分析:

public static boolean lock(Jedis jedis, String lockKey, int expireTime) {
    long expires = System.currentTimeMillis() + expireTime;
    String expiresStr = String.valueOf(expires);

    // 注意 value 是过期时间
    if (jedis.setnx(lockKey, expiresStr) == 1) {
        return true;
    }
    // 如果锁存在,获取锁的过期时间
    String currentExprieTime = jedis.get(lockKey);
    if (currentExprieTime != null && Long.parseLong(currentExprieTime) < System.currentTimeMillis()) {
        // 如果锁过期,获取上一个锁的过期时间,并重新设置现在锁的过期时间
        String oldExprieTime = jedis.getSet(lockKey, expiresStr);
        if (oldExprieTime != null && oldExprieTime.equals(currentExprieTime)) {
            // 只有一个线程的设置值和当前值相同,才有权利加锁
            return true;
        }
    }      
    return false;

}

这种方式会有多个问题的产生:

  1. 要求客户端时间的在分布式环境下要完全一致,这个就很难保证。
  2. 多个客户端同时执行 getSet(),虽然最终只有一个客户端可以加锁,但是这个客户端的过期时间可能被其他覆盖。
  3. 任何客户端都可以解锁。

小结:本节主要讲述了分布式锁常见的两种错误实现方式,下一节深入源码讲解分布式锁的正确使用方式。

深入源码解析 Redis 实现的可重入分布式锁

正确的使用方式

我们自己用 Redis 既然有各种问题,幸好 Java 的世界中,从来良好的封装,本节主要介绍怎么用 Redisson 实现可重入的分布式。

添加依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.10.1</version>
</dependency>

实例:

public static void main(String[] args) {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    config.useSingleServer().setPassword("123456");
    final RedissonClient client = Redisson.create(config);  
    RLock lock = client.getLock("lockKey");
    try{
        lock.lock();
    }finally{
        lock.unlock();
    }
}

加锁

阅读 Redisson 的源码,可以看到加锁是用 lockInterruptibly(long leaseTime, TimeUnit unit) 来实现的。

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 当前线程 id
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁,下一节会讲解这一块的源码
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    //  如果 ttl 为空,则证明获取锁成功
    if (ttl == null) {
        return;
    }
    // 如果获取锁失败,则订阅到对应这个锁
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);
    try {
        while (true) {
            //  死循环不断尝试获取锁
            ttl = tryAcquire(leaseTime, unit, threadId);
            //  如果 ttl 为空,则证明获取锁成功
            if (ttl == null) {
                break;
            }
            //  ttl 大于 0 则等待 ttl 时间后继续尝试获取
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        // 取消订阅
        unsubscribe(future, threadId);
    }
    //get(lockAsync(leaseTime, unit));
}

上面代码的大致过程是这样的:

先尝试获取锁,如果返回了 ttl 为 null,加锁成功,否则加锁失败。然后订阅这个锁的 Chanael,等待释放的消息发出后,再从新获取锁。

获取锁

获取锁是用 tryAcquire 实现的,具体源码如下:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 带有过期时间,获取锁
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }

    //执行获取锁的方法,默认过期时间 30 秒
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
        commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    // 如果还持有这个锁,则开启监听任务任务不断刷新该锁的过期时间
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // 释放锁
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

从上面的源码可以看出,获取是通过 tryLockInnerAsync 进行的, tryLockInnerAsync 获取的锁的逻辑,在 LUA 脚本代码中,使该部分操作绝对具有原子性。

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     
                            long threadId, RedisStrictCommand<T> command) {
        //  获取过期时间
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  // 锁不存在:通过 hset 设置它的值,并设置过期时间
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  // 锁已存在:且锁的是当前线程,hincrby 数值递增 1
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  // 锁已存在,且非本线程,则返回过期时间 ttl
                  "return redis.call('pttl', KEYS[1]);",
        Collections.<Object>singletonList(getName()), 
                internalLockLeaseTime, getLockName(threadId));
    }

此段核心的核心是上面的三个判断:

  • exists 判断锁是否存在,不存在设置值以及过期时间加锁成功。
  • hexists 判断锁是否存在,如果锁存在且锁的是当前线程,并发值加 1,解锁时要用,返回加锁成功,可重入
  • 如果锁存在,且锁的不是当前线程,说明其他线程获取了锁,返回过期时间,加锁失败。

释放锁

查看源码,释放锁主要使用 unlock 实现,具体源码如下:

public RFuture<Void> unlockAsync(final long threadId) {
    final RPromise<Void> result = new RedissonPromise<Void>();
    // 释放锁
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(future.cause());
                return;
            }
            // 解锁线程是否是持有锁的线程,不是则返回 null
            Boolean opStatus = future.getNow();
            // 如果返回空,代表解锁线程和持有该锁的线程不是同一个,抛出异常
            if (opStatus == null) {
                IllegalMonitorStateException cause = 
                    new IllegalMonitorStateException("
                        attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            // 释放成功,取消刷新过期时间的那个定时任务
            if (opStatus) {
                cancelExpirationRenewal(null);
            }
            result.trySuccess(null);
        }
    });

    return result;
}

同样释放锁也是一段 LUA 代码,释放锁的方法 unlockInnerAsync,源码如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,
            // 锁已经不存在,发布锁已经释放的消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 释放锁的线程和持有锁的线程不是同一个,返回 null
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // hincrby 递减 1 的方式,释放可重入的那些锁
            // 剩余次数大于 0 ,则刷新过期时间
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            //  不大于 0,锁已经释放,删除 key 并发布锁释放的消息
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
    Arrays.<Object>asList(getName(), getChannelName()), 
        LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

从上面的源码,可以看出释放锁,同样有三个判断:

  • 如果锁不存在,发布释放锁的消息,解锁成功;
  • 如果解锁的线程和持有的不是同一个,解锁失败,并抛出异常;
  • hincrby 的值减 1,代表释放一次锁,如果剩余的次数大于 0,说明是可重入锁,刷新过期时间。如果小于 0,说明锁完全释放,发布释放锁的消息。

小结:本节主要通过 Redisson 的加锁、获取锁、释放锁的源码,分析它如何实现出一种可以规避各种问题的分布式锁。从此处也可以得到,很多原理是可以借鉴的,如果有现成的比较的好的轮子,尽量避免自己再重复创造,因为你创造的可能是错的。

可重入分布式锁我使用的场景

分布式的经典应用场景是解决超卖的问题,比如我们的直播平台,某主播很豪气,一分钱秒杀 10 台 iphone12+给观众,当前有 1 万个人都虎视眈眈地在看着,10、9、8……3、2、1 我们库存可只设置了 10 个,假如同时有 5000 并发,我们没有使用分布式锁,大家抢的时候,发现都是有库存的,结果在扣库存的时候,完蛋了,10 台超卖了 1000 台。这还干直播,直接跑路了。

分布式锁怎么解决超卖的问题呢?我们把这个产品的 id 设置 lockKey,每次都没次减库存的时候,只要一个线程在执行就会很好避免该问题。代码如下:

    RLock lock = redisson.getLock("iphone12_stock");
    lock.lock;
    int oldstockCount = stockDao.getByGoodsId("iphone12GoodsId");
    if(oldStockCount > currentCount){
        // 减存储
    }
    lock.unclock();

总结

本文本人于周末花了八九个小时,写了一万多字,有错误之处希望多多指正,希望您有所收获。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言