为什么要使用redis来实现一个分布式锁?
一个比较主要的原因是和分布式系统的范围有关。多线程中也有锁,可以使多个线程进行排他性访问,避免资源竞争,最终达到数据一致,但是这个锁仅在线程间可见。而在分布式系统架构中,各个节点通过网络连接,要确保各节点间的排他性访问不论是操作系统级别的锁还是编程语言级别的锁都将失效。
我们需要的是一个全局可见的锁,即分布式系统中各个节点都可以访问到这个锁。 由于redis经常充当分布式系统中缓存的角色,因此在分布式系统中它必然是全局可见的。另外由于它是一个key-value型的内存数据库除了速度快外,也比较易于实现。当然,易于实现只是想对来说的,一个可靠的分布式锁需要考虑到分布式系统可能存在的各种情形,以下是使用redis实现的分布式锁需要达到的几个目标:
- 安全属性: 不管任何时候,只有一个客户端能持有锁,一个客户端获取到的锁不能被另一个客户端释放
- 效率属性A: 不会死锁,就算一个持有锁的客户端宕掉或者发生网络分区
- 效率属性B: 容错,只要大部分节点工作正常,客户端就能获取和释放锁
安全属性这个不用说,本来锁的作用就是为了让各个节点进行排他性访问的,这个在锁的设计上是应该考虑的。对于效率属性A,在分布式系统中的场景是这样的,当一个获取到锁的节点宕机之后,锁没有得到释放,造成其他节点都在等待锁。我们知道造成死锁有4个必要条件(不清楚的童鞋可自查),只要破坏其中任意一点,就能打破死锁。常规的做法是给每个锁设定一个超时时间,超时时间一到锁就自动释放,这样也就打破了请求和保持条件,也就不会造成死锁了。当然,给锁设置超时时间会产生另外的问题,如下:
- 持有锁的节点因为操作时间过长而导致锁被自动释放,但该节点本身不知道这点,甚至有可能在执行完释放锁的时候把其他节点获取到的锁释放了
- 由于网络问题,节点A获取锁的时间比较长,当返回获取成功的同时,锁被自动释放了,而节点B在此时也在获取同一个锁,由于锁已经被自动释放了,所以节点B也获取成功了。最终节点A和节点B都认为自己获取到锁了。
针对第一个问题,我们会为每个获取锁的节点生成一个随机数,这样即使是同一个锁,不同的节点获取到之后也有不同的随机数,以此来保证一个节点不会释放另一个节点获取到的锁。另外,为了避免操作时间过长导致锁被自动释放,我们可以提供一个方法来刷新锁的持有时间。
针对第二个问题,我们可以将超时时间设置的比获取时间长很多,并且在获取到锁时判断我们拿到锁的时间是否已经大于了超时时间,一旦大于超时时间则认为获取锁失败。
以上两个问题的解决方法会在之后介绍redis分布式锁算法(Redlock算法)时再提及。最后,效率属性B也会放到Redlock小节去介绍。实际上,对于redis是单实例的分布式系统来说,不用考虑效率属性B。但是,redis实际上也是分布式的,这也是业界普遍使用的,单实例的情况还是比较少见的。下面我们针对redis是单实例和分布式部署的情况来介绍分布式锁的实现。
单实例redis实现分布式锁
def acquire_locak_with_timeout(conn,
lockname,
acquire_timeout=10,
lock_timeout=10):
# 生成随机标识
identifier = str(uuid.uuid4())
lockname = 'local' + lockname
lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end:
# 成功获取了锁之后才设置锁的超时时间
if conn.setnx(lockname, lock_timeout)
conn.expire(lockname, lock_timeout)
# 返回随机生成的值,在释放锁时使用
return identifier
elif not conn.ttl(lockname):
conn.expire(lockname, lock_timeout)
time.sleep(.001)
return False
def release_lock(conn, lockname, identifier):
pipe = conn.pipeline(True)
lockname = 'lock:' + lockname
while True:
try:
pipe.watch(lockname)
# 检查进程是否仍然持有锁
if pipe.get(lockname) == identifier:
pipe.multi()
pipe.delete(lockname)
pipe.execute()
return True
pipe.unwatch()
break
except redis.exceptions.WatchError:
pass
return False
以上代码是《redis实战》中的实现。在实现中,代码为每一个获取锁的客户端都生成了一个128位的标识identifier,即使是同一个锁,不同的客户端也会生成不同的identifier。而在释放锁时,会首先检查锁的identifier是否与客户端持有的identifier一致。如果一致,表示客户端依然持有锁,可以释放;如果不一致,表示客户端持有的锁已经被释放了。通过identifier可以确保客户端A不会释放其他客户端获取的锁。为了避免客户端在获取到锁还没来得及设置锁的过期时间就出现宕机或者断电的情况,在acquire_locak_with_timeout中会对没有设置过期时间的锁加上过期时间,确保所有的锁都会被自动释放。上述的分布式锁适合在只有一个redis实例的情况下使用,对于多redis实例的集群架构,需要有另外的分布式锁实现。
多实例redis实现分布式锁(Redlock)
根据redis官方文档描述的分布式锁管理器实现算法Redlock,我们来描述一下获取锁的过程,假设在我们的redis集群中存在N个节点:
- 获取当前时间(单位是毫秒)。
- 轮流用相同的key和随机值在N个节点上请求锁,在这一步里,客户端在每个master上请求锁时,会有一个和总的锁释放时间相比小的多的超时时间。比如如果锁自动释放时间是10秒钟,那每个节点锁请求的超时时间可能是5-50毫秒的范围,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,我们应该尽快尝试下一个master节点。
- 客户端计算第二步中获取锁所花的时间,只有当客户端在大多数master节点(N/2+1个节点)上成功获取了锁,而且总共消耗的时间不超过锁释放时间,这个锁就认为是获取成功了。
- 如果锁获取成功了,那现在锁自动释放时间就是最初的锁释放时间减去之前获取锁所消耗的时间。
- 如果锁获取失败了,不管是因为获取成功的锁不超过一半(N/2+1)还是因为总消耗时间超过了锁释放时间,客户端都会到每个master节点上释放锁,即便是那些它认为没有获取成功的锁。
简单的来说,在多实例redis集群架构中实现分布式锁需要在集群中的每个master节点都去获取锁,只要在N/2+1个节点上成功获取,并且花费的时间符合要求即可。为什么不是获取N个锁呢?因为同一个锁,只要客户端A获取了集群中超过半数的锁,在A还没释放和锁没超时的情况下,其他客户端不可能在获取超过半数的锁。 也就是说,RedLock算法只要在集群中超过半数的节点正常工作的情况下,都能保证客户端获取到锁。正如我们之前说过的,获取锁的时间如果超过了锁超时的时间,这个锁其实是无效的。在多实例redis架构中也是如此,如果我们设置锁的过期时间是1s,但是我们获取(N/2+1)个锁的时间超过了1s,那么我们获取到的这(N/2+1)个锁应该认为是无效的。根据官方文档描述,我们假设锁的超时时间是TTL,获取第一个锁的时间是T1,获取最后一个锁的时间是T2,那么当我们获取到分布式锁时(也就是我们获取到集群中(N/2+1)个锁时),该锁的存活时间可以认为是MIN_VALIDITY=TTL-(T2-T1)-CLOCK_DRIFT。CLOCK_DRIFT是不同进程间的时钟差异,比如redis超时的时间精度等等。
最后我们来看看Redlock的python版实现源码,相关注释也写在代码里了,如下
import logging
import string
import random
import time
from collections import namedtuple
import redis
from redis.exceptions import RedisError
# Python 3 compatibility
string_type = getattr(__builtins__, 'basestring', str)
try:
basestring
except NameError:
basestring = str
Lock = namedtuple("Lock", ("validity", "resource", "key"))
class CannotObtainLock(Exception):
pass
class MultipleRedlockException(Exception):
def __init__(self, errors, *args, **kwargs):
super(MultipleRedlockException, self).__init__(*args, **kwargs)
self.errors = errors
def __str__(self):
return ' :: '.join([str(e) for e in self.errors])
def __repr__(self):
return self.__str__()
class Redlock(object):
default_retry_count = 3
default_retry_delay = 0.2
clock_drift_factor = 0.01
# 用lua实现释放单实例redis锁
unlock_script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end"""
def __init__(self, connection_list, retry_count=None, retry_delay=None):
self.servers = []
for connection_info in connection_list:
try:
if isinstance(connection_info, string_type):
server = redis.StrictRedis.from_url(connection_info)
elif type(connection_info) == dict:
server = redis.StrictRedis(**connection_info)
else:
server = connection_info
self.servers.append(server)
except Exception as e:
raise Warning(str(e))
self.quorum = (len(connection_list) // 2) + 1
# 如果集群中没有超过半数的有效节点,则认为异常
if len(self.servers) < self.quorum:
raise CannotObtainLock(
"Failed to connect to the majority of redis servers")
self.retry_count = retry_count or self.default_retry_count
self.retry_delay = retry_delay or self.default_retry_delay
def lock_instance(self, server, resource, val, ttl):
'''
获取单实例redis锁,为锁设置一个随机值val。每次获取这个锁都会重新生成val。
'''
try:
assert isinstance(ttl, int), 'ttl {} is not an integer'.format(ttl)
except AssertionError as e:
raise ValueError(str(e))
return server.set(resource, val, nx=True, px=ttl)
def unlock_instance(self, server, resource, val):
'''
释放单实例redis锁,释放时会校验val值,避免释放成其他客户端获取的锁
'''
try:
server.eval(self.unlock_script, 1, resource, val)
except Exception as e:
logging.exception("Error unlocking resource %s in server %s", resource, str(server))
def get_unique_id(self):
'''
随机生成长度为22的值。获取和释放锁都用这个值,确保不会重复获取和释放
'''
CHARACTERS = string.ascii_letters + string.digits
return ''.join(random.choice(CHARACTERS) for _ in range(22)).encode()
def lock(self, resource, ttl):
retry = 0
val = self.get_unique_id()
# Add 2 milliseconds to the drift to account for Redis expires
# precision, which is 1 millisecond, plus 1 millisecond min
# drift for small TTLs.
drift = int(ttl * self.clock_drift_factor) + 2
redis_errors = list()
while retry < self.retry_count:
n = 0
start_time = int(time.time() * 1000)
del redis_errors[:]
# 去获取每个节点的redis锁,并对获取成功的锁计数加1
for server in self.servers:
try:
if self.lock_instance(server, resource, val, ttl):
n += 1
except RedisError as e:
redis_errors.append(e)
elapsed_time = int(time.time() * 1000) - start_time
validity = int(ttl - elapsed_time - drift)
# 成功获取到锁需要满足两个条件:
# 1.锁的过期时间大于所有锁的获取时间和补偿时间之和
# 2.获取到超过半数节点的redis锁
if validity > 0 and n >= self.quorum:
if redis_errors:
raise MultipleRedlockException(redis_errors)
return Lock(validity, resource, val)
else:
for server in self.servers:
try:
self.unlock_instance(server, resource, val)
except:
pass
retry += 1
time.sleep(self.retry_delay)
return False
def unlock(self, lock):
redis_errors = []
# 释放每个节点的锁
for server in self.servers:
try:
self.unlock_instance(server, lock.resource, lock.key)
except RedisError as e:
redis_errors.append(e)
if redis_errors:
raise MultipleRedlockException(redis_errors)
作者:阿潘特秃毛
链接:https://juejin.cn/post/7044026675478151205
来源:稀土掘金