云计算百科
云计算领域专业知识百科平台

无中间件依赖!SpringBoot + 本地消息表实现分布式最终一致性

无中间件依赖!SpringBoot + 本地消息表实现分布式最终一致性

在分布式系统中,跨服务数据一致性是核心难题 —— 例如电商下单时,需同时完成订单创建、库存扣减、积分增加等操作,若某一步失败,可能导致数据不一致。主流的分布式事务方案(如 Seata、RocketMQ 事务消息)需依赖额外中间件,运维成本高,不适用于小型团队或资源有限的场景。本文详细拆解 SpringBoot + 本地消息表 + 定时补偿 的轻量级方案,无需额外中间件,通过 “本地事务保障 + 定时重试” 实现分布式最终一致性。

一、分布式一致性痛点与方案选型

1. 核心痛点

分布式场景下,跨服务调用面临三大问题,导致数据不一致:

  • 网络异常:服务间通信超时、断连,部分操作执行成功部分失败;
  • 服务宕机:某服务执行中宕机,未完成后续操作;
  • 数据冲突:并发场景下,多服务同时操作同一数据导致冲突。

传统 “同步调用” 方案(下单→扣库存→加积分)一旦中间环节失败,需手动回滚所有已执行操作,代码复杂且可靠性低。

2. 方案设计理念

本地消息表方案的核心是 “本地事务原子性 + 消息异步补偿”:

  • 将跨服务操作转化为 “业务操作 + 记录消息” 的本地事务,确保两者同时成功或同时回滚;
  • 通过定时任务扫描未完成的消息,异步调用目标服务;
  • 采用重试机制处理临时失败,达到最大重试次数后标记为死信,人工介入处理。
  • 整体流程示意图:

    服务A(订单):
    1. 开启本地事务 → 2. 创建订单 → 3. 记录消息到本地消息表 → 4. 提交事务
    5. 定时任务扫描消息表 → 6. 发送消息给服务B(库存)→ 7. 服务B执行扣库存 → 8. 回调更新消息状态

    3. 技术选型与优势

    组件选型理由
    开发框架 SpringBoot(快速整合组件,简化配置)
    数据库 MySQL(支持事务、行锁,满足本地消息表存储需求)
    ORM 框架 MyBatis-Plus(简化 CRUD 操作,支持乐观锁 / 悲观锁)
    定时任务 Spring Scheduled(轻量级,无需额外部署,满足定时扫描需求)
    重试策略 指数退避算法(避免频繁重试导致服务压力,适配临时故障场景)
    幂等保障 消息唯一标识(msgId)+ 目标服务接口幂等设计

    核心优势:

    • 无额外依赖:无需部署消息队列、分布式事务中间件,降低运维成本;
    • 实现简单:基于本地事务和定时任务,开发门槛低,易落地;
    • 可靠性高:消息持久化存储,宕机后可恢复,通过重试保障最终一致性;
    • 适配场景广:适用于订单创建、支付回调、库存同步等非实时强一致场景。

    二、核心实现细节

    1. 数据库设计(本地消息表)

    本地消息表与业务表在同一数据库,确保业务操作与消息记录原子性:

    — 本地消息表
    CREATE TABLE `local_message` (
    `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `msg_id` varchar(64) NOT NULL COMMENT '消息唯一标识(UUID)',
    `msg_type` varchar(32) NOT NULL COMMENT '消息类型(如:ORDER_CREATE、STOCK_DEDUCT)',
    `msg_content` text NOT NULL COMMENT '消息内容(JSON格式)',
    `target_service` varchar(64) NOT NULL COMMENT '目标服务(如:stock-service)',
    `target_url` varchar(255) NOT NULL COMMENT '目标接口URL',
    `status` tinyint NOT NULL COMMENT '消息状态:0-待处理,1-发送中,2-已完成,3-失败(死信)',
    `retry_count` int NOT NULL DEFAULT '0' COMMENT '重试次数',
    `next_retry_time` datetime NOT NULL COMMENT '下次重试时间',
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uk_msg_id` (`msg_id`),
    KEY `idx_status_next_retry_time` (`status`,`next_retry_time`) COMMENT '查询待重试消息索引'
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='本地消息表';

    — 订单表(示例业务表)
    CREATE TABLE `t_order` (
    `id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
    `order_no` varchar(64) NOT NULL COMMENT '订单编号',
    `user_id` bigint NOT NULL COMMENT '用户ID',
    `product_id` bigint NOT NULL COMMENT '商品ID',
    `quantity` int NOT NULL COMMENT '购买数量',
    `amount` decimal(10,2) NOT NULL COMMENT '订单金额',
    `status` tinyint NOT NULL COMMENT '订单状态:0-待支付,1-已支付,2-已取消',
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE KEY `uk_order_no` (`order_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';

    2. 核心实体类

    (1)本地消息实体

    @Data
    @TableName("local_message")
    public class LocalMessage {
    @TableId(type = IdType.AUTO)
    private Long id;
    @TableField("msg_id")
    private String msgId;
    @TableField("msg_type")
    private String msgType;
    @TableField("msg_content")
    private String msgContent;
    @TableField("target_service")
    private String targetService;
    @TableField("target_url")
    private String targetUrl;
    @TableField("status")
    private Integer status;
    @TableField("retry_count")
    private Integer retryCount;
    @TableField("next_retry_time")
    private LocalDateTime nextRetryTime;
    @TableField("create_time")
    private LocalDateTime createTime;
    @TableField("update_time")
    private LocalDateTime updateTime;

    // 消息状态枚举
    public static final Integer STATUS_PENDING = 0; // 待处理
    public static final Integer STATUS_SENDING = 1; // 发送中
    public static final Integer STATUS_COMPLETED = 2; // 已完成
    public static final Integer STATUS_FAILED = 3; // 失败(死信)
    }

    (2)订单实体

    @Data
    @TableName("t_order")
    public class Order {
    @TableId(type = IdType.AUTO)
    private Long id;
    @TableField("order_no")
    private String orderNo;
    @TableField("user_id")
    private Long userId;
    @TableField("product_id")
    private Long productId;
    @TableField("quantity")
    private Integer quantity;
    @TableField("amount")
    private BigDecimal amount;
    @TableField("status")
    private Integer status;
    @TableField("create_time")
    private LocalDateTime createTime;
    @TableField("update_time")
    private LocalDateTime updateTime;

    // 订单状态枚举
    public static final Integer STATUS_PENDING_PAY = 0; // 待支付
    public static final Integer STATUS_PAID = 1; // 已支付
    public static final Integer STATUS_CANCELLED = 2; // 已取消
    }

    3. 业务操作与消息记录(本地事务)

    核心逻辑:订单创建与消息记录在同一本地事务中执行,确保原子性:

    @Service
    public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private LocalMessageMapper localMessageMapper;
    @Autowired
    private RestTemplate restTemplate;

    // 最大重试次数
    private static final int MAX_RETRY_COUNT = 5;
    // 初始重试间隔(5秒)
    private static final int INIT_RETRY_INTERVAL_SECONDS = 5;

    /**
    * 创建订单 + 记录本地消息(扣库存)
    */

    @Transactional(rollbackFor = Exception.class)
    public Order createOrder(OrderDTO orderDTO) {
    // 1. 生成订单编号
    String orderNo = IdUtil.fastSimpleUUID();

    // 2. 构建订单对象
    Order order = new Order();
    order.setOrderNo(orderNo);
    order.setUserId(orderDTO.getUserId());
    order.setProductId(orderDTO.getProductId());
    order.setQuantity(orderDTO.getQuantity());
    order.setAmount(orderDTO.getAmount());
    order.setStatus(Order.STATUS_PENDING_PAY);
    order.setCreateTime(LocalDateTime.now());
    order.setUpdateTime(LocalDateTime.now());

    // 3. 保存订单(本地事务第一步)
    orderMapper.insert(order);

    // 4. 构建本地消息(扣库存消息)
    LocalMessage localMessage = buildDeductStockMessage(order);

    // 5. 保存本地消息(本地事务第二步)
    localMessageMapper.insert(localMessage);

    return order;
    }

    /**
    * 构建扣库存本地消息
    */

    private LocalMessage buildDeductStockMessage(Order order) {
    LocalMessage message = new LocalMessage();
    // 消息唯一标识(UUID)
    message.setMsgId(IdUtil.fastUUID());
    // 消息类型
    message.setMsgType("STOCK_DEDUCT");
    // 消息内容(JSON格式,包含商品ID、数量、订单号)
    StockDeductDTO deductDTO = new StockDeductDTO();
    deductDTO.setProductId(order.getProductId());
    deductDTO.setQuantity(order.getQuantity());
    deductDTO.setOrderNo(order.getOrderNo());
    message.setMsgContent(JSON.toJSONString(deductDTO));
    // 目标服务(库存服务)
    message.setTargetService("stock-service");
    // 目标接口URL(库存服务扣库存接口)
    message.setTargetUrl("http://stock-service/api/stock/deduct");
    // 消息状态:待处理
    message.setStatus(LocalMessage.STATUS_PENDING);
    // 初始重试次数:0
    message.setRetryCount(0);
    // 下次重试时间:当前时间 + 初始间隔
    message.setNextRetryTime(LocalDateTime.now().plusSeconds(INIT_RETRY_INTERVAL_SECONDS));
    message.setCreateTime(LocalDateTime.now());
    message.setUpdateTime(LocalDateTime.now());

    return message;
    }
    }

    4. 定时补偿任务(扫描 + 发送消息)

    通过 Spring Scheduled 定时扫描待处理消息,执行发送逻辑,失败则更新重试次数和下次重试时间:

    @Component
    @EnableScheduling
    public class LocalMessageScheduledTask {

    @Autowired
    private LocalMessageMapper localMessageMapper;
    @Autowired
    private RestTemplate restTemplate;

    // 定时任务执行间隔(30秒,可通过配置中心动态调整)
    @Scheduled(cron = "0/30 * * * * ?")
    public void processPendingMessages() {
    log.info("开始扫描待处理本地消息");

    // 1. 查询待处理且已到重试时间的消息(加悲观锁,防止并发处理)
    List<LocalMessage> pendingMessages = localMessageMapper.selectPendingMessages(LocalDateTime.now());
    if (CollectionUtils.isEmpty(pendingMessages)) {
    log.info("无待处理本地消息");
    return;
    }

    // 2. 遍历消息,执行发送逻辑
    for (LocalMessage message : pendingMessages) {
    try {
    // 2.1 更新消息状态为“发送中”
    updateMessageStatus(message.getId(), LocalMessage.STATUS_SENDING);

    // 2.2 发送消息到目标服务
    boolean sendSuccess = sendMessageToTargetService(message);
    if (sendSuccess) {
    // 2.3 发送成功:更新状态为“已完成”
    updateMessageToCompleted(message.getId());
    log.info("消息发送成功:msgId={}", message.getMsgId());
    } else {
    // 2.4 发送失败:处理重试逻辑
    handleRetry(message);
    }
    } catch (Exception e) {
    log.error("处理消息失败:msgId={}, 异常={}", message.getMsgId(), e.getMessage(), e);
    // 异常时同样处理重试逻辑
    handleRetry(message);
    }
    }
    }

    /**
    * 发送消息到目标服务
    */

    private boolean sendSuccess = sendMessageToTargetService(message);
    if (sendSuccess) {
    // 2.3 发送成功:更新状态为“已完成”
    updateMessageToCompleted(message.getId());
    log.info("消息发送成功:msgId={}", message.getMsgId());
    } else {
    // 2.4 发送失败:处理重试逻辑
    handleRetry(message);
    }
    } catch (Exception e) {
    log.error("处理消息失败:msgId={}, 异常={}", message.getMsgId(), e.getMessage(), e);
    // 异常时同样处理重试逻辑
    handleRetry(message);
    }
    }
    }

    /**
    * 发送消息到目标服务
    */

    private boolean sendMessageToTargetService(LocalMessage message) {
    try {
    // 构建请求头
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_JSON);

    // 构建请求体
    HttpEntity<String> requestEntity = new HttpEntity<>(message.getMsgContent(), headers);

    // 发送POST请求
    ResponseEntity<String> response = restTemplate.postForEntity(
    message.getTargetUrl(),
    requestEntity,
    String.class
    );

    // 响应状态码200且返回成功标识,视为发送成功
    return response.getStatusCode().is2xxSuccessful()
    && "success".equals(JSON.parseObject(response.getBody()).getString("code"));
    } catch (Exception e) {
    log.error("调用目标服务失败:msgId={}, targetUrl={}", message.getMsgId(), message.getTargetUrl(), e);
    return false;
    }
    }

    /**
    * 处理重试逻辑(指数退避)
    */

    private void handleRetry(LocalMessage message) {
    int currentRetryCount = message.getRetryCount() + 1;

    // 超过最大重试次数:标记为死信
    if (currentRetryCount >= MAX_RETRY_COUNT) {
    updateMessageToFailed(message.getId(), currentRetryCount);
    log.warn("消息达到最大重试次数,标记为死信:msgId={}, retryCount={}", message.getMsgId(), currentRetryCount);
    return;
    }

    // 未超过最大重试次数:计算下次重试时间(指数退避:5秒×2^重试次数)
    long retryInterval = INIT_RETRY_INTERVAL_SECONDS * (long) Math.pow(2, currentRetryCount);
    LocalDateTime nextRetryTime = LocalDateTime.now().plusSeconds(retryInterval);

    // 更新重试次数和下次重试时间,状态重置为“待处理”
    LocalMessage updateMsg = new LocalMessage();
    updateMsg.setId(message.getId());
    updateMsg.setRetryCount(currentRetryCount);
    updateMsg.setNextRetryTime(nextRetryTime);
    updateMsg.setStatus(LocalMessage.STATUS_PENDING);
    updateMsg.setUpdateTime(LocalDateTime.now());
    localMessageMapper.updateById(updateMsg);

    log.warn("消息重试处理:msgId={}, 当前重试次数={}, 下次重试时间={}",
    message.getMsgId(), currentRetryCount, nextRetryTime);
    }

    // ——————- 消息状态更新工具方法 ——————-
    private void updateMessageStatus(Long id, Integer status) {
    LocalMessage updateMsg = new LocalMessage();
    updateMsg.setId(id);
    updateMsg.setStatus(status);
    updateMsg.setUpdateTime(LocalDateTime.now());
    localMessageMapper.updateById(updateMsg);
    }

    private void updateMessageToCompleted(Long id) {
    LocalMessage updateMsg = new LocalMessage();
    updateMsg.setId(id);
    updateMsg.setStatus(LocalMessage.STATUS_COMPLETED);
    updateMsg.setUpdateTime(LocalDateTime.now());
    localMessageMapper.updateById(updateMsg);
    }

    private void updateMessageToFailed(Long id, Integer retryCount) {
    LocalMessage updateMsg = new LocalMessage();
    updateMsg.setId(id);
    updateMsg.setStatus(LocalMessage.STATUS_FAILED);
    updateMsg.setRetryCount(retryCount);
    updateMsg.setUpdateTime(LocalDateTime.now());
    localMessageMapper.updateById(updateMsg);
    }
    }

    5. Mapper 接口(MyBatis-Plus)

    // 本地消息表Mapper
    public interface LocalMessageMapper extends BaseMapper<LocalMessage> {

    /**
    * 查询待处理/发送中且已到重试时间的消息(悲观锁)
    */

    @Select("SELECT * FROM local_message WHERE status IN (#{statusPending}, #{statusSending}) " +
    "AND next_retry_time <= #{currentTime} FOR UPDATE")
    List<LocalMessage> selectPendingMessages(
    @Param("statusPending") Integer statusPending,
    @Param("statusSending") Integer statusSending,
    @Param("currentTime") LocalDateTime currentTime);
    }

    // 订单表Mapper
    public interface OrderMapper extends BaseMapper<Order> {
    // 基础CRUD由MyBatis-Plus自动生成
    }

    6. 目标服务幂等性实现(库存服务)

    库存服务需保证接口幂等,防止重复扣减库存:

    @Service
    public class StockService {

    @Autowired
    private StockMapper stockMapper;
    @Autowired
    private StockOperateLogMapper operateLogMapper;

    /**
    * 扣库存(幂等实现)
    */

    @Transactional(rollbackFor = Exception.class)
    public boolean deductStock(StockDeductDTO deductDTO) {
    String orderNo = deductDTO.getOrderNo();
    Long productId = deductDTO.getProductId();
    Integer quantity = deductDTO.getQuantity();

    // 1. 幂等校验:查询是否已处理该订单的扣库存请求
    StockOperateLog log = operateLogMapper.selectByOrderNo(orderNo);
    if (log != null) {
    // 已处理,直接返回成功
    return true;
    }

    // 2. 扣减库存(悲观锁防止并发扣减)
    Stock stock = stockMapper.selectByProductIdForUpdate(productId);
    if (stock == null || stock.getQuantity() < quantity) {
    throw new RuntimeException("库存不足,商品ID:" + productId);
    }
    stock.setQuantity(stock.getQuantity()quantity);
    stockMapper.updateById(stock);

    // 3. 记录操作日志(幂等标记)
    StockOperateLog operateLog = new StockOperateLog();
    operateLog.setOrderNo(orderNo);
    operateLog.setProductId(productId);
    operateLog.setQuantity(quantity);
    operateLog.setOperateType("DEDUCT");
    operateLog.setCreateTime(LocalDateTime.now());
    operateLogMapper.insert(operateLog);

    return true;
    }
    }

    三、生产环境优化与最佳实践

    1. 性能优化

    • 索引优化:消息表添加idx_status_next_retry_time复合索引,提升扫描效率;
    • 批量处理:定时任务批量读取消息(如每次 100 条),减少数据库连接开销;
    • 分库分表:高并发场景下,按消息类型或业务 ID 分片,避免消息表成为瓶颈;
    • 异步化:消息发送逻辑异步执行,避免阻塞定时任务主线程。

    2. 可靠性增强

    • 死信处理:死信消息存入死信表,提供可视化界面人工重试;
    • 监控告警:接入 Prometheus+Grafana,监控消息发送成功率、重试次数、死信数量,异常时告警;
    • 分布式锁:定时任务执行时加分布式锁(如 Redis 锁),防止多实例重复扫描;
    • 日志链路追踪:通过traceId串联业务操作与消息发送日志,便于问题排查。

    3. 幂等性进阶方案

    幂等实现方式适用场景实现要点
    唯一标识 订单、支付等有唯一 ID 的场景 消息 ID / 订单号作为唯一键,查询操作日志判断是否已处理
    乐观锁 库存扣减、余额更新等数值操作 基于版本号更新,UPDATE … SET version=version+1 WHERE version=#{version}
    状态机校验 流程类业务(如订单状态流转) 状态变更需符合预设流程,如 “待支付”→“已支付”

    4. 与其他方案对比

    方案依赖复杂度实时性适用场景
    本地消息表 中(取决于扫描间隔) 小型团队、资源有限、轻量级场景
    RocketMQ 事务消息 消息中间件 中大型团队、高并发、需高可靠场景
    Seata 分布式事务框架 强一致性需求、复杂业务场景

    四、总结与演进方向

    本地消息表方案以无中间件依赖、实现简单、可靠性高的特点,成为小型团队解决分布式最终一致性问题的首选。核心是通过本地事务保证业务与消息的原子性,定时补偿实现消息必达,幂等设计防止重复处理。

    适用场景

    • 对实时性要求不高的场景(如积分同步、物流状态更新);
    • 不想引入复杂中间件的轻量级微服务系统;
    • 订单创建、库存扣减、支付回调等核心业务场景。

    演进方向

  • 接入消息中间件:当业务规模扩大,可平滑迁移至 RabbitMQ/RocketMQ/Kafka 事务消息,提升实时性与吞吐量;
  • 引入 Saga 模式:处理长链路业务,支持正向流程与反向补偿;
  • 云原生适配:结合 Serverless 架构,实现消息处理的弹性扩缩容。
  • 赞(0)
    未经允许不得转载:网硕互联帮助中心 » 无中间件依赖!SpringBoot + 本地消息表实现分布式最终一致性
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!