云计算百科
云计算领域专业知识百科平台

基于 Redis 的秒杀集群系统迭代实现【Redisson源码分析】(附详细流程图)

一、前置内容

1.1 单机秒杀实现

基于 Redis 的秒杀单机系统迭代实现(附详细流程图)-CSDN博客

1.2 SETNX实现集群模式下

基于 Redis 的秒杀集群系统迭代实现【SETNX】(附详细流程图)-CSDN博客

1.3 前置流程图 

1.4 前置问题

用SETNX实现的锁,有不可重入,不可重试,锁过期,主从不一致的问题

二、Redisson锁

2.1 介绍

Redisson 是一个基于 Redis 的 Java 客户端,它不仅封装了 Redis 的基础操作,还提供了一套完整的分布式锁实现,解决了原生 Redis 实现分布式锁时的诸多痛点(比如锁自动续期、死锁、释放别人的锁等问题)。

2.2 特点

可重入锁:支持线程重入(同一个线程多次获取同一把锁不会死锁),符合 Java 中 ReentrantLock 的特性。

  • 自动续期(看门狗机制):这是 Redisson 锁最核心的特性。当持有锁的线程因为业务逻辑执行时间过长,超过了锁的过期时间时,Redisson 会自动为锁续期(默认每 30 秒续期一次,锁默认过期时间 30 秒),避免锁提前释放导致并发问题。

 2.3 使用

@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("地址")
.setPassword("密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
@Override
@Transactional
public Long seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.NOT_START);
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.IS_END);
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.getLockOrderKey(userId);
RLock lock = redissonClient.getLock(key);
boolean isLock = lock.tryLock();
if (!isLock) {
// 获取锁失败,返回错误或重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}

这里的锁从原来自己实现变成了使用现成的Api

三、使用Redisson锁解决不可重入

  • 可重入锁:支持线程重入(同一个线程多次获取同一把锁不会死锁),符合 Java 中 ReentrantLock 的特性。

3.1 可重入锁和不可重入锁

3.2 Redisson可重入锁实现原理

原来的锁键是用户,锁是线程

Redisson锁在redis中存储的是hash结构,大键是用户,小键是线程,值是锁的重复次数

3.3 流程图

四、可重入源码

4.1 加锁

进入RedissonLock.tryLock(),此类实现了Rlock接口
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}

public RFuture<Boolean> tryLockAsync() {
return this.tryLockAsync(Thread.currentThread().getId());
}

public RFuture<Boolean> tryLockAsync(long threadId) {
return this.tryAcquireOnceAsync(-1L, -1L, (TimeUnit)null, threadId);
}

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.<Boolean>tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.<Boolean>tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}

}
});
return ttlRemainingFuture;
}
}

  •  同步封装异步:tryLock() 是同步入口,本质是阻塞调用异步的 tryLockAsync() 获取结果。
  • 参数默认值:默认调用时 waitTime=-1(不等待,直接抢锁)、leaseTime=-1(不设锁等待时间)。
  •  调用tryLockInnerAsync  

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.<T>evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then"+
" redis.call('hincrby', KEYS[1], ARGV[2], 1); "+
"redis.call('pexpire', KEYS[1], ARGV[1]);"+
"return nil;"+
"end;"+
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+
"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+
"redis.call('pexpire', KEYS[1], ARGV[1]); "+
"return nil; "+
"end; "+
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

  

4.2 释放锁

public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)e.getCause();
} else {
throw e;
}
}
}

public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.<Boolean>evalWriteAsync(this.getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter
> 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del',
KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;",
Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE,
this.internalLockLeaseTime, this.getLockName(threadId));
}

五、使用Redisson锁解决不可重试

5.1 加锁

public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return this.tryLock(waitTime, -1L, unit);
}

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() – current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}

});
}

this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
boolean var14;
try {
time -= System.currentTimeMillis() – current;
if (time > 0L) {
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
boolean var28 = true;
return var28;
}

time -= System.currentTimeMillis() – currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var27 = false;
return var27;
}

currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() – currentTime;
} while(time > 0L);

this.acquireFailed(waitTime, unit, threadId);
boolean var16 = false;
return var16;
}

this.acquireFailed(waitTime, unit, threadId);
var14 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}

return var14;
}
}
}
}

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

  • 首次尝试先调用 tryAcquire 尝试获取锁。如果成功(返回 null),直接返回 true。

  • 订阅锁释放事件如果首次尝试失败,就通过 subscribe 订阅该锁的释放通知,避免无效的自旋等待。

  • 循环重试进入 do-while 循环,只要还有剩余等待时间,就会:

    • 再次调用 tryAcquire 尝试获取锁
    • 若成功则返回 true
    • 若失败,就等待锁释放通知或剩余时间,然后继续循环
  • 超时退出如果等待时间耗尽仍未获取到锁,就调用 acquireFailed 并返回 false。

  • 资源清理无论成功还是失败,最终都会在 finally 块中取消订阅,避免资源泄漏。

  • 六、看门狗策略

    6.1 加锁

    private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
    return this.<Boolean>tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
    RFuture<Boolean> ttlRemainingFuture = this.<Boolean>tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e == null) {
    if (ttlRemaining) {
    this.scheduleExpirationRenewal(threadId);
    }

    }
    });
    return ttlRemainingFuture;
    }
    }

    private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
    oldEntry.addThreadId(threadId);
    } else {
    entry.addThreadId(threadId);
    this.renewExpiration();
    }

    }

    private void renewExpiration() {
    ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
    Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    public void run(Timeout timeout) throws Exception {
    ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
    if (ent != null) {
    Long threadId = ent.getFirstThreadId();
    if (threadId != null) {
    RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
    future.onComplete((res, e) -> {
    if (e != null) {
    RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
    } else {
    if (res) {
    RedissonLock.this.renewExpiration();
    }

    }
    });
    }
    }
    }
    }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
    }
    }

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return this.<Boolean>evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }

  • 开启续命当调用 tryLock 且未指定 leaseTime(即 leaseTime = -1)时,锁获取成功后会触发 scheduleExpirationRenewal,开启看门狗。

  • 定时任务触发renewExpiration 方法会创建一个定时任务,默认在锁过期时间的 1/3 时(比如锁默认 30 秒过期,任务就会在 10 秒后执行),去检查锁是否还被当前线程持有。

  • 续命操作定时任务会调用 renewExpirationAsync,通过 Lua 脚本去 Redis 中更新锁的过期时间,完成 “续命”。

  • 循环续命只要锁还被持有,每次续命成功后,就会再次调度下一个定时任务,如此循环,直到锁被主动释放或线程崩溃。

  • 当我们不指定锁过期时间,就会执行看门狗策略,会一直帮我们续期

    思考:为什么不干脆设定不过期,而是要采用定时任务不断地续期?

    • 满足长任务需求:有些业务操作(比如批量处理数据、复杂的数据库事务)执行时间不确定,可能超过锁的初始过期时间(默认 30 秒),如果锁提前过期,会导致多个客户端同时持有锁,破坏分布式锁的互斥性。
    • 保证锁最终可释放:即使客户端异常崩溃,看门狗的续期线程也会随之终止,锁会在初始过期时间(或最后一次续期后的 30 秒)后自动过期,避免死锁

    6.2 释放锁

    public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise();
    RFuture<Boolean> future = this.unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
    this.cancelExpirationRenewal(threadId);
    if (e != null) {
    result.tryFailure(e);
    } else if (opStatus == null) {
    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
    result.tryFailure(cause);
    } else {
    result.trySuccess((Object)null);
    }
    });
    return result;
    }

    void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (task != null) {
    if (threadId != null) {
    task.removeThreadId(threadId);
    }

    if (threadId == null || task.hasNoThreads()) {
    Timeout timeout = task.getTimeout();
    if (timeout != null) {
    timeout.cancel();
    }

    EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
    }

    }
    }

    • 解锁时主动关闭在 unlockAsync 方法中,解锁操作完成后会立刻调用 cancelExpirationRenewal,主动关闭看门狗的续命任务。

    • 线程安全的清理

      • cancelExpirationRenewal 会先从 EXPIRATION_RENEWAL_MAP 中获取当前锁对应的看门狗任务(ExpirationEntry)。
      • 然后从任务中移除当前线程 ID,这是为了处理锁可重入的场景,只有当所有持有该锁的线程都解锁后,才会真正停止看门狗。
    • 停止定时任务

      • 当任务中没有任何线程持有锁时(task.hasNoThreads()),会取消看门狗的定时任务(timeout.cancel())。
      • 最后从 EXPIRATION_RENEWAL_MAP 中移除该任务,彻底清理资源。

    定时任务会在解锁的时候取消掉,不会一直执行

    七、Redisson锁实现主从不一致问题

    redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

    7.1 代码实现

    @Configuration
    public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
    // 配置
    Config config = new Config();
    config.useSingleServer().setAddress("地址")
    .setPassword("密码");
    // 创建RedissonClient对象
    return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient2(){
    // 配置
    Config config = new Config();
    config.useSingleServer().setAddress("地址")
    .setPassword("密码");
    // 创建RedissonClient对象
    return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient3(){
    // 配置
    Config config = new Config();
    config.useSingleServer().setAddress("地址")
    .setPassword("密码");
    // 创建RedissonClient对象
    return Redisson.create(config);
    }
    }
    @Override
    @Transactional
    public Long seckillVoucher(Long voucherId) {
    // 1.查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    // 2.判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
    // 尚未开始
    throw new BaseException(ExceptionConstants.NOT_START);
    }
    // 3.判断秒杀是否已经结束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
    // 尚未开始
    throw new BaseException(ExceptionConstants.IS_END);
    }
    // 4.判断库存是否充足
    if (voucher.getStock() < 1) {
    // 库存不足
    throw new BaseException(ExceptionConstants.NOT_ENOUGH);
    }
    Long userId = UserHolder.getUser().getId();
    String key = RedisConstants.getLockOrderKey(userId);
    RLock lock1 = redissonClient.getLock(key);
    RLock lock2 = redissonClient2.getLock(key);
    RLock lock3 = redissonClient3.getLock(key);
    RLock lock = redissonClient.getMultiLock(lock1 , lock2 , lock3);
    boolean isLock = lock.tryLock();
    if (!isLock) {
    // 获取锁失败,返回错误或重试
    throw new BaseException(ExceptionConstants.EXIST_BUY);
    }
    try {
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
    } catch (IllegalStateException e) {
    throw new RuntimeException(e);
    } finally {
    lock.unlock();
    }
    }

    7.2 源码

    public RLock getMultiLock(RLock… locks) {
    return new RedissonMultiLock(locks);
    }
    final List<RLock> locks = new ArrayList();

    public RedissonMultiLock(RLock… locks) {
    if (locks.length == 0) {
    throw new IllegalArgumentException("Lock objects are not defined");
    } else {
    this.locks.addAll(Arrays.asList(locks));
    }
    }

    7.3 加锁

    public boolean tryLock() {
    try {
    return this.tryLock(-1L, -1L, (TimeUnit)null);
    } catch (InterruptedException var2) {
    Thread.currentThread().interrupt();
    return false;
    }
    }

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1L;
    if (leaseTime != -1L) {
    if (waitTime == -1L) {
    newLeaseTime = unit.toMillis(leaseTime);
    } else {
    newLeaseTime = unit.toMillis(waitTime) * 2L;
    }
    }

    long time = System.currentTimeMillis();
    long remainTime = -1L;
    if (waitTime != -1L) {
    remainTime = unit.toMillis(waitTime);
    }

    long lockWaitTime = this.calcLockWaitTime(remainTime);
    int failedLocksLimit = this.failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList(this.locks.size());
    ListIterator<RLock> iterator = this.locks.listIterator();

    while(iterator.hasNext()) {
    RLock lock = (RLock)iterator.next();

    boolean lockAcquired;
    try {
    if (waitTime == -1L && leaseTime == -1L) {
    lockAcquired = lock.tryLock();
    } else {
    long awaitTime = Math.min(lockWaitTime, remainTime);
    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
    }
    } catch (RedisResponseTimeoutException var21) {
    this.unlockInner(Arrays.asList(lock));
    lockAcquired = false;
    } catch (Exception var22) {
    lockAcquired = false;
    }

    if (lockAcquired) {
    acquiredLocks.add(lock);
    } else {
    if (this.locks.size() – acquiredLocks.size() == this.failedLocksLimit()) {
    break;
    }

    if (failedLocksLimit == 0) {
    this.unlockInner(acquiredLocks);
    if (waitTime == -1L) {
    return false;
    }

    failedLocksLimit = this.failedLocksLimit();
    acquiredLocks.clear();

    while(iterator.hasPrevious()) {
    iterator.previous();
    }
    } else {
    –failedLocksLimit;
    }
    }

    if (remainTime != -1L) {
    remainTime -= System.currentTimeMillis() – time;
    time = System.currentTimeMillis();
    if (remainTime <= 0L) {
    this.unlockInner(acquiredLocks);
    return false;
    }
    }
    }

    if (leaseTime != -1L) {
    List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());

    for(RLock rLock : acquiredLocks) {
    RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
    futures.add(future);
    }

    for(RFuture<Boolean> rFuture : futures) {
    rFuture.syncUninterruptibly();
    }
    }

    return true;
    }

  • 多节点独立加锁向多个独立的 Redis 节点发起加锁请求,这些节点之间没有主从关系,都是独立的实例。
  • 失败时回滚如果加锁失败,会立刻释放所有已经成功加锁的节点,避免出现锁残留。
  • 7.4 流程图

    八、后续内容

    目前我们的流程还是串行执行的,性能不是很好,我们在下一篇进行异步优化

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 基于 Redis 的秒杀集群系统迭代实现【Redisson源码分析】(附详细流程图)
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!