Redis分布式锁实现

分布式锁的各种问题及优化

并发情况下以下代码可能导致超买

1
2
3
4
5
6
7
8
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}

为了解决该问题可以通过redis加上分布式锁,该方式是解决了并发问题,但是引入了新的问题,若业务代码异常可能导致锁永远得不到释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
String lockKey = "product_101";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "product_id");
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
stringRedisTemplate.delete(lockKey);

可以通过finally中来释放锁来解决业务代码异常的情况,但若当锁获取成功后机器宕机了,同样锁还是不能得到释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
String lockKey = "product_101";
try {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "product_id");
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
stringRedisTemplate.delete(lockKey);
}

可以通过给锁加上一个过期时间的方式来解决获取锁成功后机器宕机,导致锁不能被释放的情况,但是这种写法还是没有完全解决,因为加锁和设置缓存时间不是原子操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
String lockKey = "product_101";
try {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "product_id");
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
stringRedisTemplate.delete(lockKey);
}

可通过在加锁的同时设置超时原子操作来解决该问题,但设置了超时时间若当前业务代码没有被执行完其本身没有释放锁,但由于过期锁被清理掉了,新的线程加锁进来后,之前执行业务代码的线程又去把新的线程的锁释放了,将导致锁完全失效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
String lockKey = "product_101";
try {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "product_id", 30, TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
stringRedisTemplate.delete(lockKey);
}

可通过给锁设置唯一标识的方式来解决其他线程释放非自身设置的锁,所有线程只能释放本线程设置的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
String lockKey = "product_101";
String clientId = UUID.randomUUID().toString();
try {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "product_id", 30, TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
stringRedisTemplate.delete(lockKey);
}
}

索然上面的锁已经很完善了,但还是有锁因为超时时间导致的极小概率的并发问题,该问题可以通过给锁续命即判断业务代码是否执行完成,若未完成则重置超时时间的方式来解决该问题。Redisson就是这样做的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
String lockKey = "product_101";
String clientId = UUID.randomUUID().toString();
RLock redissonLock = redisson.getLock(lockKey);
try {
//加锁
redissonLock.lock(); //setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
redissonLock.unlock();
}

红锁

RedLock是一种利用多Master对共享资源做互斥访问,基于N个完全独立的Redis节点,运行Redlock算法通过在客户端依次执行下面的步骤来完成获取锁的操作:

  • 获取当前时间,毫秒数
  • 顺序依次向N个Redis节点执行获取锁操作,该获取操作跟前面基于单Redis节点获取锁过程相同,为了保证在某个Redis节点不可用的时候算法能够继续运行,该获取锁操作还有一个超时时间,几十毫秒量级,它要远小于锁的有效时间。客户端在向某个Redis节点获取锁失败后,应该立即尝试下一个Redis节点,这里的失败应该包含任何类型的失败,如该Redis节点不可用、该Redis节点上的锁已经被其它客户端持有
  • 计算整个获取锁的过程总共消耗了多长时间,计算方法是用当前时间减去第1步记录的时间。若客户端从大多数Redis节点即>= N/2+1成功获取到了锁,且获取锁总耗时没有超过锁的有效时间,则此时客户端才认为最终获取锁成功;否则认为最终获取锁失败
  • 最终获取锁成功,则该锁的有效时间应该重新计算,它等于最初的锁的有效时间减去第3步计算出来的获取锁消耗的时间
  • 若最终获取锁失败了,可能由于获取到锁的Redis节点个数少于N/2+1,或整个获取锁的过程耗时超过了锁的最初有效时间,则客户端应该立即所有Redis节点发起释放锁操作

释放锁的过程比较简单:客户端向所有Redis节点发起释放锁的操作,不管这些节点当时在获取锁时成功与否。在最后释放锁时,客户端应该向所有Redis节点发起释放锁的操作,即使当时向某个节点获取锁没有成功,在释放锁时也不应该漏掉该节点。因为若客户端发给某个Redis节点获取锁的请求成功到达了该Redis节点,该节点也成功执行了SET操作,但返回给客户端的响应包却丢失。在客户端看来,获取锁的请求由于超时而失败了,但在Redis这边看来,加锁已经成功了。因此释放锁时,客户端也应该对当时获取锁失败的那些Redis节点同样发起请求。

但由于N个Redis节点中的大多数能正常工作就能保证Redlock正常工作,因此理论上它的可用性更高。单Redis节点的分布式锁在failover的时锁失效的问题,在Redlock中不存在了,但若有节点发生崩溃重启,还是会对锁的安全性有影响,具体的影响程度跟Redis对数据的持久化程度有关

假设一共有5个Redis节点A、B、C、D、E,若客户端1成功锁住了A、B、C,获取锁成功, 但D和E没有锁住,节点C崩溃重启了,但客户端1在C上加的锁没有持久化下来,丢失了,节点C重启后客户端2锁住了C、D、E, 获取锁成功,针对同一资源客户端1和客户端2同时获得了锁

Redis的AOF持久化方式默认每秒写一次磁盘,最坏情况下可能丢失1秒的数据,为了尽可能不丢数据,Redis允许设置成每次修改数据都进行fsync,但这会降低性能。当然,即使执行了fsync也仍然有可能丢失数据,这取决于系统而不是Redis的实现。故上面分析的由于节点重启引发的锁失效问题,总是有可能出现的。为了应对这一问题,可通过延迟重启,即一个节点崩溃后,先不立即重启它,而是等待一段时间再重启,该时间应该大于锁的有效时间,该节点在重启前所参与的锁都会过期,它在重启后就不会对现有的锁造成影响。

Redisson锁原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
public class Redisson implements RedissonClient {
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
}
public class RedissonLock extends RedissonExpirable implements RLock {
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId(); // 获取当前线程的ID
Long ttl = tryAcquire(leaseTime, unit, threadId); // 尝试获取锁,并返回锁剩余持有时间
if (ttl == null) { // 若锁剩余持有时间为null,表示获取锁成功
return; // 获取锁成功
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
break;
}
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) { // 设置了超时时间的逻辑
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 未设置超时时间默认设置超时时间为30s
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
if (ttlRemaining == null) { // 若当前锁还没有释放,则给当前锁续超时时间
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 异步执行lua命令获取锁,若获取锁成功返回null,否则返回剩余持有时间
return commandExecutor
.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 判断锁是否存在
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 将锁的的状态设置为1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 给锁加上失效时间
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 重入锁的处理,锁存在,且加锁对象是当前线程
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 将锁加一
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 重置失效时间
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", // 返回锁剩余的失效时间
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
// 每10s执行一次
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor
.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 若KEY存在则返回true,否则返回false
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 重置超时时间
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
return;
}
if (future.getNow()) { // 若当前锁还没有释放,则给当前锁续超时时间
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
}

LUA脚本

Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行:

  • 减少网络开销:本来5次网络请求的操作,可用一个请求完成,原先5次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延。这点跟管道类似
  • 原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过redis的批量操作命令是原子。
  • 替代redis的事务功能:redis自带的事务功能很鸡肋,而redis的lua脚本几乎实现了常规的事务功能,官方推荐如果要使用redis的事务功能可以用redis lua替代。

可以使用EVAL命令对Lua脚本进行求值。EVAL命令的格式如下:

1
EVAL script numkeys key [key ...] arg [arg ...]

script参数是一段Lua脚本程序,它会被运行在Redis服务器上下文中,numkeys参数用于指定键名参数的个数。键名参数key [key ...]从EVAL的第三个参数开始算起,表示在脚本中所用到的那些Redis键(key),这些键名参数可在Lua中通过全局变量KEYS数组,用1为基址的形式访问KEYS[1],KEYS[2] 以此类推。

在命令的最后不是键名参数的附加参数arg [arg ...],可在Lua中通过全局变量ARGV数组访问,访问的形式和KEYS变量类似(ARGV[1]、ARGV[2],在Lua脚本中,可使用redis.call()函数来执行Redis命令:

1
2
3
4
5
6
7
8
9
10
11
jedis.set("product_stock_10016", "15");  //初始化商品10016的库存
String script = " local count = redis.call('get', KEYS[1]) " +
" local a = tonumber(count) " +
" local b = tonumber(ARGV[1]) " +
" if a >= b then " +
" redis.call('set', KEYS[1], a-b) " +
" return 1 " +
" end " +
" return 0 ";
Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));
System.out.println(obj);

不要在Lua脚本中出现死循环和耗时的运算,否则redis会阻塞,将不接受其他的命令,所以使用时要注意不能出现死循环、耗时的运算。redis是单进程、单线程执行脚本。管道不会阻塞redis