一、前置内容
1.1 单机秒杀实现
基于 Redis 的秒杀单机系统迭代实现(附详细流程图)-CSDN博客
1.2 SETNX实现集群模式下
基于 Redis 的秒杀集群系统迭代实现【SETNX】(附详细流程图)-CSDN博客
1.3 前置流程图 
1.4 前置问题
用SETNX实现的锁,有不可重入,不可重试,锁过期,主从不一致的问题
二、Redisson锁
2.1 介绍
Redisson 是一个基于 Redis 的 Java 客户端,它不仅封装了 Redis 的基础操作,还提供了一套完整的分布式锁实现,解决了原生 Redis 实现分布式锁时的诸多痛点(比如锁自动续期、死锁、释放别人的锁等问题)。
2.2 特点
可重入锁:支持线程重入(同一个线程多次获取同一把锁不会死锁),符合 Java 中 ReentrantLock 的特性。
- 自动续期(看门狗机制):这是 Redisson 锁最核心的特性。当持有锁的线程因为业务逻辑执行时间过长,超过了锁的过期时间时,Redisson 会自动为锁续期(默认每 30 秒续期一次,锁默认过期时间 30 秒),避免锁提前释放导致并发问题。
2.3 使用
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("地址")
.setPassword("密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
@Override
@Transactional
public Long seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.NOT_START);
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.IS_END);
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.getLockOrderKey(userId);
RLock lock = redissonClient.getLock(key);
boolean isLock = lock.tryLock();
if (!isLock) {
// 获取锁失败,返回错误或重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
这里的锁从原来自己实现变成了使用现成的Api
三、使用Redisson锁解决不可重入
- 可重入锁:支持线程重入(同一个线程多次获取同一把锁不会死锁),符合 Java 中 ReentrantLock 的特性。
3.1 可重入锁和不可重入锁

3.2 Redisson可重入锁实现原理
原来的锁键是用户,锁是线程
Redisson锁在redis中存储的是hash结构,大键是用户,小键是线程,值是锁的重复次数

3.3 流程图

四、可重入源码
4.1 加锁
进入RedissonLock.tryLock(),此类实现了Rlock接口
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}
public RFuture<Boolean> tryLockAsync() {
return this.tryLockAsync(Thread.currentThread().getId());
}
public RFuture<Boolean> tryLockAsync(long threadId) {
return this.tryAcquireOnceAsync(-1L, -1L, (TimeUnit)null, threadId);
}
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.<Boolean>tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.<Boolean>tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
- 同步封装异步:tryLock() 是同步入口,本质是阻塞调用异步的 tryLockAsync() 获取结果。
- 参数默认值:默认调用时 waitTime=-1(不等待,直接抢锁)、leaseTime=-1(不设锁等待时间)。
- 调用tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.<T>evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then"+
" redis.call('hincrby', KEYS[1], ARGV[2], 1); "+
"redis.call('pexpire', KEYS[1], ARGV[1]);"+
"return nil;"+
"end;"+
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+
"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+
"redis.call('pexpire', KEYS[1], ARGV[1]); "+
"return nil; "+
"end; "+
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
4.2 释放锁
public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)e.getCause();
} else {
throw e;
}
}
}
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.<Boolean>evalWriteAsync(this.getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter
> 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del',
KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;",
Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE,
this.internalLockLeaseTime, this.getLockName(threadId));
}

五、使用Redisson锁解决不可重试
5.1 加锁
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return this.tryLock(waitTime, -1L, unit);
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() – current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
boolean var14;
try {
time -= System.currentTimeMillis() – current;
if (time > 0L) {
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
boolean var28 = true;
return var28;
}
time -= System.currentTimeMillis() – currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var27 = false;
return var27;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() – currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
boolean var16 = false;
return var16;
}
this.acquireFailed(waitTime, unit, threadId);
var14 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
return var14;
}
}
}
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
首次尝试先调用 tryAcquire 尝试获取锁。如果成功(返回 null),直接返回 true。
订阅锁释放事件如果首次尝试失败,就通过 subscribe 订阅该锁的释放通知,避免无效的自旋等待。
循环重试进入 do-while 循环,只要还有剩余等待时间,就会:
- 再次调用 tryAcquire 尝试获取锁
- 若成功则返回 true
- 若失败,就等待锁释放通知或剩余时间,然后继续循环
超时退出如果等待时间耗尽仍未获取到锁,就调用 acquireFailed 并返回 false。
资源清理无论成功还是失败,最终都会在 finally 块中取消订阅,避免资源泄漏。

六、看门狗策略
6.1 加锁
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.<Boolean>tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.<Boolean>tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.<Boolean>evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
开启续命当调用 tryLock 且未指定 leaseTime(即 leaseTime = -1)时,锁获取成功后会触发 scheduleExpirationRenewal,开启看门狗。
定时任务触发renewExpiration 方法会创建一个定时任务,默认在锁过期时间的 1/3 时(比如锁默认 30 秒过期,任务就会在 10 秒后执行),去检查锁是否还被当前线程持有。
续命操作定时任务会调用 renewExpirationAsync,通过 Lua 脚本去 Redis 中更新锁的过期时间,完成 “续命”。
循环续命只要锁还被持有,每次续命成功后,就会再次调度下一个定时任务,如此循环,直到锁被主动释放或线程崩溃。

当我们不指定锁过期时间,就会执行看门狗策略,会一直帮我们续期
思考:为什么不干脆设定不过期,而是要采用定时任务不断地续期?
- 满足长任务需求:有些业务操作(比如批量处理数据、复杂的数据库事务)执行时间不确定,可能超过锁的初始过期时间(默认 30 秒),如果锁提前过期,会导致多个客户端同时持有锁,破坏分布式锁的互斥性。
- 保证锁最终可释放:即使客户端异常崩溃,看门狗的续期线程也会随之终止,锁会在初始过期时间(或最后一次续期后的 30 秒)后自动过期,避免死锁
6.2 释放锁
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}
}
}
-
解锁时主动关闭在 unlockAsync 方法中,解锁操作完成后会立刻调用 cancelExpirationRenewal,主动关闭看门狗的续命任务。
-
线程安全的清理
- cancelExpirationRenewal 会先从 EXPIRATION_RENEWAL_MAP 中获取当前锁对应的看门狗任务(ExpirationEntry)。
- 然后从任务中移除当前线程 ID,这是为了处理锁可重入的场景,只有当所有持有该锁的线程都解锁后,才会真正停止看门狗。
-
停止定时任务
- 当任务中没有任何线程持有锁时(task.hasNoThreads()),会取消看门狗的定时任务(timeout.cancel())。
- 最后从 EXPIRATION_RENEWAL_MAP 中移除该任务,彻底清理资源。

定时任务会在解锁的时候取消掉,不会一直执行
七、Redisson锁实现主从不一致问题
redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
7.1 代码实现
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("地址")
.setPassword("密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("地址")
.setPassword("密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("地址")
.setPassword("密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
@Override
@Transactional
public Long seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.NOT_START);
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.IS_END);
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.getLockOrderKey(userId);
RLock lock1 = redissonClient.getLock(key);
RLock lock2 = redissonClient2.getLock(key);
RLock lock3 = redissonClient3.getLock(key);
RLock lock = redissonClient.getMultiLock(lock1 , lock2 , lock3);
boolean isLock = lock.tryLock();
if (!isLock) {
// 获取锁失败,返回错误或重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
7.2 源码
public RLock getMultiLock(RLock… locks) {
return new RedissonMultiLock(locks);
}
final List<RLock> locks = new ArrayList();
public RedissonMultiLock(RLock… locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
} else {
this.locks.addAll(Arrays.asList(locks));
}
}
7.3 加锁
public boolean tryLock() {
try {
return this.tryLock(-1L, -1L, (TimeUnit)null);
} catch (InterruptedException var2) {
Thread.currentThread().interrupt();
return false;
}
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1L;
if (leaseTime != -1L) {
if (waitTime == -1L) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime) * 2L;
}
}
long time = System.currentTimeMillis();
long remainTime = -1L;
if (waitTime != -1L) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = this.calcLockWaitTime(remainTime);
int failedLocksLimit = this.failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList(this.locks.size());
ListIterator<RLock> iterator = this.locks.listIterator();
while(iterator.hasNext()) {
RLock lock = (RLock)iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1L && leaseTime == -1L) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException var21) {
this.unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception var22) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (this.locks.size() – acquiredLocks.size() == this.failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
this.unlockInner(acquiredLocks);
if (waitTime == -1L) {
return false;
}
failedLocksLimit = this.failedLocksLimit();
acquiredLocks.clear();
while(iterator.hasPrevious()) {
iterator.previous();
}
} else {
–failedLocksLimit;
}
}
if (remainTime != -1L) {
remainTime -= System.currentTimeMillis() – time;
time = System.currentTimeMillis();
if (remainTime <= 0L) {
this.unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1L) {
List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());
for(RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for(RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
7.4 流程图

八、后续内容
目前我们的流程还是串行执行的,性能不是很好,我们在下一篇进行异步优化
网硕互联帮助中心



评论前必须登录!
注册