无中间件依赖!SpringBoot + 本地消息表实现分布式最终一致性
在分布式系统中,跨服务数据一致性是核心难题 —— 例如电商下单时,需同时完成订单创建、库存扣减、积分增加等操作,若某一步失败,可能导致数据不一致。主流的分布式事务方案(如 Seata、RocketMQ 事务消息)需依赖额外中间件,运维成本高,不适用于小型团队或资源有限的场景。本文详细拆解 SpringBoot + 本地消息表 + 定时补偿 的轻量级方案,无需额外中间件,通过 “本地事务保障 + 定时重试” 实现分布式最终一致性。
一、分布式一致性痛点与方案选型
1. 核心痛点
分布式场景下,跨服务调用面临三大问题,导致数据不一致:
- 网络异常:服务间通信超时、断连,部分操作执行成功部分失败;
- 服务宕机:某服务执行中宕机,未完成后续操作;
- 数据冲突:并发场景下,多服务同时操作同一数据导致冲突。
传统 “同步调用” 方案(下单→扣库存→加积分)一旦中间环节失败,需手动回滚所有已执行操作,代码复杂且可靠性低。
2. 方案设计理念
本地消息表方案的核心是 “本地事务原子性 + 消息异步补偿”:
整体流程示意图:
服务A(订单):
1. 开启本地事务 → 2. 创建订单 → 3. 记录消息到本地消息表 → 4. 提交事务
5. 定时任务扫描消息表 → 6. 发送消息给服务B(库存)→ 7. 服务B执行扣库存 → 8. 回调更新消息状态
3. 技术选型与优势
| 开发框架 | SpringBoot(快速整合组件,简化配置) |
| 数据库 | MySQL(支持事务、行锁,满足本地消息表存储需求) |
| ORM 框架 | MyBatis-Plus(简化 CRUD 操作,支持乐观锁 / 悲观锁) |
| 定时任务 | Spring Scheduled(轻量级,无需额外部署,满足定时扫描需求) |
| 重试策略 | 指数退避算法(避免频繁重试导致服务压力,适配临时故障场景) |
| 幂等保障 | 消息唯一标识(msgId)+ 目标服务接口幂等设计 |
核心优势:
- 无额外依赖:无需部署消息队列、分布式事务中间件,降低运维成本;
- 实现简单:基于本地事务和定时任务,开发门槛低,易落地;
- 可靠性高:消息持久化存储,宕机后可恢复,通过重试保障最终一致性;
- 适配场景广:适用于订单创建、支付回调、库存同步等非实时强一致场景。
二、核心实现细节
1. 数据库设计(本地消息表)
本地消息表与业务表在同一数据库,确保业务操作与消息记录原子性:
— 本地消息表
CREATE TABLE `local_message` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`msg_id` varchar(64) NOT NULL COMMENT '消息唯一标识(UUID)',
`msg_type` varchar(32) NOT NULL COMMENT '消息类型(如:ORDER_CREATE、STOCK_DEDUCT)',
`msg_content` text NOT NULL COMMENT '消息内容(JSON格式)',
`target_service` varchar(64) NOT NULL COMMENT '目标服务(如:stock-service)',
`target_url` varchar(255) NOT NULL COMMENT '目标接口URL',
`status` tinyint NOT NULL COMMENT '消息状态:0-待处理,1-发送中,2-已完成,3-失败(死信)',
`retry_count` int NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_retry_time` datetime NOT NULL COMMENT '下次重试时间',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_msg_id` (`msg_id`),
KEY `idx_status_next_retry_time` (`status`,`next_retry_time`) COMMENT '查询待重试消息索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='本地消息表';
— 订单表(示例业务表)
CREATE TABLE `t_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号',
`user_id` bigint NOT NULL COMMENT '用户ID',
`product_id` bigint NOT NULL COMMENT '商品ID',
`quantity` int NOT NULL COMMENT '购买数量',
`amount` decimal(10,2) NOT NULL COMMENT '订单金额',
`status` tinyint NOT NULL COMMENT '订单状态:0-待支付,1-已支付,2-已取消',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
2. 核心实体类
(1)本地消息实体
@Data
@TableName("local_message")
public class LocalMessage {
@TableId(type = IdType.AUTO)
private Long id;
@TableField("msg_id")
private String msgId;
@TableField("msg_type")
private String msgType;
@TableField("msg_content")
private String msgContent;
@TableField("target_service")
private String targetService;
@TableField("target_url")
private String targetUrl;
@TableField("status")
private Integer status;
@TableField("retry_count")
private Integer retryCount;
@TableField("next_retry_time")
private LocalDateTime nextRetryTime;
@TableField("create_time")
private LocalDateTime createTime;
@TableField("update_time")
private LocalDateTime updateTime;
// 消息状态枚举
public static final Integer STATUS_PENDING = 0; // 待处理
public static final Integer STATUS_SENDING = 1; // 发送中
public static final Integer STATUS_COMPLETED = 2; // 已完成
public static final Integer STATUS_FAILED = 3; // 失败(死信)
}
(2)订单实体
@Data
@TableName("t_order")
public class Order {
@TableId(type = IdType.AUTO)
private Long id;
@TableField("order_no")
private String orderNo;
@TableField("user_id")
private Long userId;
@TableField("product_id")
private Long productId;
@TableField("quantity")
private Integer quantity;
@TableField("amount")
private BigDecimal amount;
@TableField("status")
private Integer status;
@TableField("create_time")
private LocalDateTime createTime;
@TableField("update_time")
private LocalDateTime updateTime;
// 订单状态枚举
public static final Integer STATUS_PENDING_PAY = 0; // 待支付
public static final Integer STATUS_PAID = 1; // 已支付
public static final Integer STATUS_CANCELLED = 2; // 已取消
}
3. 业务操作与消息记录(本地事务)
核心逻辑:订单创建与消息记录在同一本地事务中执行,确保原子性:
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private LocalMessageMapper localMessageMapper;
@Autowired
private RestTemplate restTemplate;
// 最大重试次数
private static final int MAX_RETRY_COUNT = 5;
// 初始重试间隔(5秒)
private static final int INIT_RETRY_INTERVAL_SECONDS = 5;
/**
* 创建订单 + 记录本地消息(扣库存)
*/
@Transactional(rollbackFor = Exception.class)
public Order createOrder(OrderDTO orderDTO) {
// 1. 生成订单编号
String orderNo = IdUtil.fastSimpleUUID();
// 2. 构建订单对象
Order order = new Order();
order.setOrderNo(orderNo);
order.setUserId(orderDTO.getUserId());
order.setProductId(orderDTO.getProductId());
order.setQuantity(orderDTO.getQuantity());
order.setAmount(orderDTO.getAmount());
order.setStatus(Order.STATUS_PENDING_PAY);
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
// 3. 保存订单(本地事务第一步)
orderMapper.insert(order);
// 4. 构建本地消息(扣库存消息)
LocalMessage localMessage = buildDeductStockMessage(order);
// 5. 保存本地消息(本地事务第二步)
localMessageMapper.insert(localMessage);
return order;
}
/**
* 构建扣库存本地消息
*/
private LocalMessage buildDeductStockMessage(Order order) {
LocalMessage message = new LocalMessage();
// 消息唯一标识(UUID)
message.setMsgId(IdUtil.fastUUID());
// 消息类型
message.setMsgType("STOCK_DEDUCT");
// 消息内容(JSON格式,包含商品ID、数量、订单号)
StockDeductDTO deductDTO = new StockDeductDTO();
deductDTO.setProductId(order.getProductId());
deductDTO.setQuantity(order.getQuantity());
deductDTO.setOrderNo(order.getOrderNo());
message.setMsgContent(JSON.toJSONString(deductDTO));
// 目标服务(库存服务)
message.setTargetService("stock-service");
// 目标接口URL(库存服务扣库存接口)
message.setTargetUrl("http://stock-service/api/stock/deduct");
// 消息状态:待处理
message.setStatus(LocalMessage.STATUS_PENDING);
// 初始重试次数:0
message.setRetryCount(0);
// 下次重试时间:当前时间 + 初始间隔
message.setNextRetryTime(LocalDateTime.now().plusSeconds(INIT_RETRY_INTERVAL_SECONDS));
message.setCreateTime(LocalDateTime.now());
message.setUpdateTime(LocalDateTime.now());
return message;
}
}
4. 定时补偿任务(扫描 + 发送消息)
通过 Spring Scheduled 定时扫描待处理消息,执行发送逻辑,失败则更新重试次数和下次重试时间:
@Component
@EnableScheduling
public class LocalMessageScheduledTask {
@Autowired
private LocalMessageMapper localMessageMapper;
@Autowired
private RestTemplate restTemplate;
// 定时任务执行间隔(30秒,可通过配置中心动态调整)
@Scheduled(cron = "0/30 * * * * ?")
public void processPendingMessages() {
log.info("开始扫描待处理本地消息");
// 1. 查询待处理且已到重试时间的消息(加悲观锁,防止并发处理)
List<LocalMessage> pendingMessages = localMessageMapper.selectPendingMessages(LocalDateTime.now());
if (CollectionUtils.isEmpty(pendingMessages)) {
log.info("无待处理本地消息");
return;
}
// 2. 遍历消息,执行发送逻辑
for (LocalMessage message : pendingMessages) {
try {
// 2.1 更新消息状态为“发送中”
updateMessageStatus(message.getId(), LocalMessage.STATUS_SENDING);
// 2.2 发送消息到目标服务
boolean sendSuccess = sendMessageToTargetService(message);
if (sendSuccess) {
// 2.3 发送成功:更新状态为“已完成”
updateMessageToCompleted(message.getId());
log.info("消息发送成功:msgId={}", message.getMsgId());
} else {
// 2.4 发送失败:处理重试逻辑
handleRetry(message);
}
} catch (Exception e) {
log.error("处理消息失败:msgId={}, 异常={}", message.getMsgId(), e.getMessage(), e);
// 异常时同样处理重试逻辑
handleRetry(message);
}
}
}
/**
* 发送消息到目标服务
*/
private boolean sendSuccess = sendMessageToTargetService(message);
if (sendSuccess) {
// 2.3 发送成功:更新状态为“已完成”
updateMessageToCompleted(message.getId());
log.info("消息发送成功:msgId={}", message.getMsgId());
} else {
// 2.4 发送失败:处理重试逻辑
handleRetry(message);
}
} catch (Exception e) {
log.error("处理消息失败:msgId={}, 异常={}", message.getMsgId(), e.getMessage(), e);
// 异常时同样处理重试逻辑
handleRetry(message);
}
}
}
/**
* 发送消息到目标服务
*/
private boolean sendMessageToTargetService(LocalMessage message) {
try {
// 构建请求头
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
// 构建请求体
HttpEntity<String> requestEntity = new HttpEntity<>(message.getMsgContent(), headers);
// 发送POST请求
ResponseEntity<String> response = restTemplate.postForEntity(
message.getTargetUrl(),
requestEntity,
String.class
);
// 响应状态码200且返回成功标识,视为发送成功
return response.getStatusCode().is2xxSuccessful()
&& "success".equals(JSON.parseObject(response.getBody()).getString("code"));
} catch (Exception e) {
log.error("调用目标服务失败:msgId={}, targetUrl={}", message.getMsgId(), message.getTargetUrl(), e);
return false;
}
}
/**
* 处理重试逻辑(指数退避)
*/
private void handleRetry(LocalMessage message) {
int currentRetryCount = message.getRetryCount() + 1;
// 超过最大重试次数:标记为死信
if (currentRetryCount >= MAX_RETRY_COUNT) {
updateMessageToFailed(message.getId(), currentRetryCount);
log.warn("消息达到最大重试次数,标记为死信:msgId={}, retryCount={}", message.getMsgId(), currentRetryCount);
return;
}
// 未超过最大重试次数:计算下次重试时间(指数退避:5秒×2^重试次数)
long retryInterval = INIT_RETRY_INTERVAL_SECONDS * (long) Math.pow(2, currentRetryCount);
LocalDateTime nextRetryTime = LocalDateTime.now().plusSeconds(retryInterval);
// 更新重试次数和下次重试时间,状态重置为“待处理”
LocalMessage updateMsg = new LocalMessage();
updateMsg.setId(message.getId());
updateMsg.setRetryCount(currentRetryCount);
updateMsg.setNextRetryTime(nextRetryTime);
updateMsg.setStatus(LocalMessage.STATUS_PENDING);
updateMsg.setUpdateTime(LocalDateTime.now());
localMessageMapper.updateById(updateMsg);
log.warn("消息重试处理:msgId={}, 当前重试次数={}, 下次重试时间={}",
message.getMsgId(), currentRetryCount, nextRetryTime);
}
// ——————- 消息状态更新工具方法 ——————-
private void updateMessageStatus(Long id, Integer status) {
LocalMessage updateMsg = new LocalMessage();
updateMsg.setId(id);
updateMsg.setStatus(status);
updateMsg.setUpdateTime(LocalDateTime.now());
localMessageMapper.updateById(updateMsg);
}
private void updateMessageToCompleted(Long id) {
LocalMessage updateMsg = new LocalMessage();
updateMsg.setId(id);
updateMsg.setStatus(LocalMessage.STATUS_COMPLETED);
updateMsg.setUpdateTime(LocalDateTime.now());
localMessageMapper.updateById(updateMsg);
}
private void updateMessageToFailed(Long id, Integer retryCount) {
LocalMessage updateMsg = new LocalMessage();
updateMsg.setId(id);
updateMsg.setStatus(LocalMessage.STATUS_FAILED);
updateMsg.setRetryCount(retryCount);
updateMsg.setUpdateTime(LocalDateTime.now());
localMessageMapper.updateById(updateMsg);
}
}
5. Mapper 接口(MyBatis-Plus)
// 本地消息表Mapper
public interface LocalMessageMapper extends BaseMapper<LocalMessage> {
/**
* 查询待处理/发送中且已到重试时间的消息(悲观锁)
*/
@Select("SELECT * FROM local_message WHERE status IN (#{statusPending}, #{statusSending}) " +
"AND next_retry_time <= #{currentTime} FOR UPDATE")
List<LocalMessage> selectPendingMessages(
@Param("statusPending") Integer statusPending,
@Param("statusSending") Integer statusSending,
@Param("currentTime") LocalDateTime currentTime);
}
// 订单表Mapper
public interface OrderMapper extends BaseMapper<Order> {
// 基础CRUD由MyBatis-Plus自动生成
}
6. 目标服务幂等性实现(库存服务)
库存服务需保证接口幂等,防止重复扣减库存:
@Service
public class StockService {
@Autowired
private StockMapper stockMapper;
@Autowired
private StockOperateLogMapper operateLogMapper;
/**
* 扣库存(幂等实现)
*/
@Transactional(rollbackFor = Exception.class)
public boolean deductStock(StockDeductDTO deductDTO) {
String orderNo = deductDTO.getOrderNo();
Long productId = deductDTO.getProductId();
Integer quantity = deductDTO.getQuantity();
// 1. 幂等校验:查询是否已处理该订单的扣库存请求
StockOperateLog log = operateLogMapper.selectByOrderNo(orderNo);
if (log != null) {
// 已处理,直接返回成功
return true;
}
// 2. 扣减库存(悲观锁防止并发扣减)
Stock stock = stockMapper.selectByProductIdForUpdate(productId);
if (stock == null || stock.getQuantity() < quantity) {
throw new RuntimeException("库存不足,商品ID:" + productId);
}
stock.setQuantity(stock.getQuantity()–quantity);
stockMapper.updateById(stock);
// 3. 记录操作日志(幂等标记)
StockOperateLog operateLog = new StockOperateLog();
operateLog.setOrderNo(orderNo);
operateLog.setProductId(productId);
operateLog.setQuantity(quantity);
operateLog.setOperateType("DEDUCT");
operateLog.setCreateTime(LocalDateTime.now());
operateLogMapper.insert(operateLog);
return true;
}
}
三、生产环境优化与最佳实践
1. 性能优化
- 索引优化:消息表添加idx_status_next_retry_time复合索引,提升扫描效率;
- 批量处理:定时任务批量读取消息(如每次 100 条),减少数据库连接开销;
- 分库分表:高并发场景下,按消息类型或业务 ID 分片,避免消息表成为瓶颈;
- 异步化:消息发送逻辑异步执行,避免阻塞定时任务主线程。
2. 可靠性增强
- 死信处理:死信消息存入死信表,提供可视化界面人工重试;
- 监控告警:接入 Prometheus+Grafana,监控消息发送成功率、重试次数、死信数量,异常时告警;
- 分布式锁:定时任务执行时加分布式锁(如 Redis 锁),防止多实例重复扫描;
- 日志链路追踪:通过traceId串联业务操作与消息发送日志,便于问题排查。
3. 幂等性进阶方案
| 唯一标识 | 订单、支付等有唯一 ID 的场景 | 消息 ID / 订单号作为唯一键,查询操作日志判断是否已处理 |
| 乐观锁 | 库存扣减、余额更新等数值操作 | 基于版本号更新,UPDATE … SET version=version+1 WHERE version=#{version} |
| 状态机校验 | 流程类业务(如订单状态流转) | 状态变更需符合预设流程,如 “待支付”→“已支付” |
4. 与其他方案对比
| 本地消息表 | 无 | 低 | 中(取决于扫描间隔) | 小型团队、资源有限、轻量级场景 |
| RocketMQ 事务消息 | 消息中间件 | 中 | 高 | 中大型团队、高并发、需高可靠场景 |
| Seata | 分布式事务框架 | 高 | 高 | 强一致性需求、复杂业务场景 |
四、总结与演进方向
本地消息表方案以无中间件依赖、实现简单、可靠性高的特点,成为小型团队解决分布式最终一致性问题的首选。核心是通过本地事务保证业务与消息的原子性,定时补偿实现消息必达,幂等设计防止重复处理。
适用场景
- 对实时性要求不高的场景(如积分同步、物流状态更新);
- 不想引入复杂中间件的轻量级微服务系统;
- 订单创建、库存扣减、支付回调等核心业务场景。
网硕互联帮助中心






评论前必须登录!
注册