云计算百科
云计算领域专业知识百科平台

使用Redisson实现分布式锁发现的【订阅超时】Subscribe timeout: (7500ms)

背景

使用 redisson 实现分布式锁,出现的异常:

org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms). Increase ‘subscriptionsPerConnection’ and/or ‘subscriptionConnectionPoolSize’ parameters

在这里插入图片描述

从异常信息读的出来一些东西

  • 订阅超时?
  • 和 [subscriptionsPerConnection]、[subscriptionConnectionPoolSize] 似乎要调整配置,名称是这两个?
  • 这是我使用的 redisson 客户端的版本:

    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.15.6</version>
    </dependency>

    正文

    查阅一些资料后得出结论:

  • redisson核心参数:subscriptionsPerConnection和subscriptionConnectionPoolSize较小,当线上出现大量锁竞争时,发布订阅连接池不满足拿去需求
  • 服务使用的redisson3.15.6版本存在代码漏洞,当出现获取订阅连接失败时缺乏重试,且存在订阅连接无法释放的隐患,有内存泄漏风险。
  • 优化建议:

    • 增大subscriptionsPerConnection和subscriptionConnectionPoolSize配置大小

    该配置为redisson客户端连接池配置,对redis本身性能影响较小,且线上redis资源利用率不高,可酌情调整

    • 升级redisson客户端版本至少到3.17.3

    截至到该版本优化了分布式锁的订阅逻辑,并解决了因网络故障、redis集群故障等问题导致的取消订阅失败造成的连接池异常占用问题。

    问题复现

    服务当前Redisson版本为3.15.6,使用如下用例可稳定复现线上报错:

    在这里插入图片描述

    代码示例:

    package com.example.demo;

    import lombok.extern.slf4j.Slf4j;
    import org.redisson.Redisson;
    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.boot.test.context.SpringBootTest;

    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    @Slf4j
    @SpringBootTest
    public class RedisTest {

    public static void main(String[] args) throws InterruptedException {
    Config config = new Config();
    config.useSingleServer()
    .setSubscriptionConnectionPoolSize(2)
    .setSubscriptionConnectionMinimumIdleSize(2)
    .setSubscriptionsPerConnection(2)
    // .setTimeout(5000)
    .setAddress("redis://127.0.0.1:6379");

    RedissonClient redisson = Redisson.create(config);
    ExecutorService e = Executors.newFixedThreadPool(32);
    Random random = new Random();
    for (int i = 0; i < 20000; i++) {
    e.submit(() -> {
    try {
    String lockKey = "lock-" + random.nextInt(5);
    RLock lock = redisson.getLock(lockKey);
    log.info("before lock {}", lockKey);
    lock.lock();
    log.info("after lock {}", lockKey);
    Thread.sleep(random.nextInt(20));
    lock.unlock();
    log.info("after -unlock {}", lockKey);
    } catch (Exception exception){
    log.error("e", exception);
    }
    });
    }

    e.shutdown();
    e.awaitTermination(10, TimeUnit.MINUTES);
    }
    }

    正确使用:

    redisson 客户端版本更换:

    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.17.3</version>
    </dependency>

    代码示例:

    package com.example.demo;

    import lombok.extern.slf4j.Slf4j;
    import org.redisson.Redisson;
    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.boot.test.context.SpringBootTest;

    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    @Slf4j
    @SpringBootTest
    public class RedisTest {

    public static void main(String[] args) throws InterruptedException {
    Config config = new Config();
    config.useSingleServer()
    .setSubscriptionConnectionPoolSize(2)
    .setSubscriptionConnectionMinimumIdleSize(2)
    .setSubscriptionsPerConnection(2)
    // .setTimeout(5000)
    .setAddress("redis://127.0.0.1:6379");

    RedissonClient redisson = Redisson.create(config);
    ExecutorService e = Executors.newFixedThreadPool(32);
    Random random = new Random();
    for (int i = 0; i < 20000; i++) {
    e.submit(() -> {
    try {
    String lockKey = "lock-" + random.nextInt(5);
    RLock lock = redisson.getLock(lockKey);
    log.info("before lock {}", lockKey);
    lock.lock();
    log.info("after lock {}", lockKey);
    Thread.sleep(random.nextInt(20));
    lock.unlock();
    log.info("after -unlock {}", lockKey);
    } catch (Exception exception){
    log.error("e", exception);
    }
    });
    }

    e.shutdown();
    e.awaitTermination(10, TimeUnit.MINUTES);
    }
    }

    原因分析

    在这里插入图片描述

    关键点:

    • 成功场景: 直接获取锁事不触发订阅流程,仅启动看门狗线程
    • 失败场景: 订阅特定格式的频道(redisson_lock_channel:{key})并阻塞线程
    • 取消订阅‌: 在锁获取成功、线程中断、等待超时三种情况下触发
    • 线程安全‌: 通过Semaphore和线程ID绑定确保订阅/取消订阅的原子性

    之所以使用发布订阅模式处理锁竞争时候,为了避免时间轮询获取状态的方式带来的性能吮毫,提高执行效率

    发生报错的场景:

    在这里插入图片描述 服务使用Redisson3.15.6版本,当分布式锁出现大量竞争触发订阅发布流程,而subscriptionsPerConnection和subscriptionConnectionPoolSize设置较小时,redisson发布订阅的连接池打满出现等待,等待超过设置的连接超时事件就会报错Subscribe timeout

    在Redisson3.16.8把版本中对该问题进行了改进,bug号:4064。针对订阅失败增加了重试逻辑,根据默认的重试次数进行重试,大幅度减少该报错的发生几率 https://github.com/redisson/redisson/issues/4064

    在这里插入图片描述

    CompletableFuture<RedisPubSubConnection> connectFuture = connect(codec, channelName, msEntry, promise, type, lock, listeners);
    if (attempts.get() == config.getRetryAttempts()) {
    return;
    }

    connectionManager.newTimeout(t -> {
    if (connectFuture.cancel(true)) {
    subscribe(codec, channelName, entry, promise, type, lock, attempts, listeners);
    attempts.incrementAndGet();
    }
    }, config.getRetryInterval(), TimeUnit.MILLISECONDS);

    版本更新说明:

    版本:3.16.8

    如何切换到 redisson 3.16.8 这个版本 看源码?

    git clone https://github.com/redisson/redisson.git

    cd redisson
    git checkout tags/版本号 b 版本名

    比如现状切换到 3.16.8版本

    git checkout tags/redisson3.16.8 b myredisson3.16.8branch

    该版本解决了因高并发下锁竞争超时导致的订阅异常,加入了超时的重试机制,重试多次失败后才进行报错

    在这里插入图片描述 并且解决了当出现异常时,订阅不释放的问题 在这里插入图片描述 在这里插入图片描述

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 使用Redisson实现分布式锁发现的【订阅超时】Subscribe timeout: (7500ms)
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!