美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?

news/2024/9/26 5:22:50

文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 博客园版 为您奉上珍贵的学习资源 :

免费赠送 :《尼恩Java面试宝典》 持续更新+ 史上最全 + 面试必备 2000页+ 面试必备 + 大厂必备 +涨薪必备
免费赠送 :《尼恩技术圣经+高并发系列PDF》 ,帮你 实现技术自由,完成职业升级, 薪酬猛涨!加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷1)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷2)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷3)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

免费赠送 资源宝库: Java 必备 百度网盘资源大合集 价值>10000元 加尼恩领取


美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?

尼恩特别说明: 尼恩的文章,都会在 《技术自由圈》 公号 发布, 并且维护最新版本。 如果发现图片 不可见, 请去 《技术自由圈》 公号 查找

尼恩说在前面

在40岁老架构师 尼恩的读者交流群(50+)中,最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、希音、百度、网易、美团的面试资格,遇到很多很重要的面试题:

Redis分布式锁,过期怎么办? 如何自动续期?

Redis分布式锁过期,任务没完成怎么办?

最近有小伙伴在面试 美团,又遇到了相关的面试题。小伙伴懵了,因为没有遇到过,所以支支吾吾的说了几句,面试官不满意,面试挂了。

所以,尼恩给大家做一下系统化、体系化的梳理,使得大家内力猛增,可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”,然后实现”offer直提”。

当然,这道面试题,以及参考答案,也会收入咱们的 《尼恩Java面试宝典PDF》V171版本,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。

最新《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》的PDF,请关注本公众号【技术自由圈】获取,回复:领电子书

Redis分布式锁过期怎么办?

尼恩先简单的总结一下,两大核心方案,大家收藏起来,毒打面试官。

在这里插入图片描述

两大Redis 分布式

本文重点介绍Redis分布式锁,分为两个维度进行介绍:

(1)基于Jedis手工造轮子分布式锁

(2)介绍Redission 分布式锁的使用和原理。

分布式锁一般有如下的特点:

  • 互斥性: 同一时刻只能有一个线程持有锁
  • 可重入性: 同一节点上的同一个线程如果获取了锁之后能够再次获取锁
  • 锁超时:和J.U.C中的锁一样支持锁超时,防止死锁
  • 高性能和高可用: 加锁和解锁需要高效,同时也需要保证高可用,防止分布式锁失效
  • 具备阻塞和非阻塞性:能够及时从阻塞状态中被唤醒

基于Jedis 的API实现分布式锁

我们首先讲解 Jedis 普通分布式锁实现,并且是纯手工的模式,从最为基础的Redis命令开始。

只有充分了解与分布式锁相关的普通Redis命令,才能更好的了解高级的Redis分布式锁的实现,因为高级的分布式锁的实现完全基于普通Redis命令。

Redis几种架构

Redis发展到现在,几种常见的部署架构有:

  • 单机模式;
  • 主从模式;
  • 哨兵模式;
  • 集群模式;

从分布式锁的角度来说, 无论是单机模式、主从模式、哨兵模式、集群模式,其原理都是类同的。

只是主从模式、哨兵模式、集群模式的更加的高可用、或者更加高并发。

所以,接下来先基于单机模式,基于Jedis手工造轮子实现自己的分布式锁。

首先看两个命令:

Redis分布式锁机制,主要借助setnx和expire两个命令完成。

setnx命令:

SETNX 是SET if Not eXists的简写。

  • 将 key 的值设为 value,当且仅当 key 不存在;

  • 若给定的 key 已经存在,则 SETNX 不做任何动作。

下面为客户端使用示例:

127.0.0.1:6379> set lock "unlock"
OK
127.0.0.1:6379> setnx lock "unlock"
(integer) 0
127.0.0.1:6379> setnx lock "lock"
(integer) 0
127.0.0.1:6379> 

expire命令:

expire命令为 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除.

expire 格式为:

EXPIRE key seconds

下面为客户端使用示例:

127.0.0.1:6379> expire lock 10
(integer) 1
127.0.0.1:6379> ttl lock
8

基于Jedis API的分布式锁的总体流程:

通过Redis的setnx、expire命令可以实现简单的锁机制:

  • key不存在时创建,并设置value和过期时间,返回值为1;成功获取到锁;
  • 如key存在时直接返回0,抢锁失败;
  • 持有锁的线程释放锁时,手动删除key; 或者过期时间到,key自动删除,锁释放。

线程调用setnx方法成功返回1认为加锁成功,其他线程要等到当前线程业务操作完成释放锁后,才能再次调用setnx加锁成功。

在这里插入图片描述

以上简单redis分布式锁的问题:

如果出现了这么一个问题:如果setnx是成功的,但是expire设置失败,一旦出现了释放锁失败,或者没有手工释放,那么这个锁永远被占用,其他线程永远也抢不到锁。

所以,需要保障setnx和expire两个操作的原子性。

简单来说,原子性就是下面的三点:

  • 要么 setnx和expire 全部执行,
  • 要么 setnx和expire 全部不执行,
  • setnx和expire 二者不能分开。

解决的办法有两种:

  • 使用set的命令时,同时设置过期时间,不再单独使用 expire命令
  • 使用lua脚本,将加锁的命令放在lua脚本中原子性的执行

简单加锁:使用set的命令时,同时设置过期时间

使用set的命令时,同时设置过期时间的示例如下:

127.0.0.1:6379> set unlock "234" EX 100 NX
(nil)
127.0.0.1:6379> 
127.0.0.1:6379> set test "111" EX 100 NX
OK

这样就完美的解决了分布式锁的原子性; set 命令的完整格式:

set key value [EX seconds] [PX milliseconds] [NX|XX]


EX seconds:设置失效时长,单位秒
PX milliseconds:设置失效时长,单位毫秒
NX:key不存在时设置value,成功返回OK,失败返回(nil)
XX:key存在时设置value,成功返回OK,失败返回(nil)

使用set命令实现加锁操作,先展示加锁的简单代码实习,再带大家慢慢解释为什么这样实现。

加锁的简单代码实现

package com.crazymaker.springcloud.standard.lock;@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {private  RedisTemplate redisTemplate;private static final String LOCK_SUCCESS = "OK";private static final String SET_IF_NOT_EXIST = "NX";private static final String SET_WITH_EXPIRE_TIME = "PX";/*** 尝试获取分布式锁* @param jedis Redis客户端* @param lockKey 锁* @param requestId 请求标识* @param expireTime 超期时间* @return 是否获取成功*/public static   boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);if (LOCK_SUCCESS.equals(result)) {return true;}return false;}}

可以看到,我们加锁用到了Jedis的set Api

jedis.set(String key, String value, String nxxx, String expx, int time)

这个set()方法一共有五个形参:

  • 第一个为key,我们使用key来当锁,因为key是唯一的。

  • 第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。

    requestId可以使用UUID.randomUUID().toString()方法生成。

  • 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;

  • 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。

  • 第五个为time,与第四个参数相呼应,代表key的过期时间。

总的来说,执行上面的set()方法就只会导致两种结果:

  1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。
  2. 已有锁存在,不做任何操作。

心细的童鞋就会发现了,我们的加锁代码满足前面描述的四个条件中的三个。

  • 首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。

  • 其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会被永远占用(而发生死锁)。

  • 最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。

  • 由于我们只考虑Redis单机部署的场景,所以容错性我们暂不考虑。

基于Jedis 的API实现简单解锁代码

还是先展示代码,再带大家慢慢解释为什么这样实现。

解锁的简单代码实现

package com.crazymaker.springcloud.standard.lock;@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {private static final Long RELEASE_SUCCESS = 1L;/*** 释放分布式锁* @param jedis Redis客户端* @param lockKey 锁* @param requestId 请求标识* @return 是否释放成功*/public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));if (RELEASE_SUCCESS.equals(result)) {return true;}return false;}}

那么这段Lua代码的功能是什么呢?

其实很简单,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。

第一行代码,我们写了一个简单的Lua脚本代码。

第二行代码,我们将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。

那么为什么要使用Lua语言来实现呢?

因为要确保上述操作是原子性的。那么为什么执行eval()方法可以确保原子性,源于Redis的特性.

简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命

错误示例1

最常见的解锁代码就是直接使用 jedis.del() 方法删除锁,这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的。

public static void wrongReleaseLock1(Jedis jedis, String lockKey) {jedis.del(lockKey);
}

错误示例2

这种解锁代码乍一看也是没问题,甚至我之前也差点这样实现,与正确姿势差不多,唯一区别的是分成两条命令去执行,代码如下:

public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {// 判断加锁与解锁是不是同一个客户端if (requestId.equals(jedis.get(lockKey))) {// 若在此时,这把锁突然不是这个客户端的,则会误解锁jedis.del(lockKey);}}

基于Lua脚本实现分布式锁

lua脚本的好处

前面提到,在redis中执行lua脚本,有如下的好处:

那么为什么要使用Lua语言来实现呢?

因为要确保上述操作是原子性的。那么为什么执行eval()方法可以确保原子性,源于Redis的特性.

简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命

所以:

大部分的开源框架(如 redission)中的分布式锁组件,都是用纯lua脚本实现的。

题外话: lua脚本是高并发、高性能的必备脚本语言

有关lua的详细介绍,请参见以下书籍:

清华大学出版社 出版的,尼恩的《Java高并发核心编程 卷3 加强版》

基于纯Lua脚本的分布式锁的执行流程

加锁和删除锁的操作,使用纯lua进行封装,保障其执行时候的原子性。

基于纯Lua脚本实现分布式锁的执行流程,大致如下:

在这里插入图片描述

加锁的Lua脚本: lock.lua

--- -1 failed
--- 1 success
---
local key = KEYS[1]
local requestId = KEYS[2]
local ttl = tonumber(KEYS[3])
local result = redis.call('setnx', key, requestId)
if result == 1 then--PEXPIRE:以毫秒的形式指定过期时间redis.call('pexpire', key, ttl)
elseresult = -1;-- 如果value相同,则认为是同一个线程的请求,则认为重入锁local value = redis.call('get', key)if (value == requestId) thenresult = 1;redis.call('pexpire', key, ttl)end
end
--  如果获取锁成功,则返回 1
return result

解锁的Lua脚本: unlock.lua:

--- -1 failed
--- 1 success-- unlock key
local key = KEYS[1]
local requestId = KEYS[2]
local value = redis.call('get', key)
if value == requestId thenredis.call('del', key);return 1;
end
return -1

两个文件,放在资源文件夹下备用:

在这里插入图片描述

在Java中调用lua脚本,完成加锁操作

package com.crazymaker.springcloud.standard.lock;import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.IOUtil;
import com.crazymaker.springcloud.standard.context.SpringContextUtil;
import com.crazymaker.springcloud.standard.lua.ScriptHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;import java.util.ArrayList;
import java.util.List;@Slf4j
public class InnerLock {private RedisTemplate redisTemplate;public static final Long LOCKED = Long.valueOf(1);public static final Long UNLOCKED = Long.valueOf(1);public static final int EXPIRE = 2000;String key;String requestId;  // lockValue 锁的value ,代表线程的uuid/*** 默认为2000ms*/long expire = 2000L;private volatile boolean isLocked = false;private RedisScript lockScript;private RedisScript unLockScript;public InnerLock(String lockKey, String requestId) {this.key = lockKey;this.requestId = requestId;lockScript = ScriptHolder.getLockScript();unLockScript = ScriptHolder.getUnlockScript();}/*** 抢夺锁*/public void lock() {if (null == key) {return;}try {List<String> redisKeys = new ArrayList<>();redisKeys.add(key);redisKeys.add(requestId);redisKeys.add(String.valueOf(expire));Long res = (Long) getRedisTemplate().execute(lockScript, redisKeys);isLocked = false;} catch (Exception e) {e.printStackTrace();throw BusinessException.builder().errMsg("抢锁失败").build();}}/*** 有返回值的抢夺锁** @param millisToWait*/public boolean lock(Long millisToWait) {if (null == key) {return false;}try {List<String> redisKeys = new ArrayList<>();redisKeys.add(key);redisKeys.add(requestId);redisKeys.add(String.valueOf(millisToWait));Long res = (Long) getRedisTemplate().execute(lockScript, redisKeys);return res != null && res.equals(LOCKED);} catch (Exception e) {e.printStackTrace();throw BusinessException.builder().errMsg("抢锁失败").build();}}//释放锁public void unlock() {if (key == null || requestId == null) {return;}try {List<String> redisKeys = new ArrayList<>();redisKeys.add(key);redisKeys.add(requestId);Long res = (Long) getRedisTemplate().execute(unLockScript, redisKeys);//            boolean unlocked = res != null && res.equals(UNLOCKED);} catch (Exception e) {e.printStackTrace();throw BusinessException.builder().errMsg("释放锁失败").build();}}private RedisTemplate getRedisTemplate() {if(null==redisTemplate){redisTemplate= (RedisTemplate) SpringContextUtil.getBean("stringRedisTemplate");}return redisTemplate;}
}

在Java中调用lua脚本,完成加锁操作

下一步,实现Lock接口, 完成JedisLock的分布式锁。

其加锁操作,通过调用 lock.lua脚本完成,代码如下:

package com.crazymaker.springcloud.standard.lock;import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {private RedisTemplate redisTemplate;RedisScript<Long> lockScript = null;RedisScript<Long> unLockScript = null;public static final int DEFAULT_TIMEOUT = 2000;public static final Long LOCKED = Long.valueOf(1);public static final Long UNLOCKED = Long.valueOf(1);public static final Long WAIT_GAT = Long.valueOf(200);public static final int EXPIRE = 2000;String key;String lockValue;  // lockValue 锁的value ,代表线程的uuid/*** 默认为2000ms*/long expire = 2000L;public JedisLock(String lockKey, String lockValue) {this.key = lockKey;this.lockValue = lockValue;}private volatile boolean isLocked = false;private Thread thread;/*** 获取一个分布式锁 , 超时则返回失败** @return 获锁成功 - true | 获锁失败 - false*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {//本地可重入if (isLocked && thread == Thread.currentThread()) {return true;}expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;long startMillis = System.currentTimeMillis();Long millisToWait = expire;boolean localLocked = false;int turn = 1;while (!localLocked) {localLocked = this.lockInner(expire);if (!localLocked) {millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if (millisToWait > 0L) {/*** 还没有超时*/ThreadUtil.sleepMilliSeconds(WAIT_GAT);log.info("睡眠一下,重新开始,turn:{},剩余时间:{}", turn++, millisToWait);} else {log.info("抢锁超时");return false;}} else {isLocked = true;localLocked = true;}}return isLocked;}/*** 有返回值的抢夺锁** @param millisToWait*/public boolean lockInner(Long millisToWait) {if (null == key) {return false;}try {List<String> redisKeys = new ArrayList<>();redisKeys.add(key);redisKeys.add(lockValue);redisKeys.add(String.valueOf(millisToWait));Long res = (Long) redisTemplate.execute(lockScript, redisKeys);return res != null && res.equals(LOCKED);} catch (Exception e) {e.printStackTrace();throw BusinessException.builder().errMsg("抢锁失败").build();}}}

通过实现JUC的显示锁Lock接口,实现一个简单的分布式锁

其解锁操作,通过调用unlock.lua脚本完成,代码如下:

package com.crazymaker.springcloud.standard.lock;import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {private RedisTemplate redisTemplate;RedisScript<Long> lockScript = null;RedisScript<Long> unLockScript = null;//释放锁@Overridepublic void unlock() {if (key == null || requestId == null) {return;}try {List<String> redisKeys = new ArrayList<>();redisKeys.add(key);redisKeys.add(requestId);Long res = (Long) redisTemplate.execute(unLockScript, redisKeys);} catch (Exception e) {e.printStackTrace();throw BusinessException.builder().errMsg("释放锁失败").build();}}}

编写RedisLockService用于管理JedisLock

编写个分布式锁服务,用于加载lua脚本,创建 分布式锁,代码如下:

package com.crazymaker.springcloud.standard.lock;import com.crazymaker.springcloud.common.util.IOUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;@Slf4j
@Data
public class RedisLockService
{private RedisTemplate redisTemplate;static String lockLua = "script/lock.lua";static String unLockLua = "script/unlock.lua";static RedisScript<Long> lockScript = null;static RedisScript<Long> unLockScript = null;{String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua);
//        String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" ));if(StringUtils.isEmpty(script)){log.error("lua load failed:"+lockLua);}lockScript = new DefaultRedisScript<>(script, Long.class);//        script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" ));script =  IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua);if(StringUtils.isEmpty(script)){log.error("lua load failed:"+unLockLua);}unLockScript = new DefaultRedisScript<>(script, Long.class);}public RedisLockService(RedisTemplate redisTemplate){this.redisTemplate = redisTemplate;}public Lock getLock(String lockKey, String lockValue) {JedisLock lock=new JedisLock(lockKey,lockValue);lock.setRedisTemplate(redisTemplate);lock.setLockScript(lockScript);lock.setUnLockScript(unLockScript);return lock;}
}

测试用例

接下来,终于可以上测试用例了

package com.crazymaker.springcloud.lock;@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoCloudApplication.class})
// 指定启动类
public class RedisLockTest {@ResourceRedisLockService redisLockService;private ExecutorService pool = Executors.newFixedThreadPool(10);@Testpublic void testLock() {int threads = 10;final int[] count = {0};CountDownLatch countDownLatch = new CountDownLatch(threads);long start = System.currentTimeMillis();for (int i = 0; i < threads; i++) {pool.submit(() ->{String lockValue = UUID.randomUUID().toString();try {Lock lock = redisLockService.getLock("test:lock:1", lockValue);boolean locked = lock.tryLock(10, TimeUnit.SECONDS);if (locked) {for (int j = 0; j < 1000; j++) {count[0]++;}log.info("count = " + count[0]);lock.unlock();} else {System.out.println("抢锁失败");}} catch (Exception e) {e.printStackTrace();}countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("10个线程每个累加1000为: = " + count[0]);//输出统计结果float time = System.currentTimeMillis() - start;System.out.println("运行的时长为(ms):" + time);System.out.println("每一次执行的时长为(ms):" + time / count[0]);}}

执行用例,结果如下:

2021-05-04 23:02:11.900  INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest       LN:50 count = 6000
2021-05-04 23:02:11.901  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:3,剩余时间:9585
2021-05-04 23:02:11.902  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest       LN:50 count = 7000
2021-05-04 23:02:12.100  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:3,剩余时间:9586
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:3,剩余时间:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:3,剩余时间:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest       LN:50 count = 8000
2021-05-04 23:02:12.102  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest       LN:50 count = 9000
2021-05-04 23:02:12.304  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:4,剩余时间:9383
2021-05-04 23:02:12.307  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest       LN:50 count = 10000
10个线程每个累加1000为: = 10000
运行的时长为(ms):827.0
每一次执行的时长为(ms):0.0827

STW导致的锁过期问题

下面有一个简单的使用锁的例子,在10秒内占着锁:

  //写数据到文件
function writeData(filename, data) {boolean locked = lock.tryLock(10, TimeUnit.SECONDS);if (!locked) {throw 'Failed to acquire lock';}try {//将数据写到文件var file = storage.readFile(filename);var updated = updateContents(file, data);storage.writeFile(filename, updated);} finally {lock.unlock();}
}

问题是:

  • 如果在写文件过程中,发生了 fullGC,并且其时间跨度较长, 超过了10秒,

  • 那么,由于锁的有效期就是 10s,这时候任务没有执行完成,分布式锁就自动过期了。

在此过程中,client2 抢到锁,写了文件。

回到 client1: client1 的fullGC完成后,也继续写文件,注意,此时client1 的并没有占用锁,此时写入会导致文件数据错乱,发生线程安全问题。

这就是STW导致的锁过期问题。

STW导致的锁过期问题,具体如下图所示:

在这里插入图片描述

锁过期问题 的解决方案

锁过期问题,大概的解决方案 有2种:

1: 模拟CAS乐观锁的方式,增加版本号

2:watch dog自动延期机制

方式一:模拟CAS乐观锁的方式,增加版本号

1: 模拟CAS乐观锁的方式,增加版本号(如下图中的token)

CAS乐观锁 的方法也很简单:

在每次写操作时加入一个 token。 token 可以是一个递增的数字(lock service 可以做到),每次有 client 申请锁就递增一次。比如:

  • client1 的token 是33
  • client2 的token 是34

紧接着 client1 活过来之后尝试写入数据,自身 token 33 比 34 小,因此client1 的写入操作被拒绝了。

在这里插入图片描述

此方案如果要实现,需要调整业务逻辑与之配合,所以会入侵代码。

当然,如果能把这个方案说清楚了,也能像他一样,拿到年薪65W的offer:

上岸奇迹:中厂大龄34岁,被裁8月收一大厂offer, 年薪65W,逆天改命!

方式二:watch dog自动延期机制

客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

简单!

只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

redission,采用的就是这种方案, 此方案不会入侵业务代码。

watch dog看门狗 的作用是在锁没有过期之前,不断的延长锁的有效期。

在这里插入图片描述

默认情况下,锁的过期时间是 30 秒,看门狗的续期时间是 10 秒,

也可以通过修改 Config.lockWatchdogTimeout 来指定。

上面的两个方法,后面尼恩会在《尼恩Java面试宝典》配套视频里边做详细介绍。

为啥推荐使用Redission

Redission 就是 使用 看门狗的机制。

作为 Java 开发人员,我们若想在程序中集成 Redis,必须使用 Redis 的第三方库。目前大家使用的最多的第三方库是jedis。

和SpringCloud gateway一样,Redisson也是基于Netty实现的,是更高性能的第三方库。 所以,这里推荐大家使用Redission替代 jedis。

Redisson简介

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。

img

Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

img

Redisson与Jedis的对比

1.概况对比

Jedis是Redis的java实现的客户端,其API提供了比较全面的的Redis命令的支持,Redisson实现了分布式和可扩展的的java数据结构,和Jedis相比,功能较为简单,不支持字符串操作,不支持排序,事物,管道,分区等Redis特性。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中的放在处理业务逻辑上。

2.可伸缩性

Jedis使用阻塞的I/O,且其方法调用都是同步的,程序流程要等到sockets处理完I/O才能执行,不支持异步,Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis。

Redisson使用非阻塞的I/O和基于Netty框架的事件驱动的通信层,其方法调用时异步的。Redisson的API是线程安全的,所以操作单个Redisson连接来完成各种操作。

3.第三方框架整合

Redisson在Redis的基础上实现了java缓存标准规范;Redisson还提供了Spring Session回话管理器的实现。

Redission 的源码地址:

官网: https://redisson.org/

github: https://github.com/redisson/redisson#quick-start

特性 & 功能:

  • 支持 Redis 单节点(single)模式、哨兵(sentinel)模式、主从(Master/Slave)模式以及集群(Redis Cluster)模式

  • 程序接口调用方式采用异步执行和异步流执行两种方式

  • 数据序列化,Redisson 的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在 Redis 里的读取和存储

  • 单个集合数据分片,在集群模式下,Redisson 为单个 Redis 集合类型提供了自动分片的功能

  • 提供多种分布式对象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等

  • 提供丰富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等

  • 分布式锁和同步器的实现,可重入锁(Reentrant Lock),公平锁(Fair Lock),联锁(MultiLock),红锁(Red Lock),信号量(Semaphonre),可过期性信号锁(PermitExpirableSemaphore)等

  • 提供先进的分布式服务,如分布式远程服务(Remote Service),分布式实时对象(Live Object)服务,分布式执行服务(Executor Service),分布式调度任务服务(Schedule Service)和分布式映射归纳服务(MapReduce)

Redisson的使用

如何安装 Redisson

安装 Redisson 最便捷的方法是使用 Maven 或者 Gradle:

•Maven

<dependency>	<groupId>org.redisson</groupId>	<artifactId>redisson</artifactId>	<version>3.11.4</version>	
</dependency>

•Gradle

compile group: 'org.redisson', name: 'redisson', version: '3.11.4'

目前 Redisson 最新版是 3.11.4,当然你也可以通过搜索 Maven 中央仓库 mvnrepository[1] 来找到 Redisson 的各种版本。

获取RedissonClient对象

RedissonClient有多种模式,主要的模式有:

  • 单节点模式

  • 哨兵模式

  • 主从模式

  • 集群模式

首先介绍单节点模式。

单节点模式的程序化配置方法,大致如下:

Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedisson
Client redisson = Redisson.create();
SingleServerConfig singleConfig = config.useSingleServer();

SpringBoot整合Redisson

Redisson有多种模式,首先介绍单机模式的整合。

一、导入Maven依赖

<!-- redisson-springboot --><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.11.4</version></dependency>

二、核心配置文件

spring:redis:host: 127.0.0.1port: 6379database: 0timeout: 5000

三、添加配置类

RedissonConfig.java

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RedissonConfig {@Autowiredprivate RedisProperties redisProperties;@Beanpublic RedissonClient redissonClient() {Config config = new Config();String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());config.useSingleServer().setDatabase(3);return Redisson.create(config);}}

自定义starter

由于redission可以有多种模式,处于学习的目的,将多种模式封装成一个start,可以学习一下starter的制作。

在这里插入图片描述

封装一个RedissonManager,通过策略模式,根据不同的配置类型,创建 RedissionConfig实例,然后创建RedissionClient对象。

在这里插入图片描述

使用 RLock 实现 Redis 分布式锁

RLock 是 Java 中可重入锁的分布式实现,下面的代码演示了 RLock 的用法:

public class RedissionTest {@ResourceRedissonManager redissonManager;@Testpublic void testLockExamples() {// 默认连接上 127.0.0.1:6379RedissonClient redisson = redissonManager.getRedisson();// RLock 继承了 java.util.concurrent.locks.Lock 接口RLock lock = redisson.getLock("redission:test:lock:1");final int[] count = {0};int threads = 10;ExecutorService pool = Executors.newFixedThreadPool(10);CountDownLatch countDownLatch = new CountDownLatch(threads);long start = System.currentTimeMillis();for (int i = 0; i < threads; i++) {pool.submit(() ->{for (int j = 0; j < 1000; j++) {lock.lock();count[0]++;lock.unlock();}countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("10个线程每个累加1000为: = " + count[0]);//输出统计结果float time = System.currentTimeMillis() - start;System.out.println("运行的时长为:" + time);System.out.println("每一次执行的时长为:" + time/count[0]);}}

此代码将产生以下输出:

10个线程每个累加1000为: = 10000
运行的时长为:14172.0
每一次执行的时长为:1.4172

Redision锁 核心源码分析

单机模式下,简单Redision锁的使用如下:

// 构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0);
// 构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 设置锁定资源名称
RLock disLock = redissonClient.getLock("DISLOCK");
//尝试获取分布式锁
boolean isLock= disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
if (isLock) {try {//TODO if get lock success, do something;Thread.sleep(15000);} catch (Exception e) {} finally {// 无论如何, 最后都要解锁disLock.unlock();}
}

通过代码可知,经过Redisson的封装,实现Redis分布式锁非常方便,和显式锁的使用方法是一样的。RLock接口继承了 Lock接口。

我们再看一下Redis中的value是啥,和前文分析一样,hash结构, redis 的key就是资源名称。

hash结构的key就是UUID+threadId,hash结构的value就是重入值,在分布式锁时,这个值为1(Redisson还可以实现重入锁,那么这个值就取决于重入次数了):

172.29.1.180:5379> hgetall DISLOCK
1) "01a6d806-d282-4715-9bec-f51b9aa98110:1"
2) "1"

使用客户端工具看到的效果如下:

在这里插入图片描述

getLock()方法

//name:锁的名称
public RLock getLock(String name) {
//默认创建的同步执行器, (存在异步执行器, 因为锁的获取和释放是有强一致性要求, 默认同步)return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

可以看到,调用getLock()方法后实际返回一个RedissonLock对象

点击 RedissonLock 进去,发现这是一个 RedissonLock 构造方法,主要初始化一些属性。

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;//唯一IDthis.id = commandExecutor.getConnectionManager().getId();//等待获取锁时间this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();//ID + 锁名称this.entryName = this.id + ":" + name;//发布订阅	this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

点击 getLockWatchdogTimeout() 进去看一下:

`public` `class` `Config {``    ` `    ``private` `long` `lockWatchdogTimeout = ``30` `* ``1000``;``        ` `    ``public` `long` `getLockWatchdogTimeout() {``        ``return` `lockWatchdogTimeout;``    ``}``    ` `    ``//省略``}`

internalLockLeaseTime 分布式锁的超时时间,默认是 30 秒,

现在我们知道默认是 30 秒,那么这个看门狗多久时间来延长一次有效期呢?

我们接着往下看。

tryLock方法

下面来看下tryLock方法,源码如下:

    @Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(threadId);return false;}current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
//        return get(tryLockAsync(waitTime, leaseTime, unit));}

以上代码使用了异步回调模式,RFuture 继承了 java.util.concurrent.Future, CompletionStage两大接口,异步回调模式的基础知识,请参见 《Java高并发核心编程 卷2 》

tryLock方法 调用了tryAcquire()方法,核心逻辑在tryAcquire()方法

tryAcquire()方法

在RedissonLock对象的lock()方法主要调用tryAcquire()方法

img

tryLockInnerAsync

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {//进行异步获取锁return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//尝试异步获取锁, 获取锁成功返回空, 否则返回锁剩余过期时间RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);//ttlRemainingFuture 执行完成后触发此操作ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {//ttlRemaining == null 代表获取了锁//获取到锁后执行续时操作if (ttlRemaining == null) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}
}

由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键

img

首先,看一下evalWriteAsync方法的定义

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

这和前面的jedis调用lua脚本类似,最后两个参数分别是keys和params。

单独将调用的那一段摘出来看,实际调用是这样的:

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

结合上面的参数声明,我们可以知道,这里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假设:

  • 前面获取锁时传的name是“DISLOCK”,
  • 假设调用的线程ID是1,
  • 假设成员变量UUID类型的id是01a6d806-d282-4715-9bec-f51b9aa98110

那么KEYS[1]=DISLOCK,ARGV[2]=01a6d806-d282-4715-9bec-f51b9aa98110:1

因此,这段脚本的意思是

  1、判断有没有一个叫“DISLOCK”的key

  2、如果没有,则在其下设置一个字段为“01a6d806-d282-4715-9bec-f51b9aa98110:1”,值为“1”的键值对 ,并设置它的过期时间

  3、如果存在,则进一步判断“01a6d806-d282-4715-9bec-f51b9aa98110:1”是否存在,若存在,则其值加1,并重新设置过期时间

  4、返回“DISLOCK”的生存时间(毫秒)

原理:加锁机制

这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。

用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程

所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

在这里插入图片描述

Lua脚本的详解

为何要使用lua语言?

因为一大堆复杂的业务逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性

在这里插入图片描述

回顾一下evalWriteAsync方法的定义

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

注意,其最后两个参数分别是keys和params。

关于 lua脚本的参数解释:

KEYS[1] 代表的是你加锁的那个key,比如说:

RLock lock = redisson.getLock("DISLOCK");

这里你自己设置了加锁的那个锁key就是“DISLOCK”。

ARGV[1] 代表的就是锁key的默认生存时间

调用的时候,传递的参数为 internalLockLeaseTime ,该值默认30秒。

ARGV[2] 代表的是加锁的客户端的ID,类似于下面这样:

01a6d806-d282-4715-9bec-f51b9aa98110:1

lua脚本的第一段if判断语句,就是用“exists DISLOCK”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。

如何加锁呢?很简单,用下面的redis命令:

hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1

通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:

DISLOCK:{8743c9c0-0795-4907-87fd-6c719a6b4586:1 1}

接着会执行“pexpire DISLOCK 30000”命令,设置DISLOCK这个锁key的生存时间是30秒(默认)

锁互斥机制

那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?

很简单,第一个if判断会执行“exists DISLOCK”,发现DISLOCK 这个锁key已经存在了。

接着第二个if判断,判断一下,DISLOCK锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。

所以,客户端2会获取到pttl DISLOCK返回的一个数字,这个数字代表了DISLOCK 这个锁key的剩余生存时间。 比如还剩15000毫秒的生存时间。

此时客户端2会进入一个while循环,不停的尝试加锁。

可重入加锁机制

如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?

RLock lock = redisson.getLock("DISLOCK")
lock.lock();
//业务代码
lock.lock();
//业务代码
lock.unlock();
lock.unlock();

分析上面那段lua脚本。

第一个if判断肯定不成立,“exists DISLOCK”会显示锁key已经存在了。

第二个if判断会成立,因为DISLOCK的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此时就会执行可重入加锁的逻辑,他会用:

incrby DISLOCK

8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

通过这个命令,对客户端1的加锁次数,累加1。

此时DISLOCK数据结构变为下面这样:

DISLOCK:{8743c9c0-0795-4907-87fd-6c719a6b4586:1 2}

释放锁机制

如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。

其实说白了,就是每次都对DISLOCK数据结构中的那个加锁次数减1。

如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:

“del DISLOCK”命令,从redis里删除这个key。

然后呢,另外的客户端2就可以尝试完成加锁了。

unlock 源码

  @Overridepublic void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);}

再深入一下,实际调用的是unlockInnerAsync方法

unlockInnerAsync方法

在这里插入图片描述

原理:Redision 解锁机制

上图没有截取完整,完整的源码如下:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}

我们还是假设name=DISLOCK,假设线程ID是1

同理,我们可以知道

KEYS[1]是getName(),即KEYS[1]=DISLOCK

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{DISLOCK}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存时间

ARGV[3]是getLockName(threadId),即ARGV[3]=8743c9c0-0795-4907-87fd-6c719a6b4586:1

因此,上面脚本的意思是:

  1、判断是否存在一个叫“DISLOCK”的key

  2、如果不存在,返回nil

  3、如果存在,使用Redis Hincrby 命令用于为哈希表中的字段值加上指定增量值 -1 ,代表减去1

  4、若counter >,返回空,若字段存在,则字段值减1

  5、若减完以后,counter > 0 值仍大于0,则返回0

  6、减完后,若字段值小于或等于0,则用 publish 命令广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

在这里插入图片描述

通过redis Channel 解锁订阅

以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?

再回到前面获取锁的位置

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}//    订阅RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {while (true) {ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for messageif (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {unsubscribe(future, threadId);}
//        get(lockAsync(leaseTime, unit));
}protected static final LockPubSub PUBSUB = new LockPubSub();protected RFuture<RedissonLockEntry> subscribe(long threadId) {return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源

这里的channel为:

redisson_lock__channel:

在这里插入图片描述

在这里插入图片描述

当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞

这点,有点儿类似 Zookeeper分布式锁:

有关zookeeper分布式锁的原理和实现,具体请参见下面的博客:
Zookeeper 分布式锁 (图解+秒懂+史上最全)

watch dog自动延期机制

客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

使用watchDog机制实现锁的续期

但是聪明的同学肯定会问:

有效时间设置多长,假如我的业务操作比有效时间长,我的业务代码还没执行完,就自动给我解锁了,不就完蛋了吗。

这个问题就有点棘手了,在网上也有很多讨论:

第一种解决方法就是靠程序员自己去把握,预估一下业务代码需要执行的时间,然后设置有效期时间比执行时间长一些,保证不会因为自动解锁影响到客户端业务代码的执行。

但是这并不是万全之策,比如网络抖动这种情况是无法预测的,也有可能导致业务代码执行的时间变长,所以并不安全。

第二种方法,使用监事狗watchDog机制实现锁的续期。

第二种方法比较靠谱一点,而且无业务入侵。

在Redisson框架实现分布式锁的思路,就使用watchDog机制实现锁的续期。

当加锁成功后,同时开启守护线程,默认有效期是30秒,每隔10秒就会给锁续期到30秒,只要持有锁的客户端没有宕机,就能保证一直持有锁,直到业务代码执行完毕由客户端自己解锁,如果宕机了自然就在有效期失效后自动解锁。

这里,和前面解决 JVM STW的锁过期问题有点类似,只不过,watchDog自动续期,也没有完全解决JVM STW的锁过期问题。

如何彻底解决 JVM STW的锁过期问题,可以来疯狂创客圈的社群讨论。

redisson watchdog 使用和原理

实际上,redisson加锁的基本流程图如下:

在这里插入图片描述

这里专注于介绍watchdog。

首先watchdog的具体思路是 加锁时,默认加锁 30秒,每10秒钟检查一次,如果存在就重新设置 过期时间为30秒。

然后设置默认加锁时间的参数是 lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)

官方文档描述如下

lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)

默认值:30000

监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。如果该看门狗未使用lockWatchdogTimeout去重新调整一个分布式锁的lockWatchdogTimeout超时,那么这个锁将变为失效状态。这个参数可以用来避免由Redisson客户端节点宕机或其他原因造成死锁的情况。

需要注意的是

1.watchDog 只有在未显示指定加锁时间时才会生效。(这点很重要)

2.lockWatchdogTimeout设定的时间不要太小 ,比如我之前设置的是 100毫秒,由于网络直接导致加锁完后,watchdog去延期时,这个key在redis中已经被删除了。

tryAcquireAsync原理

在调用lock方法时,会最终调用到tryAcquireAsync。详细解释如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {//如果指定了加锁时间,会直接去加锁if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}//没有指定加锁时间 会先进行加锁,并且默认时间就是 LockWatchdogTimeout的时间//这个是异步操作 返回RFuture 类似netty中的futureRFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);//这里也是类似netty Future 的addListener,在future内容执行完成后执行ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {//这里是定时执行 当前锁自动延期的动作scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}

scheduleExpirationRenewal 中会调用renewExpiration。

scheduleExpirationRenewal() 跳进去看:

private void scheduleExpirationRenewal(long threadId) {RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);this.renewExpiration();}}

接着进去 renewExpiration() 方法看:

renewExpiration执行延期动作

这里我们可以看到是 启用了一个timeout定时,去执行延期动作

 private void renewExpiration() {//从容器中去获取要被续期的锁RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());//容器中没有要续期的锁,直接返回nullif (ee != null) {//创建定时任务//并且执行的时间为 30000/3 毫秒,也就是 10 秒后Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {//从容器中取出线程RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent != null) {Long threadId = ent.getFirstThreadId();if (threadId != null) {//Redis进行锁续期//这个方法的作用其实底层也是去执行LUA脚本RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);//同理去处理Redis续命结果future.onComplete((res, e) -> {if (e != null) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);} else {//如果成功续期,递归继续创建下一个 10S 后的任务if (res) {//递归继续创建下一个10S后的任务RedissonLock.this.renewExpiration();}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);ee.setTimeout(task);}
}

从这里我们就知道,获取锁成功就会开启一个定时任务,也就是 watchdog 看门狗,定时任务会定期检查去续期renewExpirationAsync(threadId)。

从这里我们明白,该定时调度每次调用的时间差是 internalLockLeaseTime / 3,也就是 10 秒。

最终 scheduleExpirationRenewal会调用到 renewExpirationAsync,

renewExpirationAsync

执行下面这段 lua脚本。他主要判断就是 这个锁是否在redis中存在,如果存在就进行 pexpire 延期。

   protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));}

watchLog总结

1.要使 watchLog机制生效 ,lock时 不要设置 过期时间

2.watchlog的延时时间 可以由 lockWatchdogTimeout指定默认延时时间,但是不要设置太小。如100

3.watchdog 会每 lockWatchdogTimeout/3时间,去延时。

4.watchdog 通过 类似netty的 Future功能来实现异步延时

5.watchdog 最终还是通过 lua脚本来进行延时

总结:Redis分布式锁过期怎么办?

尼恩先简单的总结一下,两大核心方案,大家收藏起来,毒打面试官。

在这里插入图片描述

第一种方案是入侵性比较强,在代码里边需要进行版本的检查。

第一种方案是入侵性比较弱,建议使用第二种。

如果使用第二种方案,就是设计一个watch dog 看门狗后台线程, 最好是能够定时调度的线程。

只要客户端一旦加锁成功,watch dog 看门狗后台线程添加一个定时任务,会每隔 10 秒检查一下,如果客户端还持有锁 key,那么就会不断的延长锁 key 的过期时间。

并且再一次创建一个续期的定时任务,为下一次续期做准备。

默认情况下,加锁的时间是 30 秒,.如果加锁的业务没有执行完,就会进行一次续期,把锁重置成 30 秒,万一业务的机器宕机了,那就续期不了,30 秒之后锁就解开了。

说在最后:有问题找老架构取经‍

通过对Redis 锁过期的深度回答,可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”,然后实现”offer直提”。

在面试之前,建议大家系统化的刷一波 5000页《尼恩Java面试宝典PDF》,里边有大量的大厂真题、面试难题、架构难题。

很多小伙伴刷完后, 吊打面试官, 大厂横着走。

在刷题过程中,如果有啥问题,大家可以来 找 40岁老架构师尼恩交流。

另外,如果没有面试机会,可以找尼恩来改简历、做帮扶。

遇到职业难题,找老架构取经, 可以省去太多的折腾,省去太多的弯路。

尼恩指导了大量的小伙伴上岸,前段时间,刚指导一个40岁+被裁小伙伴,拿到了一个年薪100W的offer。

狠狠卷,实现 “offer自由” 很容易的, 前段时间一个武汉的跟着尼恩卷了2年的小伙伴, 在极度严寒/痛苦被裁的环境下, offer拿到手软, 实现真正的 “offer自由” 。

技术自由的实现路径:

实现你的 架构自由:

《吃透8图1模板,人人可以做架构》

《10Wqps评论中台,如何架构?B站是这么做的!!!》

《阿里二面:千万级、亿级数据,如何性能优化? 教科书级 答案来了》

《峰值21WQps、亿级DAU,小游戏《羊了个羊》是怎么架构的?》

《100亿级订单怎么调度,来一个大厂的极品方案》

《2个大厂 100亿级 超大流量 红包 架构方案》

… 更多架构文章,正在添加中

实现你的 响应式 自由:

《响应式圣经:10W字,实现Spring响应式编程自由》

这是老版本 《Flux、Mono、Reactor 实战(史上最全)》

实现你的 spring cloud 自由:

《Spring cloud Alibaba 学习圣经》 PDF

《分库分表 Sharding-JDBC 底层原理、核心实战(史上最全)》

《一文搞定:SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系(史上最全)》

实现你的 linux 自由:

《Linux命令大全:2W多字,一次实现Linux自由》

实现你的 网络 自由:

《TCP协议详解 (史上最全)》

《网络三张表:ARP表, MAC表, 路由表,实现你的网络自由!!》

实现你的 分布式锁 自由:

《Redis分布式锁(图解 - 秒懂 - 史上最全)》

《Zookeeper 分布式锁 - 图解 - 秒懂》

实现你的 王者组件 自由:

《队列之王: Disruptor 原理、架构、源码 一文穿透》

《缓存之王:Caffeine 源码、架构、原理(史上最全,10W字 超级长文)》

《缓存之王:Caffeine 的使用(史上最全)》

《Java Agent 探针、字节码增强 ByteBuddy(史上最全)》

实现你的 面试题 自由:

4800页《尼恩Java面试宝典 》 40个专题

免费获取11个技术圣经PDF:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/59852.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

数论 莫比乌斯反演

数论 莫比乌斯反演讲解前置需求 数论分块 概念 对于一个形如 \(\sum_{x=1}^n \lfloor{\frac{n}{x}}\rfloor\) 的式子,我们发现对于一部分的 \(x\),它们的 \(\lfloor{\frac{n}{x}}\rfloor\) 值相同,因此我们没必要 \(\mathcal{O(n)}\) 计算,可以采用数论分块的办法将这一步的…

Prometheus修改默认数据存储时间

Prometheus修改默认数据存储时间 Prometheus 的数据存储时间是通过命令行参数 --storage.tsdb.retention.time 来设置的。这个参数指定了 Prometheus 在本地存储中保留数据的时间长度,默认为15天。如果需要延长/缩短数据保留时间,可以将该参数的值设置为更大/更小的时间值。 …

解决rabbitmq队列超时timeout问题【win环境】

解决rabbitmq队列超时timeout问题【win环境】 1.安装RabbitMQ-Plugins cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.3\sbin rabbitmq-plugins enable rabbitmq_management浏览器打开http://localhost:15672来访问web端的管理界面,用户名:guest,密码:guest进…

低代码 + BI 数字化转型如何助力制造业供应链协同?

引言 在当今快速变化的商业环境中,制造业面临着前所未有的挑战和机遇。全球化竞争、消费者需求的快速变化、技术创新的加速以及不断增加的成本压力,都要求制造企业不断提高其供应链的效率和灵活性。供应链协同作为一种先进的管理理念和实践,正在成为制造业实现数字化转型、提…

10 Windows批处理之调用例程和bat文件

在前文中,我介绍了标签和非顺序执行,这两者在本文中也起着重要作用。我将很快介绍一个已经讨论过的命令的新变化,允许您创建和调用由标签定义的例程。不是简单地在标签之后将控制权交给代码,而是在例程执行后将控制权返回到调用它的位置。在编写更复杂、更有趣的bat文件时,…

pbootcms后台的百度普通收录token怎么填写?怎么获得?

要在PbootCMS后台填写百度普通收录token,你需要先获得这个token,然后按照以下步骤填写到后台: 如何获得百度普通收录token访问百度搜索资源平台打开百度搜索资源平台的网址:https://ziyuan.baidu.com/ 如果你还没有账号,请先注册一个百度账号。添加站点登录后,进入用户中…

PbootCMS实现后台编辑器微信公众号图片本地化

为了实现PbootCMS后台编辑器将微信公众号上的图片本地化,可以按照你提供的方法进行修改。以下是详细的步骤和修改内容: 1. 修改 coreextendueditor/phpction_crawler.php 文件 原始代码/* 抓取远程图片 */ $list = array(); if (isset($_POST[$fieldName])) {$source = $_POS…

市场主体登记

市场主体登记服务 4.0tomcat部署与安装[root@VM-4-9-centos ~]# mkdir /data/soft/ [root@VM-4-9-centos ~]# tar -zxvf apache-tomcat-9.0.68.tar.gz[root@top164 ftpdir]# tar -zxvf apache-tomcat-9.0.68.tar.gz [root@top164 soft]# cp -a cp -a apache-tomcat-9.0.68 apac…