一、前置内容
1.1 单机秒杀实现
基于 Redis 的秒杀单机系统迭代实现(附详细流程图)-CSDN博客
1.2 SETNX实现集群模式下
基于 Redis 的秒杀集群系统迭代实现【SETNX】(附详细流程图)-CSDN博客
1.3 Redisson锁实现
基于 Redis 的秒杀集群系统迭代实现【Redisson源码分析】(附详细流程图)-CSDN博客
1.4 前置流程图

二、基于JVM的阻塞队列实现异步秒杀
2.1 问题分析
我们原来用的乐观锁和Mysql底层的互斥锁查询数据库来判断库存有无剩余并更新,但是更新数据库是比较耗时的,所以用redis来替代,并且是要lua脚本保证一致性。
第一次查询依旧是数据库查询,因为我们要进行一些详细的判断,而redis是键值对类型,确定库存有剩余,再加入到订单set集合中,保证一人一单,所以给用户下单判断是否买过的加上Redisson锁也可以省去了,set是保证不重复的,但是将用户订单保存到数据库中还是要执行的,采用异步来优化。
2.2 lua脚本
— 1.参数列表
— 1.1.优惠券id
local voucherId = ARGV[1]
— 1.2.用户id
local userId = ARGV[2]
— 2.数据key
— 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
— 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
— 3.脚本业务
— 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
— 3.2.库存不足,返回1
return 1
end
— 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
— 3.3.存在,说明是重复下单,返回2
return 2
end
— 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
— 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

2.3 流程图

2.4 代码实现
添加秒杀的时候把优惠卷放到Redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
stringRedisTemplate.opsForValue().set(RedisConstants.getSeckillStockKey(voucher.getId()), voucher.getStock().toString());
}
再修改秒杀接口
//lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT ;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
//当前代理对象
private IVoucherOrderService proxy;
@Override
@Transactional
public Long seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.NOT_START);
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.IS_END);
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
// 5.执行lua脚本
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
int r = result.intValue();
if (r == NumberConstants.NOT_ENOUGH){
// 6.判断库存是否充足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
} else if (r == NumberConstants.EXIST_BUY) {
// 7.用户已经购买过了
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 8.获取代理对象
proxy = (IVoucherOrderService)AopContext.currentProxy();
// 9.放入阻塞队列
orderTasks.add(voucherOrder);
//10.返回订单id
return orderId;
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 2.判断是否存在
if (count > 0) {
// 用户已经购买过了
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
// 2.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock – 1") // set stock = stock – 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
save(voucherOrder);
}
//阻塞队列
private BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue<>(1024 * 1024);
//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 用于线程池处理的任务
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
Long userId = voucherOrder.getUserId();
// 2.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 3.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 4.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
//注意:由于是spring的事务是放在threadLocal中
proxy.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
lock.unlock();
}
}
}
// 用于线程池处理的任务
@Slf4j
@Component
public class VoucherOrderHandler implements Runnable {
@Autowired
private IVoucherOrderService voucherOrderService;
@Autowired
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.获取用户
Long userId = voucherOrder.getUserId();
// 4.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 5.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 6.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
// 7.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} finally {
// 释放锁
lock.unlock();
}
} catch (Exception e) {
log.error("处理订单异常", e);
//处理异常消息
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.获取用户
Long userId = voucherOrder.getUserId();
// 4.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 5.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 6.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
//注意:由于是spring的事务是放在threadLocal中
voucherOrderService.createVoucherOrder(voucherOrder);
// 7.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} finally {
// 释放锁
lock.unlock();
}
} catch (Exception e) {
log.error("处理pendding订单异常", e);
try {
Thread.sleep(20);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
2.5 问题分析
JVM的阻塞队列在高并发的情况下可能会有内存溢出的风险,如果服务器宕机了,内存里面的数据就会丢失,出现了数据不一致的情况。
三、介绍Redis消息队列
3.1 基于list实现
Redis的list数据结构是一个双向链表 ,使用BRPOP或者BLPOP来实现阻塞效果。
优点:
-
利用Redis存储,不受限于JVM内存上限
-
基于Redis的持久化机制,数据安全性有保证
-
可以满足消息有序性
缺点:
-
出队只能保证消息被移出,无法保证有没有对这个消息进行处理,无法避免消息丢失
-
只支持单消费者,一个信息只能被一个消费者取出
3.2 基于PubSub的消息队列
消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
优点:
-
采用发布订阅模型,支持多生产、多消费
缺点:
-
如果一个channel无人订阅,那么这个消息就丢失了,无法做到数据的持久化
-
消息堆积有上限,发送的消息如果有消费者监听,会存在消费者的缓存区域,如果处理得慢消息有多,就会超出上线,数据丢失
3.3 基于Stream的消息队列(XREAD)

优点:
-
消息可回溯
-
一个消息可以被多个消费者读取
-
可以阻塞读取
缺点:
-
当我们处理完一条消息时可能一下子多了很多条消息,但只读取了最新的消息,导致消息漏读
3.4 基于Stream的消息队列(XREADGROUP )
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
-
group:消费组名称
-
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
-
count:本次查询的最大数量
-
BLOCK milliseconds:当没有消息时最长等待时间
-
NOACK:无需手动ACK,获取到消息后自动确认
-
STREAMS key:指定队列名称
-
ID:获取消息的起始ID
-
">":从下一个未消费的消息开始
-
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
-
将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
- 消息分流
- 队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
- 消息标示
- 消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
- 消息确认
- 消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list。当处理完成后需要通过 XACK 来确认消息,标记消息为已处理,才会从 pending-list 移除。
优点:
-
消息可回溯
-
可以多消费者争抢消息,加快消费速度
-
可以阻塞读取
-
没有消息漏读的风险
-
有消息确认机制,保证消息至少被消费一次
3.5 对比
| 核心原理 | 使用 LPUSH/RPOP 等命令,基于链表结构实现简单的先进先出队列 | 发布 – 订阅模式,消息会被广播给所有订阅者 | 基于 Stream 数据结构,通过游标 XREAD 拉取消息,支持持久化 | 基于 Stream 的消费者组模式,支持多消费者协同消费,通过 XACK 确认消息 |
| 持久化 | 支持(Redis 持久化机制保障) | 不支持(消息无持久化,订阅者离线后消息丢失) | 支持(Stream 数据持久化到磁盘) | 支持(Stream 数据持久化,消费者组状态也持久化) |
| 消息可靠性 | 一般(消费者宕机可能导致未处理消息丢失,需额外机制保障) | 差(消息不持久化,订阅者离线即丢失) | 高(消息持久化,可通过游标回溯) | 极高(消费者组维护状态,消息确认机制保障不丢失、不重复) |
| 消费模式 | 单消费者竞争消费(一个消息只能被一个消费者处理) | 多消费者广播消费(一个消息会被所有订阅者接收) | 单消费者或多消费者重复消费(多个消费者用相同游标会收到重复消息) | 多消费者分流消费(一个消息仅被组内一个消费者处理,支持负载均衡) |
| 消息回溯 | 仅支持按队列顺序从头 / 尾读取,无游标回溯能力 | 无回溯能力(订阅者只能收到订阅后的新消息) | 支持(通过游标可以从任意位置开始读取历史消息) | 支持(消费者组维护最后处理位置,宕机重启后可从断点继续) |
| 负载均衡 | 依赖客户端自行实现竞争消费,天然具备一定负载均衡能力 | 不支持(消息广播给所有订阅者,无法分流) | 不支持(多消费者会收到重复消息) | 支持(消息自动分流给组内不同消费者) |
| 消息确认机制 | 无原生确认机制,需自行实现 ACK | 无 | 无原生确认机制,需自行维护消费进度 | 原生支持 XACK 确认,消息处理完成后才会从 pending-list 移除 |
| 适用场景 | 简单的任务队列、低并发场景 | 实时通知、日志广播等不要求可靠性的场景 | 需要持久化、支持回溯的单消费者或重复消费场景 | 高可靠、高并发的分布式消费场景,如订单处理、日志分流等 |
四、基于Redis消息队列实现异步秒杀
4.1 修改lua脚本
直接在成功下单后发送消息给消息队列,减少java和redis的交互
— 1.参数列表
— 1.1.优惠券id
local voucherId = ARGV[1]
— 1.2.用户id
local userId = ARGV[2]
— 1.3.订单id
local orderId = ARGV[3]
— 2.数据key
— 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
— 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
— 3.脚本业务
— 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
— 3.2.库存不足,返回1
return 1
end
— 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
— 3.3.存在,说明是重复下单,返回2
return 2
end
— 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
— 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
— 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 …
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
4.2 创建消息队列
直接在redis里面创建就可以了
XGROUP CREATE stream.orders g1 0 MKSTREAM
4.3 代码实现
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;
//lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT ;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
@Override
@Transactional
public Long seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.NOT_START);
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.IS_END);
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
// 6.执行lua脚本
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
if (r == NumberConstants.NOT_ENOUGH){
// 7.判断库存是否充足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
} else if (r == NumberConstants.EXIST_BUY) {
// 8.用户已经购买过了
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
//9.返回订单id
return orderId;
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 2.判断是否存在
if (count > 0) {
// 用户已经购买过了
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
// 2.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock – 1") // set stock = stock – 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
save(voucherOrder);
}
}
主业务更加简洁明了
@Slf4j
@Component
public class VoucherOrderHandler implements Runnable {
@Autowired
private IVoucherOrderService voucherOrderService;
@Autowired
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.获取用户
Long userId = voucherOrder.getUserId();
// 4.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 5.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 6.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
voucherOrderService.createVoucherOrder(voucherOrder);
// 7.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} finally {
// 释放锁
lock.unlock();
}
} catch (Exception e) {
log.error("处理订单异常", e);
//处理异常消息
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.获取用户
Long userId = voucherOrder.getUserId();
// 4.创建锁对象
String key = RedisConstants.getLockOrderKey(userId);
// 5.尝试获取锁
RLock lock = redissonClient.getLock(key);
try {
boolean isLock = lock.tryLock();
// 6.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
voucherOrderService.createVoucherOrder(voucherOrder);
// 7.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} finally {
// 释放锁
lock.unlock();
}
} catch (Exception e) {
log.error("处理pendding订单异常", e);
try {
Thread.sleep(20);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
4.4 流程图

网硕互联帮助中心





评论前必须登录!
注册