分布式锁-03-基于 mysql 实现分布式锁
MySQL--DB实现分布式锁思路
无论是单机锁还是分布式锁,原理都是基于共享的数据,判断当前操作的行为。
对于单机则是共享RAM内存,对于集群则可以借助Redis,ZK,DB等第三方组件来实现。
Redis,ZK对分布式锁提供了很好的支持,基本上开箱即用,然而这些组件本身要高可用,系统也需要强依赖这些组件,额外增加了不少成本。
DB对于系统来说本身就默认为高可用组件,针对一些低频的业务使用DB实现分布式锁也是一个不错的解决方案,比如控制多机器下定时任务的起调,针对审批回调处理等,本文将给出DB实现分布式锁的一些场景以及解决方案,希望对你启发。
表设计
首先要明确DB在系统中仍然需要认为是最脆弱的一环,因此在设计时需要考虑压力问题,即能应用实现的逻辑就不要放到DB上实现,也就是尽量少使用DB提供的锁能力,如果是高并发业务则要避免使用DB锁,换成Redis等缓存锁更加有效。
如清单1所示,该表中唯一的约束为lock_name,timestamp,version三者组合主键,下文会利用这三者实现悲观锁,乐观锁等业务场景。
- 清单1: 分布式锁表结构
CREATE TABLE `lock` (
`lock_name` varchar(32) NOT NULL DEFAULT '' COMMENT '锁名称',
`resource` bigint(20) NOT NULL COMMENT '业务主键',
`version` int(5) NOT NULL COMMENT '版本',
`gmt_create` datetime NOT NULL COMMENT '生成时间',
PRIMARY KEY (`lock_name`,`resource`,`version`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
悲观锁实现
对于悲观锁业务中常见的操作有以下两种:
针对A:
A场景当一台机器获取到锁后,其他机器处于排队状态,锁释放后其他机器才能够继续下去,这种应用层面解决是相当麻烦,因此一般使用DB提供的行锁能力,即select xxx from xxx for update。
A场景一般都和业务强关联,比如库存增减,使用业务对象作为行锁即可。
需要注意的是,该方案本质上锁压力还是在数据库上,当阻塞住的线程过多,且操作耗时,最后会出现大量锁超时现象。
针对B:
针对B场景(tryLock)举个具体业务,在集群下每台机器都有定时任务,但是业务上要求同一时刻只能有一台能正常调度。
解决思路是利用唯一主键约束,插入一条针对TaskA的记录,版本则默认为1,插入成功的算获取到锁,继续执行业务操作。这种方案当机器挂掉就会出现死锁,因此还需要有一个定时任务,定时清理已经过期的锁,清理维度可以根据lock_name设置不同时间清理策略。
定时任务清理策略会额外带来复杂度,假设机器A获取到了锁,但由于CPU资源紧张,导致处理变慢,此时锁被定时任务释放,因此机器B也会获取到锁,那么此时就出现同一时刻两台机器同时持有锁的现象,解决思路:把超时时间设置为远大于业务处理时间,或者增加版本机制改成乐观锁。
insert into lock set lock_name='TaskA' , resource='锁住的业务',version=1,gmt_create=now()
success: 获取到锁
failed:放弃操作
释放锁
悲观锁思想
除了可以通过增删操作数据库表中的记录以外,我们还可以借助数据库中自带的锁来实现分布式锁。
在查询语句后面增加FOR UPDATE,数据库会在查询过程中给数据库表增加悲观锁,也称排他锁。当某条记录被加上悲观锁之后,其它线程也就无法再该行上增加悲观锁。
在使用悲观锁的同时,我们需要注意一下锁的级别。
MySQL InnoDB引起在加锁的时候,只有明确地指定主键(或索引)的才会执行行锁 (只锁住被选取的数据),否则MySQL 将会执行表锁(将整个数据表单给锁住)。
在使用悲观锁时,我们必须关闭MySQL数据库的自动提交属性(参考下面的示例),因为MySQL默认使用autocommit模式,也就是说,当你执行一个更新操作后,MySQL会立刻将结果进行提交。
mysql> SET AUTOCOMMIT = 0;
Query OK, 0 rows affected (0.00 sec)
这样在使用FOR UPDATE获得锁之后可以执行相应的业务逻辑,执行完之后再使用COMMIT来释放锁。
我们不妨沿用前面的database_lock表来具体表述一下用法。假设有一线程A需要获得锁并执行相应的操作,那么它的具体步骤如下:
STEP1 - 获取锁:SELECT * FROM database_lock WHERE id = 1 FOR UPDATE;
STEP2 - 执行业务逻辑;
STEP3 - 释放锁:COMMIT。
如果另一个线程B在线程A释放锁之前执行STEP1,那么它会被阻塞,直至线程A释放锁之后才能继续。注意,如果线程A长时间未释放锁,那么线程B会报错,参考如下(lock wait time可以通过innodb_lock_wait_timeout来进行配置):
ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction
上面的示例中演示了指定主键并且能查询到数据的过程(触发行锁),如果查不到数据那么也就无从“锁”起了。
如果未指定主键(或者索引)且能查询到数据,那么就会触发表锁,比如STEP1改为执行以下两种情况,都会触发表锁:
SELECT * FROM database_lock WHERE description='lock' FOR UPDATE;
或者
SELECT * FROM database_lock WHERE id>0 FOR UPDATE;
在悲观锁中,每一次行数据的访问都是独占的,只有当正在访问该行数据的请求事务提交以后,其他请求才能依次访问该数据,否则将阻塞等待锁的获取。
悲观锁可以严格保证数据访问的安全。但是缺点也明显,即每次请求都会额外产生加锁的开销且未获取到锁的请求将会阻塞等待锁的获取,在高并发环境下,容易造成大量请求阻塞,影响系统可用性。
另外,悲观锁使用不当还可能产生死锁的情况。
乐观锁实现
针对乐观锁场景,举个具体业务,在后台系统中经常使用大json扩展字段存储业务属性,在涉及部分更新时,需要先查询出来,合并数据,写入到DB,这个过程中如果存在并发,则很容易造成数据丢失,因此需要使用锁来保证数据一致性,相应操作如下所示,针对乐观锁,不存在死锁,因此这里直接存放业务id字段,保证每一个业务id有一条对应的记录,并且不需要对应的定时器清除。
select * from lock where lock_name='业务名称', resource='业务id';
不存在: insert into lock set lock_name='业务名称', resource='业务id' , version=1;
获取版本: version
业务操作: 取数据,合并数据,写回数据
写回到DB: update lock set version=version+1 where lock_name='业务名称' and resource='业务id' and version= #{version};
写回成功: 操作成功
写回失败: 回滚事务,从头操作
乐观锁写入失败会回滚整个事务,因此如果写入冲突很频繁的场景不适合使用乐观锁,大量的事务回滚会给DB巨大压力,最终影响到具体业务系统。
乐观锁思想
顾名思义,系统认为数据的更新在大多数情况下是不会产生冲突的,只在数据库更新操作提交的时候才对数据作冲突检测。如果检测的结果出现了与预期数据不一致的情况,则返回失败信息。
乐观锁大多数是基于数据版本(version)的记录机制实现的。何谓数据版本号?即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表添加一个 “version”字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加1。
在更新过程中,会对版本号进行比较,如果是一致的,没有发生改变,则会成功执行本次操作;如果版本号不一致,则会更新失败。
为了更好的理解数据库乐观锁在实际项目中的使用,这里就列举一个典型的电商库存的例子。
一个电商平台都会存在商品的库存,当用户进行购买的时候就会对库存进行操作(库存减1代表已经卖出了一件)。
我们将这个库存模型用下面的一张表optimistic_lock来表述,参考如下:
CREATE TABLE `optimistic_lock` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`resource` int NOT NULL COMMENT '锁定的资源',
`version` int NOT NULL COMMENT '版本信息',
`created_at` datetime COMMENT '创建时间',
`updated_at` datetime COMMENT '更新时间',
`deleted_at` datetime COMMENT '删除时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uiq_idx_resource` (`resource`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表';
在使用乐观锁之前要确保表中有相应的数据(如果没有此资源则可变相说明是无锁状态,可以进行插入操作,插入成功则代表获取锁成功),比如:
INSERT INTO optimistic_lock(resource, version, created_at, updated_at) VALUES(20, 1, CURTIME(), CURTIME());
如果只是一个线程进行操作,数据库本身就能保证操作的正确性。主要步骤如下:
STEP1 - 获取资源:SELECT resource FROM optimistic_lock WHERE id = 1;
STEP2 - 执行业务逻辑;
STEP3 - 更新资源:UPDATE optimistic_lock SET resource = resource -1 WHERE id = 1。
然而在并发的情况下就会产生一些意想不到的问题:比如两个线程同时购买一件商品,在数据库层面实际操作应该是库存(resource)减2,但是由于是高并发的情况,第一个线程执行之后(执行了STEP1、STEP2但是还没有完成STEP3),第二个线程在购买相同的商品(执行STEP1),此时查询出的库存并没有完成减1的动作,那么最终会导致2个线程购买的商品却出现库存只减1的情况。
在引入了version字段之后,那么具体的操作就会演变成下面的内容:
STEP1 - 获取资源: SELECT resource, version FROM optimistic_lock WHERE id = 1;
STEP2 - 执行业务逻辑;
STEP3 - 更新资源:UPDATE optimistic_lock SET resource = resource -1, version = version + 1 WHERE id = 1 AND version = oldVersion。
乐观锁的优点比较明显,由于在检测数据冲突时并不依赖数据库本身的锁机制,不会影响请求的性能,当产生并发且并发量较小的时候只有少部分请求会失败。
缺点是需要对表的设计增加额外的字段,增加了数据库的冗余,另外,当应用并发量高的时候,version值在频繁变化,则会导致大量请求失败,影响系统的可用性。
我们通过上述sql语句还可以看到,数据库锁都是作用于同一行数据记录上,这就导致一个明显的缺点,在一些特殊场景,如大促、秒杀等活动开展的时候,大量的请求同时请求同一条记录的行锁,会对数据库产生很大的写压力。
所以综合数据库乐观锁的优缺点,乐观锁比较适合并发量不高,并且写操作不频繁的场景。
总结
分布式锁的原理实际上很容易理解,难的是如何在具体业务场景上选择最合适的方案。
无论是哪一种锁方案都是与业务密切关联,总之没有完美的分布式锁方案,只有最适合当前业务的锁方案。
为什么我们需要分布式锁?
我们中的许多人可能遇到或听说过这些问题:
重试导致的数据损坏:从程序UI提交数据,该程序用户界面在你看不见的地方将数据插入后端MySQL数据库。偶尔用户界面没有响应,因此用户单击了几次提交按钮,这损坏了数据库中的数据。
缓存过期:使用Redis作为读透缓存,缓存中的密钥都同时过期。然后,所有流量都击中底层MySQL,该MySQL无法处理如此高的负载,并返回超时错误。
幽灵库存:假设我们是电子商务卖家,我们的库存中总共有4部iPhone。爱丽丝提交了购买3部iPhone的订单,鲍勃也订购了2部iPhone。理论上,只有第一个订单应该成功,第二个订单应该因为库存不足而失败。然而,实际上,两个用户都可能看到一个用户界面,显示大约同时还剩下4部iPhone并提交了订单,但实际上,第二个订单成功了,而第一个订单失败,或者更糟糕的是,两个订单都成功了,订购了5部
iPhone!
所有这些问题都有一个共同的特点:多个用户试图在同一时间读取/更新同一资源。如果我们所有的服务和数据库都在一台大型整体超级计算机上运行,我们可以使用通常与编程语言一起附带的锁或互斥来序列化任何并发读/写操作。例如,Java中的synchronized关键字和ReentrantLock,或在Go中sync包。
然而,分布式系统通常在数千台机器上运行数十个微服务,我们需要协调许多机器的读/写。这就是分布式锁的用途。

分布式锁的设计目标
锁定实体的存储
单台机器上的静音或锁定通常带有编程语言本身或广泛使用的库。在自然界中,它们在内存中作为整数实现。
例如,Go中的Mutex和RWMutex使用不同的整数来指示不同的锁状态。由于分布式系统不共享内存,我们需要将此表示锁定状态的整数存储在存储中间件中。
分布式锁常用的存储中间件是:
MySQL
Redis
ZooKeeper
ETCD
你可以看到分布式锁没有魔力,因为它们也使用相同的熟悉数据存储系统,如MySQL、Redis等。
现在,让我们深入了解设计细节,特别是如何在这些不同的存储系统之上构建分布式锁,以及权衡和区别是什么。
功能要求
- 相互排斥。
必要时,不同机器上许多进程/线程中只有一个可以访问特定资源,其他进程/线程应该等到锁被释放并可用。
- TTL或租赁机制。
提供分布式锁和其他请求分布式锁定的服务位于许多不同的机器上。这些服务通过网络进行通信。从CAP理论来看,我们知道网络总是不可靠,任何服务器都可能停机一段时间。因此,当我们设计分布式锁定服务时,我们需要考虑持有锁的客户端可能关闭并且无法释放锁的可能性,从而阻止了等待获得相同锁的所有其他客户端。因此,我们需要一个“上帝”机制,在这种情况下可以自动释放锁,以解除对其他客户端的封锁。
- 锁定服务的API:
锁定:获取锁定
解锁:松开锁
TryLock(可选)。例如,更高级的API:客户端可以指定获取锁的最大等待时间。如果无法在窗口内获得锁定,请返回时出现错误,而不是继续等待。
- 高性能。
低延迟:在正常情况下,锁定和解锁应该会很快。例如,假设实际的业务逻辑只需要1ms来处理,但只需在处理每个请求时简单地获取和释放锁,只需再获得100ms,那么最大QPS只能达到10,这对于当今标准中的许多服务来说都非常低。在这种情况下,服务器可以处理的最大QPS受到锁定性能的限制。
通知机制:分布式锁最好提供通知机制。如果服务器进程A由于被另一个服务器进程B持有而无法获得锁,那么A不应该继续等待并占用CPU。相反,A应该闲置,以避免浪费CPU周期。然后,当锁可用时,锁服务会通知A,A将加载到CPU并恢复运行。
避免雷鸣般的牛群。假设有100个进程想要获得相同的锁。当锁可用时,理想情况下,只应通知排队的“下一个”进程,而不是突然调用所有100个进程来竞争锁,结果发现100个进程中有1个可以获得锁并继续,而其他99个需要回到他们之前正在做的事情。
- 公平。
先到先得。无论谁等了最久,接下来都应该拿到锁。如果是这样,锁被视为公平的锁。
否则,这是一把不公平的锁。这两种类型的锁实际上都在现实中使用(嗯,生活不公平🤔)
- 重新进入锁。
想象一下,一个节点或服务器进程获得了锁,开始处理业务逻辑,然后遇到了一个代码片段,要求再次获得相同的锁!
在这种情况下,节点或进程不应该死锁,相反,它应该能够再次获得相同的锁,因为它已经持有锁。
使用MySQL的分布式锁
在实际生产环境中,MySQL通常在RC(已读提交)隔离级别配置。因此,我们随后的讨论将侧重于RC,而不是RR(可重复阅读)。
实施 1. 使用唯一的密钥约束
MySQL允许创建对键或索引具有唯一密钥约束的表。我们可以使用此内置的唯一性约束来实现分布式锁。
假设我们在MySQL中创建了一个名为lock的表,那么分布式锁的代码路径应该是:
客户端A正试图获得锁。目前没有其他客户端持有锁,因此客户端A成功获得了锁,并将一行插入MySQLlock表中。
现在客户端B想要获得相同的锁。它首先查询数据库,并发现客户端A插入的行已经存在。在这种情况下,客户端B无法获取锁,并将返回错误。
然后,客户端B将等待一段时间并重试。通常,我们将在这里使用带有TTL的重试循环。
客户端B将在指定的TTL窗口内继续重试几次,最终要么在客户端A释放锁后成功获取锁,要么因TTL而失败。
一旦客户端A完成任务,它只需删除DBlock表中的行即可释放锁。现在,其他客户可以获得锁。
以下是SQL片段示例,用于在lock_key列上创建一个名为lock的具有唯一密钥约束的表:
CREATE TABLE `lock` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`lock_key` varchar(256) NOT NULL,
`holder` varchar(256) NOT NULL,
`creation_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_lock_key` (`lock_key`)
);
在上面的SQL中,
lock_key是锁的唯一名称。我们可以在这里使用任何字符串。
例如,我们可以使用串联字符串project_name + resource_id作为锁的名称。命名应该一致,并给出足够的粒度。
持有人:当前持有锁的客户端的ID。例如,我们可以使用串联字符串service_name + IP地址+ thread_id来识别分布式环境中的客户端。
Once the table with unique key constraint on lock_key is defined, acquire & release lock are merely two additional SQL snippets:
Aquire锁:
INSERT INTO `lock`(`lock_key`, `holder`) VALUES ('project1_uid1', 'server1_ip1_tid1');
释放锁:
DELETE FROM `lock` WHERE `lock_key` = 'project1_uid1';
这种简单的实现满足了基本的功能要求。
现在,让我们考虑几种故障模式,看看它是否对分布式系统中的常见故障具有鲁棒性。
如果客户端A获取锁,将一行插入数据库,但后来客户端A崩溃,或者网络分区和客户端A无法到达数据库,该怎么办?
在这种情况下,该行将保留在数据库中,并且不会被删除。
换句话说,对其他客户端来说,就好像客户端A仍然持有锁(即使A已经崩溃了!)。其他客户端将无法获得锁,并将返回时出错。
常用的方法是为每个锁分配一个TTL。这个想法很简单:如果客户端A崩溃并且无法释放锁,那么其他人应该做这项工作,删除DB中的行,从而释放锁。假设客户端A通常需要3分钟才能完成任务。我们可以将TTL设置为5分钟。
然后,我们需要构建另一个服务来不断扫描lock表,并删除超过5分钟前创建的任何行。然而,还有其他问题:
如果A没有崩溃,它只需要比平时更长的时间来完成任务呢?
如果我们为扫描lock桌本身而构建的新服务崩溃了怎么办?
第一个问题将很难用MySQL完全解决。
然而,实际上,对于大多数业务案例,我们总是可以设置足够大的TTL,因此这种情况很少发生,以至于对公司业务的任何影响几乎不明显。
或者我们可以使用ZooKeeper,它附带了另一组权衡,稍后将讨论。
现在,让我们解决实施2中的第二个问题。
实现2.使用时间戳+唯一密钥约束
我们可以在lock表中添加一列,以存储上次获取锁的时间戳。
通过这种方式,我们可以继续使用MySQL的内置功能,而无需构建其他服务。
CREATE TABLE `lock` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`lock_key` varchar(128) NOT NULL,
`holder` varchar(128) NOT NULL DEFAULT '',
`creation_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`last_lock_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_lock_key` (`lock_key`)
);
首先,我们需要确定一个合适的TTL ${timeout},如前所述。TTL应该足够大。
Aquire锁:
当客户端B尝试获取锁时,我们可以添加last_lock_time
requestIdTL = new ThreadLocal<>();
public List selectAll() {
return testMapper.selectAll();
}
/**
** 获取当前线程requestid
** @return
**/
public static String getRequestId() {
String requestId = requestIdTL.get();
if (requestId == null || "".equals(requestId)) {
requestId = UUID.randomUUID().toString();
requestIdTL.set(requestId);
}
System.out.println("requestId: "+requestId);
return requestId;
}
/**
* 初始化记录,如果有记录update,如果没有记录insert
*/
private TTagLock initTTagLock(String key){
// 查询记录是否存在
TTagLock tTagLock = testMapper.queryRecord(key);
if (null == tTagLock) {
// 记录不存在,创建
tTagLock = new TTagLock();
tTagLock.setFlockName(key);
tTagLock.setFcount(0);
tTagLock.setFdesc("");
tTagLock.setFdeadline(new Date(0));
tTagLock.setFstatus(1);
tTagLock.setFRequestId(getRequestId());
testMapper.insertRecord(tTagLock);
}
return tTagLock;
}
/**
* 获取中央锁Key
*/
private boolean getCenterLock(String key){
String prefix = "center_lock_";
CRC32 crc32 = new CRC32();
crc32.update(key.getBytes());
Long hash = crc32.getValue();
if (null == hash){
return false;
}
Integer len = hash.toString().length();
String slot = hash.toString().substring(len-2);
String centerLockKey = prefix + slot;
testMapper.queryRecord(centerLockKey);
return true;
}
/**
* 获取锁,代码片段
*/
@Transactional
public boolean getLock(String lockName,String desc,Long expireTime) throws InterruptedException {
// 检测参数
if(StringUtils.isEmpty(lockName)) {
System.out.println("参数为空");
return false;
}
// 获取中央锁,初始化记录
Long nowTime = new Date().getTime();
getCenterLock(lockName);
TTagLock tTagLock = initTTagLock(lockName);
// 未释放锁或未过期,获取失败
if (tTagLock.getFstatus() == 1
&& tTagLock.getFdeadline().getTime() > nowTime && !getRequestId().equals(tTagLock.getFRequestId())){
Thread.sleep(50);
return false;
}
if(getRequestId().equals(tTagLock.getFRequestId())){//重入锁
int num = testMapper.updateRecord(lockName, tTagLock.getFdeadline(), tTagLock.getFcount()+1,
tTagLock.getFdesc(), 1,getRequestId());
return true;
}
// 获取锁
Date deadline = new Date(nowTime + expireTime);
int num = testMapper.updateRecord(lockName, deadline, 1, desc, 1,getRequestId());
return true;
}
public void unLock(String lockName) {
//获取当前线程requestId
String requestId = getRequestId();
TTagLock tTagLock = testMapper.queryRecord(lockName);
//当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁
if (Objects.nonNull(tTagLock) && requestId.equals(tTagLock.getFRequestId()) && tTagLock.getFcount() > 0) {
if (tTagLock.getFcount() == 1) {
//重置锁
resetLock(tTagLock);
} else {
testMapper.updateRecord(lockName,tTagLock.getFdeadline(),tTagLock.getFcount()-1,tTagLock.getFdesc(),1,getRequestId());
}
}
}
public int resetLock(TTagLock tTagLock) {
tTagLock.setFRequestId("");
tTagLock.setFcount(0);
tTagLock.setFdeadline(new Date());
//todo 修改update
return testMapper.updateRecord(tTagLock.getFlockName(),new Date(),0,"",0,"");
}
# Mysql分布式锁设计
最近开发电商库存相关项目,其中最为重要的一个功能之一是分布式锁的实现。本文就项目组中用到的基于MySQL实现的分布式锁,做一些思考和总结。
## 1、常见的分布式锁实现方案
基于数据库实现的分布式锁
基于Redis实现的分布式锁
基于Zookeeper实现的分布式锁
在讨论使用分布式锁的时候往往首先排除掉基于数据库的方案,本能的会觉得这个方案不够“高级”。
从性能的角度考虑,基于数据库的方案性能确实不够优异,但就目前笔者所在项目组来说,几乎所有项目的项目都是基于MySQL实现的分布式锁,所以采用哪种方案是要基于使用场景来看的,选择哪种方案,合适最重要,本文也仅就MySQL实现分布式锁展开讨论。
## 2、一把极为简单的MySQL分布式锁
最容易想到的基于MySQL的分布式锁就是通过数据库的唯一键约束,来达到抢占锁资源的目的,本文也从这把最为简单的分布式锁讲起。
在MySQL中创建一张表如下,为资源ID设置唯一键约束。
```sql
CREATE TABLE `distribute_lock` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`resource_id` varchar(64) NOT NULL DEFAULT '' COMMENT '资源id',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_resourceId` (`resource_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='分布式锁';
当需要获取锁时,往数据库中一条记录,因为有唯一键约束,插入成功则代表获取到了锁。
INSERT INTO distribute_lock(`resource_id`) VALUES ( 'resource_01');
释放锁时删除这条记录即可。
DELETE FROM distribute_lock WHERE resource_id = 'resource_01';
显然,这把锁因为太过简单,所以存在很多问题。
没有锁失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
不可重入,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
非阻塞,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
当然还有比如锁可靠性完全依赖于数据库,这是基于MySQL分布式锁的必然缺陷,这里不做讨论。下面将一步步优化这把最为简单的锁。
3、锻造一把好锁
3.1、锁超时失效
通常想到的方法是做一个定时清理过期资源的程序,每隔一定时间把数据库中的超时数据清理一遍。
这种做法最为简单直接,但也有一些相应的弊端,比如增加了程序复杂性(需要专门实现并配置定时任务),锁的超时时间也不方便灵活配置。本文试图说明一种笔者认为更为合理的方式。
前文所述的拿锁方式非常简单粗暴,数据插入不成功则拿锁失败,但此时数据库中的锁记录可能已经超时,所以需要在插数失败之后做进一步动作,以判断之前拿锁的线程是否已经超时。
只需取出当前锁记录,比较锁记录时间与当前时间差值是否已超出锁等待时间,如未超出,获取锁失败,如超出,修改锁记录时间为当前时间,拿锁成功。
代码如下:
boolean tryLock(String resourceId) throws SQLException {
//插入数据,插入成功则成功获取锁
if (ensureLockExist(null, resourceId)) {
return true;
}
//数据插入失败,判断锁是否超时,超时则修改锁记录时间
return tryUpdateLockTime(resourceId);
}
protected boolean ensureLockExist(Connection connection, String resourceId) throws SQLException {
try {
createLock(connection, resourceId); // 插入锁记录
} catch (MySQLIntegrityConstraintViolationException ex) {
return false;
}
return true;
}
private boolean tryUpdateLockTime(String resourceId) throws SQLException {
LockModel entity = getLock(null, resourceId);//查询已有记录
if (entity != null && isLeaseExpired(entity)) {//记录存在并且已过期
//基于乐观锁CAS修改锁记录时间,修改成功就拿到锁
return trySetLockTime(null, resourceId, entity.createTime) > 0;
}
return false;
}
这里需要强调,修改锁记录时间必须通过CAS操作,因为可能存在多个线程同时争抢一把已经过期的锁,如果不通过CAS操作,可能多个线程同时获取到锁。
上述只是代码片段,程序中可以提供方法给调用者灵活修改锁的超时等待时间,同时也不再需要专门配置定时清理过期记录的任务。
3.2、锁可重入
有了前面解决超时失效问题的思路,很容易想到的方案是在表中加个字段记录当前获得锁的机器和线程信息,当线程再次获取锁的时候先查询数据库,如果当前机器和线程信息在数据库可以查到的话,直接把锁分配给该线程即可。
这种方式多了一步查询操作,对锁性能有一定影响,是否可以把成功获取锁的线程和其获取到的锁放到一个容器里呢?
当某个线程需要拿锁时,先在容器中找下自己是否已经拿到过锁,拿到了那就不必和数据库打交道了。
那就这么干,代码如下:
class State {
private Thread exclusiveOwner;//拿到锁的线程
private volatile int state; //重入次数
}
//为保证线程安全,采用ConcurrentHashMap盛放成功获取锁的线程
ConcurrentHashMap states = new ConcurrentHashMap<>();
public boolean tryLock(String resourceId) {
Thread current = Thread.currentThread();
State state = states.get(resourceId);//从容器中拿出锁记录
if (Objects.nonNull(state)) {
//锁记录不为空,且为当前线程,则重入获取锁成功
if (state.exclusiveOwner == current) {
state.state++;//重入次数+1
return true;
}
return false;
} else {
try {
State newState = new State(current, 1);
if (states.putIfAbsent(resourceId, newState) == null) {//第一个往容器中放入锁记录成功
if (node.tryLock(resourceId)) {
return true;
} else {
states.remove(resourceId);
}
}
return false;
} catch (Exception ex) {
throw new DLockException(true, resourceId, ex);
}
}
}
3.3、阻塞式获取锁
阻塞?搞一个while循环,直到tryLock成功?其实这也不失为一种解决方式,emm……总觉得不够优雅。而且通过轮训的方式,会占用较多的CPU资源。
能否借助MySQL的悲观锁实现呢?借助 for update 关键字来给被查询的记录添加行锁中悲观锁,这样别的线程就没有办法对这条记录进行任何操作,从而达到保护共享资源的目的。
select * from distribute_lock where resource_id = ? for update
采用这种方式需要注意:
MySQL默认是会自动提交事务的,应该手动禁止一下: SET AUTOCOMMIT = 0;
行锁是建立在索引的基础上的,如果查询时候不是走的索引的话,行锁会升级为表锁进行全表扫描;
申请锁操作:先往数据库中插入一条锁记录,然后select * from distribute_lock where resource_id = ? for update; 只要可以查的出来就是申请成功的,没有获取到的会被阻塞,阻塞的超时时可
以- 通过设置 MySQL 的 innodb_lock_wait_timeout 来进行设置。释放锁操作:COMMIT; 事务提交之后别的线程就可以查询到这条记录。
boolean lock(String resourceId) throws SQLException {
ensureLockExist(null, resourceId);//插入一条锁记录
connection = dataSource.getConnection();
autoCommit = connection.getAutoCommit();
lockResource(connection, resourceId);
return true;
}
boolean unlock(String resourceId) throws SQLException {
if (connection != null) {
connection.commit();//提交事务达到释放锁的目的
connection.setAutoCommit(autoCommit);
connection.close();
connection = null;
return true;
}
return false;
}
protected void lockResource(Connection connection, String resourceId) throws SQLException {
PreparedStatementCreator creator = (conn) -> {
conn.setAutoCommit(false);//关闭事务自动提交
PreparedStatement ps = conn.prepareStatement("select * from distribute_lock where resource_id = ? for update");
ps.setString(1, resourceId);
return ps;
};
PreparedStatementCallback action = ps -> {
ResultSet rs = ps.executeQuery();
if (rs != null) {
rs.close();
}
return null;
};
execute(connection, creator, action);
}
虽然在此提出了这种阻塞式获取锁的方式,但笔者并不推荐,这种方式实际应用中造成过生产事故,因为MySQL数据库会做查询优化,即便使用了索引,优化时发现全表扫效率更高,则可能会将行锁升级为表锁,此时会造成其他资源锁也无法获取。
难道真的只能通过while循环的方式以达到阻塞式获取锁吗?笔者在库存系统开发过程中,接触到了一种更为巧妙的MySQL阻塞式获取锁的方式。
4、基于MySQL阻塞式分布式锁
通过在事务中插入或更新一条带唯一索引的记录,在事务未提交之前,其他线程事务会处于阻塞等待的状态,以达到阻塞式获取锁的目的。
@Transactional(rollbackFor = Exception.class)
public void executeWithLock(String resourceId) throws OspException {
//获取锁
tryLock(resourceId);
// do something
}
INSERT INTO distribute_lock
(resource_id)
VALUES
(#{resourceId})
ON DUPLICATE key UPDATE update_time = NOW()
其原理笔者查阅了下MySQL(RR事务级别)插入一条记录的执行流程如下:
首先对插入的间隙加插入意向锁(Insert Intension Locks)如果该间隙已被加上了间隙锁或 Next-Key 锁,则加锁失败进入等待;如果没有,则加锁成功,表示可以插入;
然后判断插入记录是否有唯一键,如果有,则进行唯一性约束检查如果不存在相同键值,则完成插入。如果存在相同键值,则判断该键值是否加锁,如果没有锁, 判断该记录是否被标记为删除,如果标记为删除,说明事务已经提交,还没来得及 purge,这时加 S 锁等待;如果没有标记删除,则报 duplicate key 错误;如果有锁,说明该记录正在处理(新增、删除或更新),且事务还未提交,加 S 锁等待;
这种方式是否会造成死锁呢?欢迎留言,避免笔者在项目中踩坑。
基于 MySQL 唯一索引的分布式锁(一):设计
本文提出的分布式锁方案源于我的实际生产经验。在生产环境验证了两年,自认为比网络上那些基于 MySQL 悲观锁、乐观锁实现的分布式锁好很多,他们的方案繁杂,性能又低。
本文提出的方案简单又好用,性能也高一点。
老实说,用 MySQL 实现分布式锁并不是最好的选择,用 Redis、ZooKeeper 实现可能会更好。
但我当初没得选,不得不用 MySQL。如果你确定要用 MySQL 实现分布式锁,可以参考一下本文。
本文只讲设计,但你看完就可以自己照着实现一个基于 MySQL 唯一索引的分布式锁了。
核心逻辑
如果为某个表创建了唯一索引(即 UNIQUE 约束),MySQL 可以保证表中每条记录的唯一索引列都是唯一的。
如果表中已经存在一条唯一索引列为特定值的记录,如果再次尝试插入一条索引值为特定值的新的记录会插入失败,并触发 duplicate entry 错误。如果多个并发事务尝试向表中插入索引字段值相同的记录,则只有一个事务会插入成功,其他事务都会失败并触发 duplicate entry 错误。
所以我们可以利用 MySQL 的唯一索引实现分布式锁。
将插入一条唯一索引列为特定值的记录当成加锁,如果插入成功,则视为成功获取到了锁,如果遇到 duplicate entry 错误,则说明加锁失败;将删除唯一索引列为特定值的记录视为释放操作,删除记录后,其他请求就可以插入记录(获得锁)了。
具体方案
虽然核心逻辑很简单,但接着往下看,你会发现一些续约考虑和衡量的问题。
设计表
现在考虑一下,我们需要哪些字段。基于 MySQL 唯一索引实现分布式锁,最重要的字段当然是唯一索引所在字段。我们可以用一个表示锁名称的字段 name 来充当唯一索引,每个使用分布式锁的业务可以根据用途来给锁命名,不同用途的分布式
锁一般不会有同名冲突,如果同名了,可以加上业务、团队、级别信息作为前缀。
按照惯例,还需要一个自增的主键 id 字段。后面我们会见到这个 id 字段大有用途。
再加一个 created_at 字段,表示记录的创建时间。后面会讲解它的用途。
CREATE TABLE lock (
id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '主键',
name VARCHAR(128) NOT NULL UNIQUE COMMENT '锁名',
created_at INT UNSIGNED NOT NULL COMMENT '秒级时间戳,表示记录被插入的时间',
)ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
获取锁
加锁操作即插入一条记录。每个想要获取锁的客户端应该执行如下 SQL 语句:
SET SESSION innodb_lock_wait_timeout=1;
INSERT INTO lock (name, created_at) VALUES('hello-dis-lock', UNIX_TIMESTAMP())
SELECT LAST_INSERT_ID();
SET SESSION innodb_lock_wait_timeout=1 将获取锁的超时时间设为设为 1 秒,是为了给加锁操作设置一个兜底的超时时间,避免长时间等待。
通过执行 INSERT INTO lock (name, created_at) VALUES('hello-dis-lock', UNIX_TIMESTAMP())
插入记录后应该马上提交事务,如果某个事务执行了完该语句,但是不提交,其他事务执行该语句时不会报错,也不会成功,而是一直阻塞等待。为了避免阻塞其他尝试获得锁的客户端,执行完插入语句后应该马上提交。
现在很多 生产环境的 MySQL 服务都打开了自动提交( set autocommit=1),每条语句默认是一个事务,在执行完后会自动提交,不需要客户端主动 COMMIT。
如果你使用的 MySQL 服务没有打开自动提交,你可以创建数据库连接后执行 set autocommit=1,或者执行完插入语句前主动用 START TRANSACTION 开启事务,执行完插入语句后马上执行 COMMIT。
SELECT LAST_INSERT_ID() 是获取插入记录的主键 id,客户端需要记住这个返回的 id,后面释放锁时用得上。
如果上面的语句执行成功,也就是获取锁成功,客户端就可以执行自己需要分布式锁保护的业务逻辑了,执行完记得释放锁。
如果插入语句返回 duplicate entry错误,则说明其他客户端已经持有了该锁。获取锁失败,则需要关闭事务,随机休眠一段时间后再重试。如果你的业务特点决定了客户端持有锁的时间比较长(达到了秒级),则需要随机休眠的时间也是秒级,可以断开数据库连接后再休眠。
如果插入语句返回 Lock wait timeout exceeded 说明尝试加锁超时了,可能是当前竞争锁的客户端比较多,关闭事务,随机休眠一段时间后再重试。
如果多次遇到 duplicate entry 错误,则可能是持有锁的客户端出了故障,不能顺利释放锁。则需要进行帮其强行释放。
随机休眠一段时间,而不是休眠一段固定的时间,是为了避免所有客户同时竞争锁,加入随机因素将客户端的获取锁的时间打散。
超时检测和强制释放
客户端在持有锁后可能会因为故障而长时间甚至永远都不释放锁。
是程序就有故障的可能,客户端程序可能会意外退出、断网、进入死循环等,从而导致持有的锁无法释放,其他客户端就只能饿死。
我们可以给分布式锁加入超时检测和强制释放功能,避免因为某个客户端故障而导致分布式锁不可用。
我们可以给锁的持有时间设置一个过期时间,规定客户端最多只能持有锁多长时间。
在客户端获取锁时连续收到 duplicate entry 错误的次数达到次数上限后,可以认为可能是某个客户端持有锁的时间超过了规定的过期时间。
执行一条删除语句帮其强制释放锁。
DELETE * FROM lock WHERE name='hello-dis-lock' and UNIX_TIMESTAMP()-created_at > [超时限制];
如果该删除语句返回 Query OK, 1 row affected,说确实发现锁超时了,而且已经删除成功。
如果删除语句返回 Query OK, 0 row affected,说明删除失败,可能时锁没超时,也可能是锁刚被持有它的客户释放了,也可能是其他客户端将其强制释放了。
不管是哪种返回结果,客户端可以回到获取锁的阶段了。
随机休眠时长、次数上限、过期时间怎么确定?
建议根据客户端业务规律来设置这三个参数。
客户端一般是依赖分布式锁保护某些关键操作。如果在大多数情况下,关键操作都能能在 3~5 完成,则可以将随机休眠时长的下限设为 3 秒,上限设为 5 秒。
如果客户端最多也只需要 10 秒就能完成关键操作,则可以将过期时间设为 20 秒或 30 秒(即将过期时间设为预估的最长持有时间的两倍或三倍)。
执行强制删除前的重试次数上限设为 [超时限制 / 随机休眠时长的下限]。
本文假设竞争同一把锁的所有客户端都使用相同的过期时间。如果你需要每个客户端使用单独的过期时间,可以给表加一列 timeout,表示本条记录对应锁的过期时间。
对应的强制释放语句也要相应改为:
DELETE * FROM lock WHERE name='hello-dis-lock' and UNIX_TIMESTAMP()-created_at > timeout;
超时检测和强制释可以封装成一个单独的定时任务
如果不想客户端逻辑太复杂,可以单独创建一个任务,定时查询表中的所有记录,将超时的记录删除。
表中每把锁需要的过期时间可能不一样,所以需要加一列 timeout 表示每条锁记录的过期时间。
定时任务的运行间隔必须小于等于表中 timeout 列的最小值。
而且这个定时任务不能挂了,如果挂了,所有的锁都将失去超时检测和强制释放功能。
释放锁
释放锁即删除当前客户端之前插入的记录。
之前插入记录时,客户端特意用 SELECT LAST_INSERT_ID() 语句查询并保存了插入记录的主键 id,假设主键 id 值为 lock_id,则需要执行的删除语句如下:
DELETE FROM lock WHERE id=lock_id;
如果删除语句返回 Query OK, 1 row affected,说明删除成功。
如果删除语句返回 Query OK, 0 row affected,说明主键为 lock_id 的记录因为超时被强制删除了。
为什么要根据 id 而不是根据 name 释放锁?
因为存在极小的可能,在客户端主动删除自己的锁记录时,其他客户都已经将其强制删除了,并创建了新的锁记录(即获取了锁)。
如果客户端在释放锁时根据 name 删除,可能会错误的删除其他客户端创建的锁记录。
而 id 是自增的,每次创建的锁记录的 id 都不一样,所以根据 id 不会错误地释放非当前客户端持有的锁。
可选功能:租期续约
分布式锁中一个常见的功能就是租期续约,即在客户端持有的锁快到期时增加其过期时间。
只要客户端还活着就给其不停的续约。
判断客户端是否活着者有两种常见方案:
在有的实现方案中,客户本身就和锁服务端一直连着,可以根据连接是否断开来判断客户都是否或者。
客户端实现健康检查接口,服务定期查询客户端的监控状态。
这两种服务端给客户都主动续约的方案都有一个问题,即客户端可能陷入了某种故障,导致其连接还在、健康检查接口也能正常调用,但是释放锁的逻辑却无法再执行。
服务端却一直认为客户端还活着,给其无限续约,相当于没有过期时间。
本文秉着能在客户端搞定的事就在客户端搞定的原则,建议将租期续约逻辑放入到客户端代码中。客户端可以根据自己的逻辑判断关键操作是否会在超时前完成,如果不能则主动给自己持有的锁记录增大过期时间。即在过期时间前执行以下修改语句:
UPDATE lock SET created_at = UNIX_TIMESTAMP() WHERE id=[lock_id];
lock_id 是客户都之前获取锁时报错的锁记录 id。将锁记录的 created_at 简单的设为当前时间,相重新获取了一次锁。如果想做的更好一点,你可以根据具体想要续约的时长来设置 created_at 值。
更好的方案是给表增加一个单独 timeout 字段表示过期时间,则续约就是修改 timeout 字段。
UPDATE lock SET timeout = timeout+[续约时长] WHERE id=[lock_id];
这样就不用破坏 created_at 的语义。
客户端主动续约的好处是,客户都最知道自己的状态,如果释放锁的逻辑无法正常运行了,续约逻辑大概率也无法运行,不会造成无限续约的问题。
优缺点分析
- 优点:
依赖小,只需要有能正常访问的 MySQL 服务就行。
实现也简单,所有逻辑都在客户端代码中,可以用一个单独的库提供。
相比其他基于 MySQL 的方案,出现无限续约的概率更小。
- 缺点:
和所有基于 MySQL 的方案一样,存在单点故障,如果依赖的 MySQL 服务出现问题,则加锁、释放锁都会失败。
小结
分布式锁的思想是一样的。他山之石。
我们可以开源一个锁实现。知行合一。
参考资料
https://www.jianshu.com/p/f01f0fca6cd0
https://www.51cto.com/article/684974.html
https://juejin.cn/post/7217810344696725560
MySQL--DB实现分布式锁思路: https://cloud.tencent.com/developer/article/1580632