云计算百科
云计算领域专业知识百科平台

基于 Redis 的秒杀集群系统迭代实现【异步优化】(附详细流程图)

一、前置内容

1.1 单机秒杀实现

基于 Redis 的秒杀单机系统迭代实现(附详细流程图)-CSDN博客

1.2 SETNX实现集群模式下

基于 Redis 的秒杀集群系统迭代实现【SETNX】(附详细流程图)-CSDN博客

1.3 Redisson锁实现

基于 Redis 的秒杀集群系统迭代实现【Redisson源码分析】(附详细流程图)-CSDN博客

1.4 前置流程图

二、基于JVM的阻塞队列实现异步秒杀

2.1 问题分析

我们原来用的乐观锁和Mysql底层的互斥锁查询数据库来判断库存有无剩余并更新,但是更新数据库是比较耗时的,所以用redis来替代,并且是要lua脚本保证一致性。

第一次查询依旧是数据库查询,因为我们要进行一些详细的判断,而redis是键值对类型,确定库存有剩余,再加入到订单set集合中,保证一人一单,所以给用户下单判断是否买过的加上Redisson锁也可以省去了,set是保证不重复的,但是将用户订单保存到数据库中还是要执行的,采用异步来优化。

2.2 lua脚本

— 1.参数列表
— 1.1.优惠券id
local voucherId = ARGV[1]
— 1.2.用户id
local userId = ARGV[2]

— 2.数据key
— 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
— 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

— 3.脚本业务
— 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
— 3.2.库存不足,返回1
return 1
end
— 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
— 3.3.存在,说明是重复下单,返回2
return 2
end
— 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
— 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

2.3 流程图

2.4 代码实现

添加秒杀的时候把优惠卷放到Redis中

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
stringRedisTemplate.opsForValue().set(RedisConstants.getSeckillStockKey(voucher.getId()), voucher.getStock().toString());
}

再修改秒杀接口

//lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT ;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

//当前代理对象
private IVoucherOrderService proxy;

@Override
@Transactional
public Long seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.NOT_START);
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.IS_END);
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
// 5.执行lua脚本
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
int r = result.intValue();
if (r == NumberConstants.NOT_ENOUGH){
// 6.判断库存是否充足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
} else if (r == NumberConstants.EXIST_BUY) {
// 7.用户已经购买过了
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 8.获取代理对象
proxy = (IVoucherOrderService)AopContext.currentProxy();
// 9.放入阻塞队列
orderTasks.add(voucherOrder);
//10.返回订单id
return orderId;
}

@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 2.判断是否存在
if (count > 0) {
// 用户已经购买过了
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
// 2.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock – 1") // set stock = stock – 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
save(voucherOrder);

}

//阻塞队列
private BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue<>(1024 * 1024);

//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

// 用于线程池处理的任务
private class VoucherOrderHandler implements Runnable {

@Override
public void run() {
while (true) {
try {
// 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}

private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
Long userId = voucherOrder.getUserId();
// 2.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 3.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 4.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
//注意:由于是spring的事务是放在threadLocal中
proxy.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
lock.unlock();
}
}
}
// 用于线程池处理的任务
@Slf4j
@Component
public class VoucherOrderHandler implements Runnable {

@Autowired
private IVoucherOrderService voucherOrderService;
@Autowired
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.获取用户
Long userId = voucherOrder.getUserId();
// 4.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 5.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 6.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
// 7.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} finally {
// 释放锁
lock.unlock();
}
} catch (Exception e) {
log.error("处理订单异常", e);
//处理异常消息
handlePendingList();
}
}
}

private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.获取用户
Long userId = voucherOrder.getUserId();
// 4.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 5.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 6.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
//注意:由于是spring的事务是放在threadLocal中
voucherOrderService.createVoucherOrder(voucherOrder);
// 7.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} finally {
// 释放锁
lock.unlock();
}
} catch (Exception e) {
log.error("处理pendding订单异常", e);
try {
Thread.sleep(20);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}

2.5 问题分析

JVM的阻塞队列在高并发的情况下可能会有内存溢出的风险,如果服务器宕机了,内存里面的数据就会丢失,出现了数据不一致的情况。

三、介绍Redis消息队列

3.1 基于list实现

Redis的list数据结构是一个双向链表 ,使用BRPOP或者BLPOP来实现阻塞效果。

优点:

  • 利用Redis存储,不受限于JVM内存上限

  • 基于Redis的持久化机制,数据安全性有保证

  • 可以满足消息有序性

缺点:

  • 出队只能保证消息被移出,无法保证有没有对这个消息进行处理,无法避免消息丢失

  • 只支持单消费者,一个信息只能被一个消费者取出

3.2 基于PubSub的消息队列

消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 如果一个channel无人订阅,那么这个消息就丢失了,无法做到数据的持久化

  • 消息堆积有上限,发送的消息如果有消费者监听,会存在消费者的缓存区域,如果处理得慢消息有多,就会超出上线,数据丢失

3.3 基于Stream的消息队列(XREAD

优点:

  • 消息可回溯

  • 一个消息可以被多个消费者读取

  • 可以阻塞读取

缺点:

  • 当我们处理完一条消息时可能一下子多了很多条消息,但只读取了最新的消息,导致消息漏读

3.4 基于Stream的消息队列(XREADGROUP )

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

  • group:消费组名称

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者

  • count:本次查询的最大数量

  • BLOCK milliseconds:当没有消息时最长等待时间

  • NOACK:无需手动ACK,获取到消息后自动确认

  • STREAMS key:指定队列名称

  • ID:获取消息的起始ID

    • ">":从下一个未消费的消息开始

    • 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  • 消息分流
    • 队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
  • 消息标示
    • 消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
  •  消息确认
    • 消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list。当处理完成后需要通过 XACK 来确认消息,标记消息为已处理,才会从 pending-list 移除。

优点:

  • 消息可回溯

  • 可以多消费者争抢消息,加快消费速度

  • 可以阻塞读取

  • 没有消息漏读的风险

  • 有消息确认机制,保证消息至少被消费一次

3.5 对比

特性基于 List 实现基于 PubSub 实现基于 Stream (XREAD)基于 Stream (XREADGROUP)
核心原理 使用 LPUSH/RPOP 等命令,基于链表结构实现简单的先进先出队列 发布 – 订阅模式,消息会被广播给所有订阅者 基于 Stream 数据结构,通过游标 XREAD 拉取消息,支持持久化 基于 Stream 的消费者组模式,支持多消费者协同消费,通过 XACK 确认消息
持久化 支持(Redis 持久化机制保障) 不支持(消息无持久化,订阅者离线后消息丢失) 支持(Stream 数据持久化到磁盘) 支持(Stream 数据持久化,消费者组状态也持久化)
消息可靠性 一般(消费者宕机可能导致未处理消息丢失,需额外机制保障) 差(消息不持久化,订阅者离线即丢失) 高(消息持久化,可通过游标回溯) 极高(消费者组维护状态,消息确认机制保障不丢失、不重复)
消费模式 单消费者竞争消费(一个消息只能被一个消费者处理) 多消费者广播消费(一个消息会被所有订阅者接收) 单消费者或多消费者重复消费(多个消费者用相同游标会收到重复消息) 多消费者分流消费(一个消息仅被组内一个消费者处理,支持负载均衡)
消息回溯 仅支持按队列顺序从头 / 尾读取,无游标回溯能力 无回溯能力(订阅者只能收到订阅后的新消息) 支持(通过游标可以从任意位置开始读取历史消息) 支持(消费者组维护最后处理位置,宕机重启后可从断点继续)
负载均衡 依赖客户端自行实现竞争消费,天然具备一定负载均衡能力 不支持(消息广播给所有订阅者,无法分流) 不支持(多消费者会收到重复消息) 支持(消息自动分流给组内不同消费者)
消息确认机制 无原生确认机制,需自行实现 ACK 无原生确认机制,需自行维护消费进度 原生支持 XACK 确认,消息处理完成后才会从 pending-list 移除
适用场景 简单的任务队列、低并发场景 实时通知、日志广播等不要求可靠性的场景 需要持久化、支持回溯的单消费者或重复消费场景 高可靠、高并发的分布式消费场景,如订单处理、日志分流等

四、基于Redis消息队列实现异步秒杀

4.1 修改lua脚本

直接在成功下单后发送消息给消息队列,减少java和redis的交互

— 1.参数列表
— 1.1.优惠券id
local voucherId = ARGV[1]
— 1.2.用户id
local userId = ARGV[2]
— 1.3.订单id
local orderId = ARGV[3]

— 2.数据key
— 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
— 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

— 3.脚本业务
— 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
— 3.2.库存不足,返回1
return 1
end
— 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
— 3.3.存在,说明是重复下单,返回2
return 2
end
— 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
— 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
— 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 …
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

4.2 创建消息队列

直接在redis里面创建就可以了

XGROUP CREATE stream.orders g1 0 MKSTREAM

4.3 代码实现

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;

//lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT ;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

@Override
@Transactional
public Long seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.NOT_START);
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.IS_END);
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
// 6.执行lua脚本
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
if (r == NumberConstants.NOT_ENOUGH){
// 7.判断库存是否充足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
} else if (r == NumberConstants.EXIST_BUY) {
// 8.用户已经购买过了
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
//9.返回订单id
return orderId;
}

@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 2.判断是否存在
if (count > 0) {
// 用户已经购买过了
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
// 2.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock – 1") // set stock = stock – 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
save(voucherOrder);
}
}

主业务更加简洁明了

@Slf4j
@Component
public class VoucherOrderHandler implements Runnable {

@Autowired
private IVoucherOrderService voucherOrderService;
@Autowired
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.获取用户
Long userId = voucherOrder.getUserId();
// 4.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 5.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 6.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
voucherOrderService.createVoucherOrder(voucherOrder);
// 7.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} finally {
// 释放锁
lock.unlock();
}
} catch (Exception e) {
log.error("处理订单异常", e);
//处理异常消息
handlePendingList();
}
}
}

private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.获取用户
Long userId = voucherOrder.getUserId();
// 4.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 5.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 6.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
voucherOrderService.createVoucherOrder(voucherOrder);
// 7.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} finally {
// 释放锁
lock.unlock();
}
} catch (Exception e) {
log.error("处理pendding订单异常", e);
try {
Thread.sleep(20);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}

4.4 流程图

赞(0)
未经允许不得转载:网硕互联帮助中心 » 基于 Redis 的秒杀集群系统迭代实现【异步优化】(附详细流程图)
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!