一、缓存问题
1.1 缓存穿透
解决方案:使用布隆过滤器(Bloom Filter)拦截不存在的数据请求。
操作步骤:
初始化布隆过滤器,将数据库中存在的数据ID全部放入布隆过滤器。
查询请求到达时,先检查布隆过滤器是否存在该key。
如果布隆过滤器判断不存在,则直接返回空,避免查询数据库。
如果存在,则走缓存、数据库查询逻辑。
Java代码示例(使用Google Guava的BloomFilter):
java
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.nio.charset.Charset;
import java.util.List;
@Service
public class ProductService {
private BloomFilter<String> bloomFilter;
private final int EXPECTED_INSERTIONS = 1000000; // 预计数据量
private final double FPP = 0.01; // 误判率
@PostConstruct
public void init() {
bloomFilter = BloomFilter.create(
Funnels.stringFunnel(Charset.defaultCharset()),
EXPECTED_INSERTIONS,
FPP);
// 从数据库加载所有存在的商品ID,添加到布隆过滤器
List<String> allProductIds = loadAllProductIdsFromDB();
for (String id : allProductIds) {
bloomFilter.put(id);
}
}
public Product getProduct(String productId) {
// 1. 布隆过滤器判断不存在,直接返回null
if (!bloomFilter.mightContain(productId)) {
return null;
}
// 2. 查询缓存(省略具体代码)
Product product = getFromCache(productId);
if (product != null) {
return product;
}
// 3. 缓存不存在,查询数据库
product = getFromDB(productId);
if (product != null) {
// 放入缓存,设置过期时间
putToCache(productId, product);
} else {
// 可选:缓存空对象,但布隆过滤器已拦截大部分,可不缓存空值
}
return product;
}
}
1.2 缓存雪崩
解决方案:设置不同的过期时间(基础过期时间 + 随机值)。
操作步骤:
在放入缓存时,为key设置一个基础过期时间(如1小时)。
在此基础上增加一个随机秒数(如0~300秒),使key的过期时间分散开。
避免大量key同时失效。
Java代码示例(使用Spring Data Redis):
java
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class CacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void putWithRandomExpire(String key, Object value, long baseTimeout, TimeUnit unit) {
long timeout = unit.toSeconds(baseTimeout);
// 增加随机值,范围0~300秒
long random = (long) (Math.random() * 300);
long finalTimeout = timeout + random;
redisTemplate.opsForValue().set(key, value, finalTimeout, TimeUnit.SECONDS);
}
}
1.3 缓存失效(缓存击穿)
解决方案:使用互斥锁(分布式锁)保证只有一个线程去加载数据。
操作步骤:
查询缓存,如果命中则返回。
如果未命中,尝试获取分布式锁。
获取锁成功后,再次查询缓存(防止在等待锁的过程中其他线程已经加载了数据)。
如果缓存仍不存在,则查询数据库,并更新缓存。
释放锁。
Java代码示例(使用Redisson实现分布式锁):
java
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
@Autowired
private RedissonClient redissonClient;
public Product getProductWithLock(String productId) {
// 1. 查询缓存
Product product = getFromCache(productId);
if (product != null) {
return product;
}
// 2. 尝试获取分布式锁
String lockKey = "lock:product:" + productId;
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试加锁,最多等待3秒,锁过期时间10秒
if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {
try {
// 3. 再次查询缓存,可能其他线程已经加载
product = getFromCache(productId);
if (product != null) {
return product;
}
// 4. 查询数据库
product = getFromDB(productId);
if (product != null) {
putToCache(productId, product);
}
return product;
} finally {
lock.unlock();
}
} else {
// 获取锁失败,等待重试或返回空
Thread.sleep(100);
return getProductWithLock(productId); // 递归重试
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
}
1.4 热点缓存重建
解决方案:逻辑过期 + 异步刷新(缓存永不过期,但存储逻辑过期时间)。
操作步骤:
缓存中保存一个包装对象,包含实际数据和逻辑过期时间。
查询时,如果逻辑时间未过期,直接返回数据。
如果逻辑时间已过期,则异步线程去加载最新数据更新缓存,同时返回旧数据给当前请求(避免等待)。
使用分布式锁防止多个线程同时刷新。
Java代码示例:
java
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Data
class CacheWrapper<T> {
private T data;
private long expireTime; // 逻辑过期时间戳(毫秒)
}
@Service
public class HotProductService {
private ExecutorService executor = Executors.newFixedThreadPool(10);
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public Product getHotProduct(String productId) {
String key = "hot:product:" + productId;
CacheWrapper<Product> wrapper = (CacheWrapper<Product>) redisTemplate.opsForValue().get(key);
if (wrapper == null) {
// 缓存不存在,直接加载(同步)
return loadProductToCache(productId);
}
if (wrapper.getExpireTime() > System.currentTimeMillis()) {
// 逻辑时间未过期,直接返回
return wrapper.getData();
}
// 逻辑过期,异步刷新
executor.submit(() -> {
// 尝试获取分布式锁,避免并发刷新
String lockKey = "lock:refresh:" + productId;
RLock lock = redissonClient.getLock(lockKey);
if (lock.tryLock()) {
try {
// 再次检查是否已被其他线程刷新
CacheWrapper<Product> latest = (CacheWrapper<Product>) redisTemplate.opsForValue().get(key);
if (latest != null && latest.getExpireTime() > System.currentTimeMillis()) {
return;
}
// 加载数据库
Product product = getFromDB(productId);
CacheWrapper<Product> newWrapper = new CacheWrapper<>();
newWrapper.setData(product);
newWrapper.setExpireTime(System.currentTimeMillis() + 3600_000); // 设置1小时后过期
redisTemplate.opsForValue().set(key, newWrapper);
} finally {
lock.unlock();
}
}
});
// 返回旧数据
return wrapper.getData();
}
}
1.5 缓存数据库双写不一致
解决方案:先更新数据库,再删除缓存(经典的Cache-Aside模式),配合延迟双删或Binlog监听。
操作步骤(延迟双删):
删除缓存。
更新数据库。
休眠一小段时间(如500ms),再次删除缓存,确保在更新过程中可能产生的并发读请求带来的脏缓存被清除。
Java代码示例:
java
@Service
public class ProductUpdateService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private JdbcTemplate jdbcTemplate;
public void updateProduct(Product product) {
String cacheKey = "product:" + product.getId();
// 第一次删除缓存
redisTemplate.delete(cacheKey);
// 更新数据库
jdbcTemplate.update("UPDATE product SET name = ? WHERE id = ?", product.getName(), product.getId());
// 延迟一段时间后再次删除
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(() -> {
redisTemplate.delete(cacheKey);
}, 500, TimeUnit.MILLISECONDS);
}
}
更可靠的方案:监听MySQL binlog(使用Canal),异步删除缓存。
操作步骤:
部署Canal,伪装为MySQL slave,接收binlog变更。
在Java应用中集成Canal客户端,解析binlog中的更新事件。
根据变更的数据构造缓存key,并删除缓存。
Java代码示例(Canal客户端):
java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@PostConstruct
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
while (true) {
connector.connect();
connector.subscribe(".*\\\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 获取100条数据
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
continue;
}
List<Entry> entries = message.getEntries();
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
continue;
}
EventType eventType = rowChange.getEventType();
if (eventType == EventType.UPDATE || eventType == EventType.INSERT || eventType == EventType.DELETE) {
String tableName = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
if ("product".equals(tableName)) {
// 获取主键ID(假设列名为id)
String id = null;
for (Column col : rowData.getAfterColumnsList()) {
if ("id".equals(col.getName())) {
id = col.getValue();
break;
}
}
if (id != null) {
redisTemplate.delete("product:" + id);
}
}
}
}
}
}
connector.ack(batchId);
}
}
}
}
二、消息中间件问题
2.1 消息丢失
解决方案:生产者启用确认模式,消费者手动提交offset,MQ持久化。
操作步骤(以RocketMQ为例):
生产者设置setRetryTimesWhenSendFailed,并使用同步发送等待发送结果。
消费者设置消费模式为ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET,并手动提交offset。
消息处理成功后,再执行commitSync()。
Java代码示例(RocketMQ生产者):
java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class OrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.start();
Message msg = new Message("order_topic", "order", "orderId_123".getBytes());
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败,需重试或记录日志");
}
producer.shutdown();
}
}
消费者(手动ack):
java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 处理消息
process(msg);
} catch (Exception e) {
// 处理失败,稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 手动ack
}
});
consumer.start();
}
}
2.2 消息重复消费
解决方案:幂等性设计(使用业务唯一ID去重)。
操作步骤:
在消息体中包含一个全局唯一的业务ID(如订单号、消息ID)。
消费前,先检查该ID是否已处理过(使用数据库唯一索引或Redis setnx)。
如果已处理,则忽略;否则执行业务逻辑,并记录处理状态。
Java代码示例(Redis去重):
java
@Service
public class OrderMessageConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void handleMessage(MessageExt msg) {
String msgId = msg.getKeys(); // RocketMQ的消息key作为唯一ID
// 使用Redis的setnx,设置过期时间(防止无限增长)
Boolean success = redisTemplate.opsForValue().setIfAbsent("processed:msg:" + msgId, "1", 1, TimeUnit.DAYS);
if (success != null && success) {
// 第一次处理
processOrder(msg);
} else {
// 已经处理过,直接忽略
System.out.println("重复消息,已忽略:" + msgId);
}
}
}
2.3 消息积压
解决方案:临时增加消费者数量,并优化消费逻辑(批量消费)。
操作步骤:
扩容消费者实例(水平扩展)。
调整消费者的消费线程数(如RocketMQ的consumeThreadMin和consumeThreadMax)。
使用批量拉取方式,一次处理多条消息。
Java代码示例(RocketMQ批量消费设置):
java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
consumer.setConsumeMessageBatchMaxSize(10); // 每次最多拉取10条
批量处理逻辑:
java
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 批量处理,例如批量插入数据库
List<Order> orders = parseToOrders(msgs);
orderDao.batchInsert(orders);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
2.4 消息乱序
解决方案:使用顺序消息(RocketMQ的MessageQueueSelector将相同业务ID的消息发送到同一队列,消费者单线程消费该队列)。
操作步骤:
生产者指定队列选择器,保证相同订单号的消息进入同一队列。
消费者注册MessageListenerOrderly,每个队列只有一个线程消费,保证顺序。
Java代码示例(生产者顺序消息):
java
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg; // 订单号作为选择参数
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);
消费者顺序消费:
java
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
process(msg); // 单线程处理同一队列的消息
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
2.5 消息回滚(回澜)
解决方案:使用分布式事务消息(RocketMQ事务消息)。
操作步骤:
发送半消息(prepare)。
执行本地事务。
根据本地事务结果提交或回滚消息。
如果本地事务状态未知,MQ会回查生产者确认。
Java代码示例(RocketMQ事务消息生产者):
java
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务(如数据库操作)
try {
// 更新订单状态
updateOrderStatus((String) arg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态
String orderId = msg.getKeys();
if (isOrderCommitted(orderId)) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
producer.start();
Message msg = new Message("order_topic", "order", "order_123".getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, "order_123");
producer.shutdown();
}
}
三、分布式问题
3.1 脑裂
解决方案:使用Zookeeper等强一致性服务,配置法定人数(quorum),并监控集群状态。
操作步骤(以Zookeeper为例):
配置Zookeeper集群的zoo.cfg,设置initLimit、syncLimit和server.x列表。
客户端连接Zookeeper,监听集群节点变化。
当检测到集群分裂时,通过选举算法确保只有一个主节点。
Java代码示例(Zookeeper客户端监控节点变化):
java
import org.apache.zookeeper.*;
public class ZookeeperWatcher implements Watcher {
private ZooKeeper zooKeeper;
public void connect() throws Exception {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 3000, this);
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.Disconnected) {
System.out.println("Zookeeper连接断开,可能发生脑裂");
// 触发告警或切换策略
}
}
// 创建临时节点作为leader选举
public void electLeader() throws Exception {
String path = "/election/node-";
zooKeeper.create(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取所有子节点,最小的为leader
// … 具体选举逻辑
}
}
3.2 羊群效应
解决方案:使用Zookeeper时,所有客户端监听同一节点,节点变化时大量客户端被唤醒。可以改为每个客户端监听自己的专属子节点,或者添加随机延迟。
操作步骤:
客户端创建自己的临时顺序节点。
只监听前一个节点的变化,而不是父节点。
当前一个节点消失时,才触发当前节点的操作。
Java代码示例(Zookeeper分布式锁避免羊群效应):
java
public class DistributedLock {
private ZooKeeper zooKeeper;
private String lockPath = "/lock";
private String currentPath;
private String watchPath;
public void lock() throws Exception {
currentPath = zooKeeper.create(lockPath + "/lock-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zooKeeper.getChildren(lockPath, false);
Collections.sort(children);
if (currentPath.equals(lockPath + "/" + children.get(0))) {
// 获得锁
return;
}
// 监听前一个节点
int index = children.indexOf(currentPath.substring(lockPath.length() + 1));
watchPath = lockPath + "/" + children.get(index – 1);
zooKeeper.exists(watchPath, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
// 前一个节点删除,当前节点获得锁
synchronized (DistributedLock.this) {
DistributedLock.this.notify();
}
}
}
});
synchronized (this) {
wait();
}
}
}
3.3 哈希碰撞
解决方案:使用一致性哈希算法,并引入虚拟节点。
操作步骤:
构建一个一致性哈希环,将物理节点映射到环上。
为每个物理节点创建多个虚拟节点(如160个),分散在环上。
计算key的哈希值,顺时针找到最近的虚拟节点,再映射到物理节点。
Java代码示例(简单一致性哈希实现):
java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
public class ConsistentHashing {
private final TreeMap<Long, String> virtualNodes = new TreeMap<>();
private final int virtualReplicas = 160;
private final List<String> physicalNodes;
public ConsistentHashing(List<String> physicalNodes) {
this.physicalNodes = physicalNodes;
for (String node : physicalNodes) {
for (int i = 0; i < virtualReplicas; i++) {
long hash = hash(node + "-" + i);
virtualNodes.put(hash, node);
}
}
}
public String getNode(String key) {
long hash = hash(key);
// 返回大于等于hash的最小节点
Map.Entry<Long, String> entry = virtualNodes.ceilingEntry(hash);
if (entry == null) {
entry = virtualNodes.firstEntry();
}
return entry.getValue();
}
private long hash(String key) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] digest = md5.digest(key.getBytes());
return ((long) (digest[3] & 0xFF) << 24) | ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8) | (digest[0] & 0xFF);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
}
3.4 时钟回拨
解决方案:在分布式ID生成器(如Snowflake)中检测时钟回拨,如果回拨时间较短则等待,如果过长则抛出异常或使用备用方案。
操作步骤:
记录上次生成ID的时间戳。
如果当前时间小于上次时间戳,说明发生了时钟回拨。
根据回拨大小决定策略:小幅度回拨(<5ms)则等待时间追上;大幅度回拨则报警并拒绝服务。
Java代码示例(Snowflake算法处理时钟回拨):
java
public class SnowflakeIdWorker {
private final long twepoch = 1288834974657L;
private final long workerIdBits = 5L;
private final long sequenceBits = 12L;
private final long workerId;
private volatile long lastTimestamp = -1L;
private volatile long sequence = 0L;
public SnowflakeIdWorker(long workerId) {
if (workerId < 0 || workerId > (-1L ^ (-1L << workerIdBits))) {
throw new IllegalArgumentException("workerId out of range");
}
this.workerId = workerId;
}
public synchronized long nextId() {
long timestamp = timeGen();
// 如果当前时间小于上次生成ID的时间戳,说明时钟回拨
if (timestamp < lastTimestamp) {
long offset = lastTimestamp – timestamp;
if (offset < 5) {
// 如果回拨小于5ms,等待时间追上
try {
wait(lastTimestamp – timestamp);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
timestamp = timeGen();
// 再次判断
if (timestamp < lastTimestamp) {
throw new RuntimeException("Clock moved backwards, refuse generate id");
}
} else {
throw new RuntimeException("Clock moved backwards, refuse generate id");
}
}
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & ((1 << sequenceBits) – 1);
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp = timestamp;
return ((timestamp – twepoch) << (workerIdBits + sequenceBits)) | (workerId << sequenceBits) | sequence;
}
private long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
private long timeGen() {
return System.currentTimeMillis();
}
}
3.5 拒绝连接
解决方案:使用限流(如令牌桶算法)保护服务,防止过载拒绝连接。
操作步骤:
引入Guava的RateLimiter或Sentinel。
在服务入口(如Controller)进行限流判断。
如果超过阈值,返回“服务繁忙”或抛出异常。
Java代码示例(Guava RateLimiter):
java
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ApiController {
// 每秒最多处理10个请求
private final RateLimiter rateLimiter = RateLimiter.create(10.0);
@GetMapping("/api/data")
public String getData() {
if (!rateLimiter.tryAcquire()) {
return "系统繁忙,请稍后重试";
}
// 执行业务逻辑
return "success";
}
}
使用Sentinel的示例(Spring Cloud Alibaba):
java
@RestController
public class SentinelController {
@GetMapping("/hello")
@SentinelResource(value = "hello", blockHandler = "handleBlock")
public String hello() {
return "Hello Sentinel";
}
public String handleBlock(BlockException ex) {
return "限流了";
}
}
四、系统问题
4.1 内存泄漏
问题描述:应用程序中不再使用的对象无法被垃圾回收,导致堆内存使用持续增长,最终引发OutOfMemoryError。
解决方案:通过内存分析工具(如MAT、JProfiler)定位泄漏点,修复代码中的不合理引用。
操作步骤:
监控内存使用:使用JVM监控工具(如jstat、VisualVM)观察堆内存变化趋势。
生成Heap Dump:当内存异常时,使用jmap或配置JVM参数-XX:+HeapDumpOnOutOfMemoryError自动导出dump文件。
分析Heap Dump:使用MAT(Eclipse Memory Analyzer)打开dump文件,查看可疑对象、GC Roots引用链。
定位问题代码:根据分析结果找到未释放的引用(如ThreadLocal未清理、静态集合不断添加、未关闭的资源等)。
修复代码:修改代码,确保对象使用完毕后置空引用或调用清理方法。
Java代码示例:模拟ThreadLocal内存泄漏及修复。
泄漏代码:
java
public class ThreadLocalLeakDemo {
private static final ThreadLocal<byte[]> threadLocal = new ThreadLocal<>();
public void process() {
// 假设每个请求都设置一个大对象
threadLocal.set(new byte[10 * 1024 * 1024]); // 10MB
// 执行业务逻辑…
// 忘记调用 remove()
}
}
修复代码:
java
public class ThreadLocalLeakDemo {
private static final ThreadLocal<byte[]> threadLocal = new ThreadLocal<>();
public void process() {
try {
threadLocal.set(new byte[10 * 1024 * 1024]);
// 执行业务逻辑…
} finally {
threadLocal.remove(); // 确保清理
}
}
}
操作指令:
-
使用jmap生成堆dump:jmap -dump:live,format=b,file=heap.hprof <pid>
-
使用jstat观察GC情况:jstat -gcutil <pid> 1000 10
4.2 重复提交
问题描述:用户短时间内多次提交同一表单或请求,导致数据重复(如重复下单、重复评论)。
解决方案:前端按钮置灰 + 后端Token机制(防重令牌)。
操作步骤:
前端:在提交按钮点击后立即禁用按钮,防止用户重复点击。
后端:生成一个唯一的Token(如UUID),存入Redis并设置过期时间,返回给前端。
请求携带Token:前端在提交请求时带上该Token。
后端校验:处理请求前先校验Token是否存在且有效,若有效则删除Token并继续处理;若无效则返回重复提交错误。
Java代码示例:
生成Token接口:
java
@RestController
@RequestMapping("/token")
public class TokenController {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@GetMapping("/generate")
public String generateToken() {
String token = UUID.randomUUID().toString();
redisTemplate.opsForValue().set(token, "1", 30, TimeUnit.MINUTES); // 30分钟有效
return token;
}
}
自定义注解+拦截器实现Token校验:
java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PreventDuplicateSubmit {
}
@Component
public class DuplicateSubmitInterceptor implements HandlerInterceptor {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod) {
HandlerMethod method = (HandlerMethod) handler;
if (method.hasMethodAnnotation(PreventDuplicateSubmit.class)) {
String token = request.getHeader("Request-Token");
if (StringUtils.isEmpty(token)) {
throw new RuntimeException("缺少防重Token");
}
// 原子性删除(使用lua脚本或setnx)
Boolean result = redisTemplate.delete(token);
if (Boolean.TRUE.equals(result)) {
return true; // 第一次提交,放行
} else {
response.setStatus(400);
response.getWriter().write("请勿重复提交");
return false;
}
}
}
return true;
}
}
Controller使用:
java
@PostMapping("/order")
@PreventDuplicateSubmit
public String createOrder(@RequestBody OrderDTO orderDTO) {
// 业务逻辑
return "success";
}
4.3 指令重排
问题描述:在多线程环境下,编译器和CPU为了优化可能对指令进行重排,导致意料之外的并发问题(如双重检查锁失效)。
解决方案:使用volatile关键字禁止指令重排,或使用AtomicInteger、synchronized等并发工具。
操作步骤:
识别需要禁止重排的变量(通常是被多个线程共享的状态标志)。
使用volatile修饰该变量。
对于复合操作,使用java.util.concurrent.atomic包中的原子类或锁。
Java代码示例:
双重检查锁单例模式(正确使用volatile):
java
public class Singleton {
private static volatile Singleton instance; // volatile禁止重排
private Singleton() {}
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton(); // 不会重排
}
}
}
return instance;
}
}
如果不加volatile,instance = new Singleton() 可能被重排为:分配内存 -> 将引用赋给instance -> 初始化对象。其他线程可能拿到未初始化的对象。
4.4 系统假死
问题描述:系统无响应(不处理请求),但进程仍在,CPU/内存可能正常,通常是线程池耗尽、死锁、网络阻塞等原因。
解决方案:监控线程状态、死锁检测、配置健康检查并自动重启。
操作步骤:
检测线程池耗尽:通过jstack查看线程堆栈,确认大量线程在等待队列或阻塞。
检测死锁:使用jstack或JConsole检测死锁。
增加监控:在应用中暴露健康检查端点(如Spring Boot Actuator),由外部监控系统(如Kubernetes liveness probe)定期检查,发现假死后自动重启。
优化代码:设置合理的线程池参数、避免同步块过大、使用超时机制。
Java代码示例:
Spring Boot Actuator健康检查端点:
yaml
# application.yml
management:
endpoints:
web:
exposure:
include: health,info
自定义健康检查(检测数据库连接等):
java
@Component
public class CustomHealthIndicator implements HealthIndicator {
@Autowired
private DataSource dataSource;
@Override
public Health health() {
try {
dataSource.getConnection().close(); // 简单测试连接
return Health.up().build();
} catch (Exception e) {
return Health.down(e).build();
}
}
}
Kubernetes探针配置:
yaml
livenessProbe:
httpGet:
path: /actuator/health/liveness
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
死锁检测代码(可放在监控脚本中):
bash
# 使用jstack检测死锁
jstack -l <pid> | grep -A 10 "Found one Java-level deadlock"
4.5 数据倾斜
问题描述:在分布式计算(如MapReduce、Spark、Flink)或数据库分库分表中,数据分布不均,导致部分节点负载过高,影响整体性能。
解决方案:重新设计分区键、加盐(随机前缀)、二次聚合、扩容。
操作步骤:
分析数据分布:查看各个分区的数据量或处理时间。
定位热点Key:如某个用户ID的数据量特别大。
加盐处理:在Key上加随机后缀,将数据打散到多个分区。
二次聚合:第一阶段加盐处理,第二阶段去掉后缀合并结果。
调整分区键:选择分布更均匀的字段作为分区键。
Java代码示例(Flink中处理数据倾斜):
原始KeyBy可能导致倾斜:
java
DataStream<Order> stream = …;
stream.keyBy(Order::getUserId).process(new MyProcessFunction());
加盐方案:
java
int parallelism = 10; // 并行度
DataStream<Order> stream = …;
stream
.map(order -> {
// 加随机后缀(0~parallelism-1)
String saltedKey = order.getUserId() + "_" + ThreadLocalRandom.current().nextInt(parallelism);
return Tuple2.of(saltedKey, order);
})
.keyBy(t -> t.f0)
.process(new SaltedProcessFunction())
.map(t -> t.f1); // 去除盐
在数据库分库分表中的应用:如果某个商户ID数据量极大,可以对该商户ID进行取模+随机后缀,将数据分散到多个库表中,查询时需聚合。
五、故障管理
5.1 故障监控发现
问题描述:如何第一时间发现系统异常,避免故障扩大。
解决方案:建立多维度监控体系,包括基础监控(CPU、内存)、应用监控(QPS、延迟、错误率)、业务监控(订单量、成功率)。
操作步骤:
部署监控工具:Prometheus + Grafana 收集指标,ELK收集日志,SkyWalking进行链路追踪。
暴露指标:在应用中集成Micrometer,暴露自定义指标。
设置告警规则:在Prometheus中配置Alertmanager,通过邮件、钉钉等通知。
配置仪表盘:在Grafana中可视化关键指标,方便快速定位问题。
Java代码示例(Micrometer暴露指标):
java
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MetricController {
private final MeterRegistry meterRegistry;
public MetricController(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@GetMapping("/api/order")
public String createOrder() {
// 记录请求数
meterRegistry.counter("order.create.count").increment();
// 记录耗时
long start = System.currentTimeMillis();
// 业务逻辑…
long duration = System.currentTimeMillis() – start;
meterRegistry.timer("order.create.duration").record(duration, TimeUnit.MILLISECONDS);
return "success";
}
}
Prometheus配置示例:
yaml
# prometheus.yml
scrape_configs:
– job_name: 'spring-boot-app'
metrics_path: '/actuator/prometheus'
static_configs:
– targets: ['localhost:8080']
告警规则示例:
yaml
groups:
– name: example
rules:
– alert: HighErrorRate
expr: rate(http_server_requests_seconds_count{status=~"5.."}[1m]) > 0.05
for: 1m
annotations:
summary: "High error rate detected"
5.2 故障分析与定位
问题描述:故障发生后,如何快速定位根本原因。
解决方案:利用日志、链路追踪、线程堆栈、数据库慢查询等工具进行多维度分析。
操作步骤:
查看日志:使用ELK或Splunk搜索错误堆栈,关注异常类型和时间。
链路追踪:通过SkyWalking或Zipkin查看请求调用链,定位耗时长的服务或方法。
线程分析:使用jstack dump线程,检查是否有死锁、线程阻塞。
数据库分析:开启慢查询日志,找出执行慢的SQL,使用explain分析执行计划。
资源分析:查看CPU、内存、磁盘I/O等指标,判断是否资源耗尽。
Java代码示例(动态获取线程堆栈):
java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
public class ThreadDumpUtil {
public static String dumpThreads() {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
StringBuilder sb = new StringBuilder();
for (ThreadInfo info : threadInfos) {
sb.append(info.toString());
}
return sb.toString();
}
}
常用命令:
bash
# 查看Java进程CPU占用高的线程
top -H -p <pid>
printf "%x\\n" <thread-id> # 转为十六进制
jstack <pid> | grep -A 10 <thread-hex>
# 查看数据库慢查询(MySQL)
SET GLOBAL slow_query_log = ON;
SET GLOBAL long_query_time = 2; # 超过2秒记录
tail -f /var/log/mysql/mysql-slow.log
5.3 故障恢复与管理
问题描述:故障发生后,如何快速恢复服务,并管理整个流程。
解决方案:制定应急预案,包括服务重启、降级、熔断、数据修复等;通过自动化脚本快速恢复;事后进行故障复盘。
操作步骤:
立即止损:根据预案执行操作,如重启应用、切流、降级非核心功能。
熔断降级:使用Sentinel或Hystrix熔断异常服务,防止雪崩。
数据修复:如果数据损坏,从备份恢复或执行补偿脚本。
恢复后验证:确认服务正常,逐步恢复流量。
故障复盘:记录故障时间、影响范围、根本原因、改进措施,形成文档。
Java代码示例(Sentinel熔断降级配置):
java
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.block.BlockException;
@RestController
public class ProductController {
@GetMapping("/product/{id}")
@SentinelResource(value = "getProduct", blockHandler = "handleBlock")
public Product getProduct(@PathVariable String id) {
// 可能出错的远程调用
return productService.getProduct(id);
}
public Product handleBlock(String id, BlockException ex) {
// 降级逻辑,返回兜底数据
return new Product(id, "默认商品", 0.0);
}
}
自动化重启脚本示例(使用Shell):
bash
#!/bin/bash
APP_NAME="myapp.jar"
PID=$(ps -ef | grep $APP_NAME | grep -v grep | awk '{print $2}')
if [ -n "$PID" ]; then
kill -15 $PID
sleep 10
fi
nohup java -jar $APP_NAME > app.log 2>&1 &
echo "Application restarted"
5.4 故障预防
问题描述:如何在故障发生前采取措施,避免或减少故障发生的概率。
解决方案:压测、代码审查、灰度发布、容量规划、灾备演练等。
操作步骤:
压力测试:使用JMeter或LoadRunner模拟高并发,发现性能瓶颈。
代码审查:定期审查代码,关注并发、资源释放、异常处理等。
灰度发布:采用金丝雀发布,先让少量用户使用新版本,观察无异常再全量。
容量评估:根据业务增长预估流量,提前扩容。
灾备演练:定期进行主备切换、数据恢复演练,确保流程可用。
Java代码示例(简单的限流预防):
java
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class LoginController {
private final RateLimiter rateLimiter = RateLimiter.create(100); // 每秒100个请求
@GetMapping("/login")
public String login() {
if (!rateLimiter.tryAcquire()) {
return "系统繁忙,请稍后再试";
}
// 登录逻辑
return "success";
}
}
压测脚本示例(JMeter):可以通过JMeter GUI创建测试计划,或者使用命令行:
bash
jmeter -n -t testplan.jmx -l results.jtl -e -o report/
Kubernetes滚动更新(灰度发布):
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: myapp
spec:
replicas: 10
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
template:
spec:
containers:
– name: myapp
image: myapp:1.0.1
readinessProbe:
httpGet:
path: /actuator/health/readiness
port: 8080
initialDelaySeconds: 10
livenessProbe:
httpGet:
path: /actuator/health/liveness
port: 8080
网硕互联帮助中心


评论前必须登录!
注册