云计算百科
云计算领域专业知识百科平台

高并发系统中常遇到的问题

一、缓存问题

1.1 缓存穿透

解决方案:使用布隆过滤器(Bloom Filter)拦截不存在的数据请求。

操作步骤:

  • 初始化布隆过滤器,将数据库中存在的数据ID全部放入布隆过滤器。

  • 查询请求到达时,先检查布隆过滤器是否存在该key。

  • 如果布隆过滤器判断不存在,则直接返回空,避免查询数据库。

  • 如果存在,则走缓存、数据库查询逻辑。

  • Java代码示例(使用Google Guava的BloomFilter):

    java

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;
    import org.springframework.stereotype.Service;
    import javax.annotation.PostConstruct;
    import java.nio.charset.Charset;
    import java.util.List;

    @Service
    public class ProductService {
    private BloomFilter<String> bloomFilter;
    private final int EXPECTED_INSERTIONS = 1000000; // 预计数据量
    private final double FPP = 0.01; // 误判率

    @PostConstruct
    public void init() {
    bloomFilter = BloomFilter.create(
    Funnels.stringFunnel(Charset.defaultCharset()),
    EXPECTED_INSERTIONS,
    FPP);
    // 从数据库加载所有存在的商品ID,添加到布隆过滤器
    List<String> allProductIds = loadAllProductIdsFromDB();
    for (String id : allProductIds) {
    bloomFilter.put(id);
    }
    }

    public Product getProduct(String productId) {
    // 1. 布隆过滤器判断不存在,直接返回null
    if (!bloomFilter.mightContain(productId)) {
    return null;
    }
    // 2. 查询缓存(省略具体代码)
    Product product = getFromCache(productId);
    if (product != null) {
    return product;
    }
    // 3. 缓存不存在,查询数据库
    product = getFromDB(productId);
    if (product != null) {
    // 放入缓存,设置过期时间
    putToCache(productId, product);
    } else {
    // 可选:缓存空对象,但布隆过滤器已拦截大部分,可不缓存空值
    }
    return product;
    }
    }


    1.2 缓存雪崩

    解决方案:设置不同的过期时间(基础过期时间 + 随机值)。

    操作步骤:

  • 在放入缓存时,为key设置一个基础过期时间(如1小时)。

  • 在此基础上增加一个随机秒数(如0~300秒),使key的过期时间分散开。

  • 避免大量key同时失效。

  • Java代码示例(使用Spring Data Redis):

    java

    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    import java.util.concurrent.TimeUnit;

    @Service
    public class CacheService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void putWithRandomExpire(String key, Object value, long baseTimeout, TimeUnit unit) {
    long timeout = unit.toSeconds(baseTimeout);
    // 增加随机值,范围0~300秒
    long random = (long) (Math.random() * 300);
    long finalTimeout = timeout + random;
    redisTemplate.opsForValue().set(key, value, finalTimeout, TimeUnit.SECONDS);
    }
    }


    1.3 缓存失效(缓存击穿)

    解决方案:使用互斥锁(分布式锁)保证只有一个线程去加载数据。

    操作步骤:

  • 查询缓存,如果命中则返回。

  • 如果未命中,尝试获取分布式锁。

  • 获取锁成功后,再次查询缓存(防止在等待锁的过程中其他线程已经加载了数据)。

  • 如果缓存仍不存在,则查询数据库,并更新缓存。

  • 释放锁。

  • Java代码示例(使用Redisson实现分布式锁):

    java

    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class ProductService {
    @Autowired
    private RedissonClient redissonClient;

    public Product getProductWithLock(String productId) {
    // 1. 查询缓存
    Product product = getFromCache(productId);
    if (product != null) {
    return product;
    }

    // 2. 尝试获取分布式锁
    String lockKey = "lock:product:" + productId;
    RLock lock = redissonClient.getLock(lockKey);
    try {
    // 尝试加锁,最多等待3秒,锁过期时间10秒
    if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {
    try {
    // 3. 再次查询缓存,可能其他线程已经加载
    product = getFromCache(productId);
    if (product != null) {
    return product;
    }
    // 4. 查询数据库
    product = getFromDB(productId);
    if (product != null) {
    putToCache(productId, product);
    }
    return product;
    } finally {
    lock.unlock();
    }
    } else {
    // 获取锁失败,等待重试或返回空
    Thread.sleep(100);
    return getProductWithLock(productId); // 递归重试
    }
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return null;
    }
    }
    }


    1.4 热点缓存重建

    解决方案:逻辑过期 + 异步刷新(缓存永不过期,但存储逻辑过期时间)。

    操作步骤:

  • 缓存中保存一个包装对象,包含实际数据和逻辑过期时间。

  • 查询时,如果逻辑时间未过期,直接返回数据。

  • 如果逻辑时间已过期,则异步线程去加载最新数据更新缓存,同时返回旧数据给当前请求(避免等待)。

  • 使用分布式锁防止多个线程同时刷新。

  • Java代码示例:

    java

    import com.fasterxml.jackson.annotation.JsonIgnore;
    import lombok.Data;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    @Data
    class CacheWrapper<T> {
    private T data;
    private long expireTime; // 逻辑过期时间戳(毫秒)
    }

    @Service
    public class HotProductService {
    private ExecutorService executor = Executors.newFixedThreadPool(10);
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public Product getHotProduct(String productId) {
    String key = "hot:product:" + productId;
    CacheWrapper<Product> wrapper = (CacheWrapper<Product>) redisTemplate.opsForValue().get(key);
    if (wrapper == null) {
    // 缓存不存在,直接加载(同步)
    return loadProductToCache(productId);
    }
    if (wrapper.getExpireTime() > System.currentTimeMillis()) {
    // 逻辑时间未过期,直接返回
    return wrapper.getData();
    }
    // 逻辑过期,异步刷新
    executor.submit(() -> {
    // 尝试获取分布式锁,避免并发刷新
    String lockKey = "lock:refresh:" + productId;
    RLock lock = redissonClient.getLock(lockKey);
    if (lock.tryLock()) {
    try {
    // 再次检查是否已被其他线程刷新
    CacheWrapper<Product> latest = (CacheWrapper<Product>) redisTemplate.opsForValue().get(key);
    if (latest != null && latest.getExpireTime() > System.currentTimeMillis()) {
    return;
    }
    // 加载数据库
    Product product = getFromDB(productId);
    CacheWrapper<Product> newWrapper = new CacheWrapper<>();
    newWrapper.setData(product);
    newWrapper.setExpireTime(System.currentTimeMillis() + 3600_000); // 设置1小时后过期
    redisTemplate.opsForValue().set(key, newWrapper);
    } finally {
    lock.unlock();
    }
    }
    });
    // 返回旧数据
    return wrapper.getData();
    }
    }


    1.5 缓存数据库双写不一致

    解决方案:先更新数据库,再删除缓存(经典的Cache-Aside模式),配合延迟双删或Binlog监听。

    操作步骤(延迟双删):

  • 删除缓存。

  • 更新数据库。

  • 休眠一小段时间(如500ms),再次删除缓存,确保在更新过程中可能产生的并发读请求带来的脏缓存被清除。

  • Java代码示例:

    java

    @Service
    public class ProductUpdateService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void updateProduct(Product product) {
    String cacheKey = "product:" + product.getId();
    // 第一次删除缓存
    redisTemplate.delete(cacheKey);

    // 更新数据库
    jdbcTemplate.update("UPDATE product SET name = ? WHERE id = ?", product.getName(), product.getId());

    // 延迟一段时间后再次删除
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.schedule(() -> {
    redisTemplate.delete(cacheKey);
    }, 500, TimeUnit.MILLISECONDS);
    }
    }

    更可靠的方案:监听MySQL binlog(使用Canal),异步删除缓存。

    操作步骤:

  • 部署Canal,伪装为MySQL slave,接收binlog变更。

  • 在Java应用中集成Canal客户端,解析binlog中的更新事件。

  • 根据变更的数据构造缓存key,并删除缓存。

  • Java代码示例(Canal客户端):

    java

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    import java.net.InetSocketAddress;
    import java.util.List;

    @Component
    public class CanalClient {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @PostConstruct
    public void run() {
    CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
    while (true) {
    connector.connect();
    connector.subscribe(".*\\\\..*");
    connector.rollback();
    while (true) {
    Message message = connector.getWithoutAck(100); // 获取100条数据
    long batchId = message.getId();
    if (batchId == -1 || message.getEntries().isEmpty()) {
    try { Thread.sleep(1000); } catch (InterruptedException e) {}
    continue;
    }
    List<Entry> entries = message.getEntries();
    for (Entry entry : entries) {
    if (entry.getEntryType() == EntryType.ROWDATA) {
    RowChange rowChange;
    try {
    rowChange = RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
    continue;
    }
    EventType eventType = rowChange.getEventType();
    if (eventType == EventType.UPDATE || eventType == EventType.INSERT || eventType == EventType.DELETE) {
    String tableName = entry.getHeader().getTableName();
    for (RowData rowData : rowChange.getRowDatasList()) {
    if ("product".equals(tableName)) {
    // 获取主键ID(假设列名为id)
    String id = null;
    for (Column col : rowData.getAfterColumnsList()) {
    if ("id".equals(col.getName())) {
    id = col.getValue();
    break;
    }
    }
    if (id != null) {
    redisTemplate.delete("product:" + id);
    }
    }
    }
    }
    }
    }
    connector.ack(batchId);
    }
    }
    }
    }


    二、消息中间件问题

    2.1 消息丢失

    解决方案:生产者启用确认模式,消费者手动提交offset,MQ持久化。

    操作步骤(以RocketMQ为例):

  • 生产者设置setRetryTimesWhenSendFailed,并使用同步发送等待发送结果。

  • 消费者设置消费模式为ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET,并手动提交offset。

  • 消息处理成功后,再执行commitSync()。

  • Java代码示例(RocketMQ生产者):

    java

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;

    public class OrderProducer {
    public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.setRetryTimesWhenSendFailed(3);
    producer.start();
    Message msg = new Message("order_topic", "order", "orderId_123".getBytes());
    SendResult result = producer.send(msg);
    if (result.getSendStatus() == SendStatus.SEND_OK) {
    System.out.println("消息发送成功");
    } else {
    System.out.println("消息发送失败,需重试或记录日志");
    }
    producer.shutdown();
    }
    }

    消费者(手动ack):

    java

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import java.util.List;

    public class OrderConsumer {
    public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.subscribe("order_topic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt msg : msgs) {
    try {
    // 处理消息
    process(msg);
    } catch (Exception e) {
    // 处理失败,稍后重试
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 手动ack
    }
    });
    consumer.start();
    }
    }


    2.2 消息重复消费

    解决方案:幂等性设计(使用业务唯一ID去重)。

    操作步骤:

  • 在消息体中包含一个全局唯一的业务ID(如订单号、消息ID)。

  • 消费前,先检查该ID是否已处理过(使用数据库唯一索引或Redis setnx)。

  • 如果已处理,则忽略;否则执行业务逻辑,并记录处理状态。

  • Java代码示例(Redis去重):

    java

    @Service
    public class OrderMessageConsumer {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void handleMessage(MessageExt msg) {
    String msgId = msg.getKeys(); // RocketMQ的消息key作为唯一ID
    // 使用Redis的setnx,设置过期时间(防止无限增长)
    Boolean success = redisTemplate.opsForValue().setIfAbsent("processed:msg:" + msgId, "1", 1, TimeUnit.DAYS);
    if (success != null && success) {
    // 第一次处理
    processOrder(msg);
    } else {
    // 已经处理过,直接忽略
    System.out.println("重复消息,已忽略:" + msgId);
    }
    }
    }


    2.3 消息积压

    解决方案:临时增加消费者数量,并优化消费逻辑(批量消费)。

    操作步骤:

  • 扩容消费者实例(水平扩展)。

  • 调整消费者的消费线程数(如RocketMQ的consumeThreadMin和consumeThreadMax)。

  • 使用批量拉取方式,一次处理多条消息。

  • Java代码示例(RocketMQ批量消费设置):

    java

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    consumer.setConsumeThreadMin(20);
    consumer.setConsumeThreadMax(20);
    consumer.setConsumeMessageBatchMaxSize(10); // 每次最多拉取10条

    批量处理逻辑:

    java

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // 批量处理,例如批量插入数据库
    List<Order> orders = parseToOrders(msgs);
    orderDao.batchInsert(orders);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }


    2.4 消息乱序

    解决方案:使用顺序消息(RocketMQ的MessageQueueSelector将相同业务ID的消息发送到同一队列,消费者单线程消费该队列)。

    操作步骤:

  • 生产者指定队列选择器,保证相同订单号的消息进入同一队列。

  • 消费者注册MessageListenerOrderly,每个队列只有一个线程消费,保证顺序。

  • Java代码示例(生产者顺序消息):

    java

    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    Long orderId = (Long) arg; // 订单号作为选择参数
    int index = (int) (orderId % mqs.size());
    return mqs.get(index);
    }
    }, orderId);

    消费者顺序消费:

    java

    consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    for (MessageExt msg : msgs) {
    process(msg); // 单线程处理同一队列的消息
    }
    return ConsumeOrderlyStatus.SUCCESS;
    }
    });


    2.5 消息回滚(回澜)

    解决方案:使用分布式事务消息(RocketMQ事务消息)。

    操作步骤:

  • 发送半消息(prepare)。

  • 执行本地事务。

  • 根据本地事务结果提交或回滚消息。

  • 如果本地事务状态未知,MQ会回查生产者确认。

  • Java代码示例(RocketMQ事务消息生产者):

    java

    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.client.producer.TransactionSendResult;
    import org.apache.rocketmq.common.message.Message;

    public class TransactionProducer {
    public static void main(String[] args) throws Exception {
    TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    // 执行本地事务(如数据库操作)
    try {
    // 更新订单状态
    updateOrderStatus((String) arg);
    return LocalTransactionState.COMMIT_MESSAGE;
    } catch (Exception e) {
    return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    // 回查本地事务状态
    String orderId = msg.getKeys();
    if (isOrderCommitted(orderId)) {
    return LocalTransactionState.COMMIT_MESSAGE;
    } else {
    return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    }
    });
    producer.start();
    Message msg = new Message("order_topic", "order", "order_123".getBytes());
    TransactionSendResult result = producer.sendMessageInTransaction(msg, "order_123");
    producer.shutdown();
    }
    }


    三、分布式问题

    3.1 脑裂

    解决方案:使用Zookeeper等强一致性服务,配置法定人数(quorum),并监控集群状态。

    操作步骤(以Zookeeper为例):

  • 配置Zookeeper集群的zoo.cfg,设置initLimit、syncLimit和server.x列表。

  • 客户端连接Zookeeper,监听集群节点变化。

  • 当检测到集群分裂时,通过选举算法确保只有一个主节点。

  • Java代码示例(Zookeeper客户端监控节点变化):

    java

    import org.apache.zookeeper.*;

    public class ZookeeperWatcher implements Watcher {
    private ZooKeeper zooKeeper;

    public void connect() throws Exception {
    zooKeeper = new ZooKeeper("127.0.0.1:2181", 3000, this);
    }

    @Override
    public void process(WatchedEvent event) {
    if (event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.Disconnected) {
    System.out.println("Zookeeper连接断开,可能发生脑裂");
    // 触发告警或切换策略
    }
    }

    // 创建临时节点作为leader选举
    public void electLeader() throws Exception {
    String path = "/election/node-";
    zooKeeper.create(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    // 获取所有子节点,最小的为leader
    // … 具体选举逻辑
    }
    }


    3.2 羊群效应

    解决方案:使用Zookeeper时,所有客户端监听同一节点,节点变化时大量客户端被唤醒。可以改为每个客户端监听自己的专属子节点,或者添加随机延迟。

    操作步骤:

  • 客户端创建自己的临时顺序节点。

  • 只监听前一个节点的变化,而不是父节点。

  • 当前一个节点消失时,才触发当前节点的操作。

  • Java代码示例(Zookeeper分布式锁避免羊群效应):

    java

    public class DistributedLock {
    private ZooKeeper zooKeeper;
    private String lockPath = "/lock";
    private String currentPath;
    private String watchPath;

    public void lock() throws Exception {
    currentPath = zooKeeper.create(lockPath + "/lock-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    List<String> children = zooKeeper.getChildren(lockPath, false);
    Collections.sort(children);
    if (currentPath.equals(lockPath + "/" + children.get(0))) {
    // 获得锁
    return;
    }
    // 监听前一个节点
    int index = children.indexOf(currentPath.substring(lockPath.length() + 1));
    watchPath = lockPath + "/" + children.get(index – 1);
    zooKeeper.exists(watchPath, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    if (event.getType() == Event.EventType.NodeDeleted) {
    // 前一个节点删除,当前节点获得锁
    synchronized (DistributedLock.this) {
    DistributedLock.this.notify();
    }
    }
    }
    });
    synchronized (this) {
    wait();
    }
    }
    }


    3.3 哈希碰撞

    解决方案:使用一致性哈希算法,并引入虚拟节点。

    操作步骤:

  • 构建一个一致性哈希环,将物理节点映射到环上。

  • 为每个物理节点创建多个虚拟节点(如160个),分散在环上。

  • 计算key的哈希值,顺时针找到最近的虚拟节点,再映射到物理节点。

  • Java代码示例(简单一致性哈希实现):

    java

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.*;

    public class ConsistentHashing {
    private final TreeMap<Long, String> virtualNodes = new TreeMap<>();
    private final int virtualReplicas = 160;
    private final List<String> physicalNodes;

    public ConsistentHashing(List<String> physicalNodes) {
    this.physicalNodes = physicalNodes;
    for (String node : physicalNodes) {
    for (int i = 0; i < virtualReplicas; i++) {
    long hash = hash(node + "-" + i);
    virtualNodes.put(hash, node);
    }
    }
    }

    public String getNode(String key) {
    long hash = hash(key);
    // 返回大于等于hash的最小节点
    Map.Entry<Long, String> entry = virtualNodes.ceilingEntry(hash);
    if (entry == null) {
    entry = virtualNodes.firstEntry();
    }
    return entry.getValue();
    }

    private long hash(String key) {
    try {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    byte[] digest = md5.digest(key.getBytes());
    return ((long) (digest[3] & 0xFF) << 24) | ((long) (digest[2] & 0xFF) << 16)
    | ((long) (digest[1] & 0xFF) << 8) | (digest[0] & 0xFF);
    } catch (NoSuchAlgorithmException e) {
    throw new RuntimeException(e);
    }
    }
    }


    3.4 时钟回拨

    解决方案:在分布式ID生成器(如Snowflake)中检测时钟回拨,如果回拨时间较短则等待,如果过长则抛出异常或使用备用方案。

    操作步骤:

  • 记录上次生成ID的时间戳。

  • 如果当前时间小于上次时间戳,说明发生了时钟回拨。

  • 根据回拨大小决定策略:小幅度回拨(<5ms)则等待时间追上;大幅度回拨则报警并拒绝服务。

  • Java代码示例(Snowflake算法处理时钟回拨):

    java

    public class SnowflakeIdWorker {
    private final long twepoch = 1288834974657L;
    private final long workerIdBits = 5L;
    private final long sequenceBits = 12L;
    private final long workerId;
    private volatile long lastTimestamp = -1L;
    private volatile long sequence = 0L;

    public SnowflakeIdWorker(long workerId) {
    if (workerId < 0 || workerId > (-1L ^ (-1L << workerIdBits))) {
    throw new IllegalArgumentException("workerId out of range");
    }
    this.workerId = workerId;
    }

    public synchronized long nextId() {
    long timestamp = timeGen();
    // 如果当前时间小于上次生成ID的时间戳,说明时钟回拨
    if (timestamp < lastTimestamp) {
    long offset = lastTimestamp – timestamp;
    if (offset < 5) {
    // 如果回拨小于5ms,等待时间追上
    try {
    wait(lastTimestamp – timestamp);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }
    timestamp = timeGen();
    // 再次判断
    if (timestamp < lastTimestamp) {
    throw new RuntimeException("Clock moved backwards, refuse generate id");
    }
    } else {
    throw new RuntimeException("Clock moved backwards, refuse generate id");
    }
    }

    if (lastTimestamp == timestamp) {
    sequence = (sequence + 1) & ((1 << sequenceBits) – 1);
    if (sequence == 0) {
    timestamp = tilNextMillis(lastTimestamp);
    }
    } else {
    sequence = 0L;
    }
    lastTimestamp = timestamp;
    return ((timestamp – twepoch) << (workerIdBits + sequenceBits)) | (workerId << sequenceBits) | sequence;
    }

    private long tilNextMillis(long lastTimestamp) {
    long timestamp = timeGen();
    while (timestamp <= lastTimestamp) {
    timestamp = timeGen();
    }
    return timestamp;
    }

    private long timeGen() {
    return System.currentTimeMillis();
    }
    }


    3.5 拒绝连接

    解决方案:使用限流(如令牌桶算法)保护服务,防止过载拒绝连接。

    操作步骤:

  • 引入Guava的RateLimiter或Sentinel。

  • 在服务入口(如Controller)进行限流判断。

  • 如果超过阈值,返回“服务繁忙”或抛出异常。

  • Java代码示例(Guava RateLimiter):

    java

    import com.google.common.util.concurrent.RateLimiter;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class ApiController {
    // 每秒最多处理10个请求
    private final RateLimiter rateLimiter = RateLimiter.create(10.0);

    @GetMapping("/api/data")
    public String getData() {
    if (!rateLimiter.tryAcquire()) {
    return "系统繁忙,请稍后重试";
    }
    // 执行业务逻辑
    return "success";
    }
    }

    使用Sentinel的示例(Spring Cloud Alibaba):

    java

    @RestController
    public class SentinelController {
    @GetMapping("/hello")
    @SentinelResource(value = "hello", blockHandler = "handleBlock")
    public String hello() {
    return "Hello Sentinel";
    }

    public String handleBlock(BlockException ex) {
    return "限流了";
    }
    }


    四、系统问题

    4.1 内存泄漏

    问题描述:应用程序中不再使用的对象无法被垃圾回收,导致堆内存使用持续增长,最终引发OutOfMemoryError。

    解决方案:通过内存分析工具(如MAT、JProfiler)定位泄漏点,修复代码中的不合理引用。

    操作步骤:

  • 监控内存使用:使用JVM监控工具(如jstat、VisualVM)观察堆内存变化趋势。

  • 生成Heap Dump:当内存异常时,使用jmap或配置JVM参数-XX:+HeapDumpOnOutOfMemoryError自动导出dump文件。

  • 分析Heap Dump:使用MAT(Eclipse Memory Analyzer)打开dump文件,查看可疑对象、GC Roots引用链。

  • 定位问题代码:根据分析结果找到未释放的引用(如ThreadLocal未清理、静态集合不断添加、未关闭的资源等)。

  • 修复代码:修改代码,确保对象使用完毕后置空引用或调用清理方法。

  • Java代码示例:模拟ThreadLocal内存泄漏及修复。

    泄漏代码:

    java

    public class ThreadLocalLeakDemo {
    private static final ThreadLocal<byte[]> threadLocal = new ThreadLocal<>();

    public void process() {
    // 假设每个请求都设置一个大对象
    threadLocal.set(new byte[10 * 1024 * 1024]); // 10MB
    // 执行业务逻辑…
    // 忘记调用 remove()
    }
    }

    修复代码:

    java

    public class ThreadLocalLeakDemo {
    private static final ThreadLocal<byte[]> threadLocal = new ThreadLocal<>();

    public void process() {
    try {
    threadLocal.set(new byte[10 * 1024 * 1024]);
    // 执行业务逻辑…
    } finally {
    threadLocal.remove(); // 确保清理
    }
    }
    }

    操作指令:

    • 使用jmap生成堆dump:jmap -dump:live,format=b,file=heap.hprof <pid>

    • 使用jstat观察GC情况:jstat -gcutil <pid> 1000 10


    4.2 重复提交

    问题描述:用户短时间内多次提交同一表单或请求,导致数据重复(如重复下单、重复评论)。

    解决方案:前端按钮置灰 + 后端Token机制(防重令牌)。

    操作步骤:

  • 前端:在提交按钮点击后立即禁用按钮,防止用户重复点击。

  • 后端:生成一个唯一的Token(如UUID),存入Redis并设置过期时间,返回给前端。

  • 请求携带Token:前端在提交请求时带上该Token。

  • 后端校验:处理请求前先校验Token是否存在且有效,若有效则删除Token并继续处理;若无效则返回重复提交错误。

  • Java代码示例:

    生成Token接口:

    java

    @RestController
    @RequestMapping("/token")
    public class TokenController {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @GetMapping("/generate")
    public String generateToken() {
    String token = UUID.randomUUID().toString();
    redisTemplate.opsForValue().set(token, "1", 30, TimeUnit.MINUTES); // 30分钟有效
    return token;
    }
    }

    自定义注解+拦截器实现Token校验:

    java

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface PreventDuplicateSubmit {
    }

    @Component
    public class DuplicateSubmitInterceptor implements HandlerInterceptor {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    if (handler instanceof HandlerMethod) {
    HandlerMethod method = (HandlerMethod) handler;
    if (method.hasMethodAnnotation(PreventDuplicateSubmit.class)) {
    String token = request.getHeader("Request-Token");
    if (StringUtils.isEmpty(token)) {
    throw new RuntimeException("缺少防重Token");
    }
    // 原子性删除(使用lua脚本或setnx)
    Boolean result = redisTemplate.delete(token);
    if (Boolean.TRUE.equals(result)) {
    return true; // 第一次提交,放行
    } else {
    response.setStatus(400);
    response.getWriter().write("请勿重复提交");
    return false;
    }
    }
    }
    return true;
    }
    }

    Controller使用:

    java

    @PostMapping("/order")
    @PreventDuplicateSubmit
    public String createOrder(@RequestBody OrderDTO orderDTO) {
    // 业务逻辑
    return "success";
    }


    4.3 指令重排

    问题描述:在多线程环境下,编译器和CPU为了优化可能对指令进行重排,导致意料之外的并发问题(如双重检查锁失效)。

    解决方案:使用volatile关键字禁止指令重排,或使用AtomicInteger、synchronized等并发工具。

    操作步骤:

  • 识别需要禁止重排的变量(通常是被多个线程共享的状态标志)。

  • 使用volatile修饰该变量。

  • 对于复合操作,使用java.util.concurrent.atomic包中的原子类或锁。

  • Java代码示例:

    双重检查锁单例模式(正确使用volatile):

    java

    public class Singleton {
    private static volatile Singleton instance; // volatile禁止重排

    private Singleton() {}

    public static Singleton getInstance() {
    if (instance == null) {
    synchronized (Singleton.class) {
    if (instance == null) {
    instance = new Singleton(); // 不会重排
    }
    }
    }
    return instance;
    }
    }

    如果不加volatile,instance = new Singleton() 可能被重排为:分配内存 -> 将引用赋给instance -> 初始化对象。其他线程可能拿到未初始化的对象。


    4.4 系统假死

    问题描述:系统无响应(不处理请求),但进程仍在,CPU/内存可能正常,通常是线程池耗尽、死锁、网络阻塞等原因。

    解决方案:监控线程状态、死锁检测、配置健康检查并自动重启。

    操作步骤:

  • 检测线程池耗尽:通过jstack查看线程堆栈,确认大量线程在等待队列或阻塞。

  • 检测死锁:使用jstack或JConsole检测死锁。

  • 增加监控:在应用中暴露健康检查端点(如Spring Boot Actuator),由外部监控系统(如Kubernetes liveness probe)定期检查,发现假死后自动重启。

  • 优化代码:设置合理的线程池参数、避免同步块过大、使用超时机制。

  • Java代码示例:

    Spring Boot Actuator健康检查端点:

    yaml

    # application.yml
    management:
    endpoints:
    web:
    exposure:
    include: health,info

    自定义健康检查(检测数据库连接等):

    java

    @Component
    public class CustomHealthIndicator implements HealthIndicator {
    @Autowired
    private DataSource dataSource;

    @Override
    public Health health() {
    try {
    dataSource.getConnection().close(); // 简单测试连接
    return Health.up().build();
    } catch (Exception e) {
    return Health.down(e).build();
    }
    }
    }

    Kubernetes探针配置:

    yaml

    livenessProbe:
    httpGet:
    path: /actuator/health/liveness
    port: 8080
    initialDelaySeconds: 30
    periodSeconds: 10

    死锁检测代码(可放在监控脚本中):

    bash

    # 使用jstack检测死锁
    jstack -l <pid> | grep -A 10 "Found one Java-level deadlock"


    4.5 数据倾斜

    问题描述:在分布式计算(如MapReduce、Spark、Flink)或数据库分库分表中,数据分布不均,导致部分节点负载过高,影响整体性能。

    解决方案:重新设计分区键、加盐(随机前缀)、二次聚合、扩容。

    操作步骤:

  • 分析数据分布:查看各个分区的数据量或处理时间。

  • 定位热点Key:如某个用户ID的数据量特别大。

  • 加盐处理:在Key上加随机后缀,将数据打散到多个分区。

  • 二次聚合:第一阶段加盐处理,第二阶段去掉后缀合并结果。

  • 调整分区键:选择分布更均匀的字段作为分区键。

  • Java代码示例(Flink中处理数据倾斜):

    原始KeyBy可能导致倾斜:

    java

    DataStream<Order> stream = …;
    stream.keyBy(Order::getUserId).process(new MyProcessFunction());

    加盐方案:

    java

    int parallelism = 10; // 并行度
    DataStream<Order> stream = …;
    stream
    .map(order -> {
    // 加随机后缀(0~parallelism-1)
    String saltedKey = order.getUserId() + "_" + ThreadLocalRandom.current().nextInt(parallelism);
    return Tuple2.of(saltedKey, order);
    })
    .keyBy(t -> t.f0)
    .process(new SaltedProcessFunction())
    .map(t -> t.f1); // 去除盐

    在数据库分库分表中的应用:如果某个商户ID数据量极大,可以对该商户ID进行取模+随机后缀,将数据分散到多个库表中,查询时需聚合。


    五、故障管理

    5.1 故障监控发现

    问题描述:如何第一时间发现系统异常,避免故障扩大。

    解决方案:建立多维度监控体系,包括基础监控(CPU、内存)、应用监控(QPS、延迟、错误率)、业务监控(订单量、成功率)。

    操作步骤:

  • 部署监控工具:Prometheus + Grafana 收集指标,ELK收集日志,SkyWalking进行链路追踪。

  • 暴露指标:在应用中集成Micrometer,暴露自定义指标。

  • 设置告警规则:在Prometheus中配置Alertmanager,通过邮件、钉钉等通知。

  • 配置仪表盘:在Grafana中可视化关键指标,方便快速定位问题。

  • Java代码示例(Micrometer暴露指标):

    java

    import io.micrometer.core.instrument.MeterRegistry;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class MetricController {
    private final MeterRegistry meterRegistry;

    public MetricController(MeterRegistry meterRegistry) {
    this.meterRegistry = meterRegistry;
    }

    @GetMapping("/api/order")
    public String createOrder() {
    // 记录请求数
    meterRegistry.counter("order.create.count").increment();
    // 记录耗时
    long start = System.currentTimeMillis();
    // 业务逻辑…
    long duration = System.currentTimeMillis() – start;
    meterRegistry.timer("order.create.duration").record(duration, TimeUnit.MILLISECONDS);
    return "success";
    }
    }

    Prometheus配置示例:

    yaml

    # prometheus.yml
    scrape_configs:
    – job_name: 'spring-boot-app'
    metrics_path: '/actuator/prometheus'
    static_configs:
    – targets: ['localhost:8080']

    告警规则示例:

    yaml

    groups:
    – name: example
    rules:
    – alert: HighErrorRate
    expr: rate(http_server_requests_seconds_count{status=~"5.."}[1m]) > 0.05
    for: 1m
    annotations:
    summary: "High error rate detected"


    5.2 故障分析与定位

    问题描述:故障发生后,如何快速定位根本原因。

    解决方案:利用日志、链路追踪、线程堆栈、数据库慢查询等工具进行多维度分析。

    操作步骤:

  • 查看日志:使用ELK或Splunk搜索错误堆栈,关注异常类型和时间。

  • 链路追踪:通过SkyWalking或Zipkin查看请求调用链,定位耗时长的服务或方法。

  • 线程分析:使用jstack dump线程,检查是否有死锁、线程阻塞。

  • 数据库分析:开启慢查询日志,找出执行慢的SQL,使用explain分析执行计划。

  • 资源分析:查看CPU、内存、磁盘I/O等指标,判断是否资源耗尽。

  • Java代码示例(动态获取线程堆栈):

    java

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;

    public class ThreadDumpUtil {
    public static String dumpThreads() {
    ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
    StringBuilder sb = new StringBuilder();
    for (ThreadInfo info : threadInfos) {
    sb.append(info.toString());
    }
    return sb.toString();
    }
    }

    常用命令:

    bash

    # 查看Java进程CPU占用高的线程
    top -H -p <pid>
    printf "%x\\n" <thread-id> # 转为十六进制
    jstack <pid> | grep -A 10 <thread-hex>

    # 查看数据库慢查询(MySQL)
    SET GLOBAL slow_query_log = ON;
    SET GLOBAL long_query_time = 2; # 超过2秒记录
    tail -f /var/log/mysql/mysql-slow.log


    5.3 故障恢复与管理

    问题描述:故障发生后,如何快速恢复服务,并管理整个流程。

    解决方案:制定应急预案,包括服务重启、降级、熔断、数据修复等;通过自动化脚本快速恢复;事后进行故障复盘。

    操作步骤:

  • 立即止损:根据预案执行操作,如重启应用、切流、降级非核心功能。

  • 熔断降级:使用Sentinel或Hystrix熔断异常服务,防止雪崩。

  • 数据修复:如果数据损坏,从备份恢复或执行补偿脚本。

  • 恢复后验证:确认服务正常,逐步恢复流量。

  • 故障复盘:记录故障时间、影响范围、根本原因、改进措施,形成文档。

  • Java代码示例(Sentinel熔断降级配置):

    java

    import com.alibaba.csp.sentinel.annotation.SentinelResource;
    import com.alibaba.csp.sentinel.slots.block.BlockException;

    @RestController
    public class ProductController {
    @GetMapping("/product/{id}")
    @SentinelResource(value = "getProduct", blockHandler = "handleBlock")
    public Product getProduct(@PathVariable String id) {
    // 可能出错的远程调用
    return productService.getProduct(id);
    }

    public Product handleBlock(String id, BlockException ex) {
    // 降级逻辑,返回兜底数据
    return new Product(id, "默认商品", 0.0);
    }
    }

    自动化重启脚本示例(使用Shell):

    bash

    #!/bin/bash
    APP_NAME="myapp.jar"
    PID=$(ps -ef | grep $APP_NAME | grep -v grep | awk '{print $2}')
    if [ -n "$PID" ]; then
    kill -15 $PID
    sleep 10
    fi
    nohup java -jar $APP_NAME > app.log 2>&1 &
    echo "Application restarted"


    5.4 故障预防

    问题描述:如何在故障发生前采取措施,避免或减少故障发生的概率。

    解决方案:压测、代码审查、灰度发布、容量规划、灾备演练等。

    操作步骤:

  • 压力测试:使用JMeter或LoadRunner模拟高并发,发现性能瓶颈。

  • 代码审查:定期审查代码,关注并发、资源释放、异常处理等。

  • 灰度发布:采用金丝雀发布,先让少量用户使用新版本,观察无异常再全量。

  • 容量评估:根据业务增长预估流量,提前扩容。

  • 灾备演练:定期进行主备切换、数据恢复演练,确保流程可用。

  • Java代码示例(简单的限流预防):

    java

    import com.google.common.util.concurrent.RateLimiter;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class LoginController {
    private final RateLimiter rateLimiter = RateLimiter.create(100); // 每秒100个请求

    @GetMapping("/login")
    public String login() {
    if (!rateLimiter.tryAcquire()) {
    return "系统繁忙,请稍后再试";
    }
    // 登录逻辑
    return "success";
    }
    }

    压测脚本示例(JMeter):可以通过JMeter GUI创建测试计划,或者使用命令行:

    bash

    jmeter -n -t testplan.jmx -l results.jtl -e -o report/

    Kubernetes滚动更新(灰度发布):

    yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
    name: myapp
    spec:
    replicas: 10
    strategy:
    type: RollingUpdate
    rollingUpdate:
    maxSurge: 1
    maxUnavailable: 0
    template:
    spec:
    containers:
    – name: myapp
    image: myapp:1.0.1
    readinessProbe:
    httpGet:
    path: /actuator/health/readiness
    port: 8080
    initialDelaySeconds: 10
    livenessProbe:
    httpGet:
    path: /actuator/health/liveness
    port: 8080

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 高并发系统中常遇到的问题
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!