一、项目架构设计
1. 技术栈组成
Spring Boot | 基础框架 | 2.7.x |
javax.websocket | WebSocket实现 | JSR-356 |
Mybatis-Plus | 数据持久化 | 3.5.x |
Redis | 缓存/消息队列 | 6.2.x |
MongoDB | 聊天记录存储 | 5.0.x |
MinIO | 文件存储 | 8.0.x |
Kafka | 消息分发 | 2.8.x |
OAuth2 | 认证授权 | 2.5.x |
2. 系统架构图
#mermaid-svg-gMZPM2eWE5TjzoLg {font-family:\”trebuchet ms\”,verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-gMZPM2eWE5TjzoLg .error-icon{fill:#552222;}#mermaid-svg-gMZPM2eWE5TjzoLg .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-gMZPM2eWE5TjzoLg .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-gMZPM2eWE5TjzoLg .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-gMZPM2eWE5TjzoLg .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-gMZPM2eWE5TjzoLg .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-gMZPM2eWE5TjzoLg .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-gMZPM2eWE5TjzoLg .marker{fill:#333333;stroke:#333333;}#mermaid-svg-gMZPM2eWE5TjzoLg .marker.cross{stroke:#333333;}#mermaid-svg-gMZPM2eWE5TjzoLg svg{font-family:\”trebuchet ms\”,verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-gMZPM2eWE5TjzoLg .label{font-family:\”trebuchet ms\”,verdana,arial,sans-serif;color:#333;}#mermaid-svg-gMZPM2eWE5TjzoLg .cluster-label text{fill:#333;}#mermaid-svg-gMZPM2eWE5TjzoLg .cluster-label span{color:#333;}#mermaid-svg-gMZPM2eWE5TjzoLg .label text,#mermaid-svg-gMZPM2eWE5TjzoLg span{fill:#333;color:#333;}#mermaid-svg-gMZPM2eWE5TjzoLg .node rect,#mermaid-svg-gMZPM2eWE5TjzoLg .node circle,#mermaid-svg-gMZPM2eWE5TjzoLg .node ellipse,#mermaid-svg-gMZPM2eWE5TjzoLg .node polygon,#mermaid-svg-gMZPM2eWE5TjzoLg .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-gMZPM2eWE5TjzoLg .node .label{text-align:center;}#mermaid-svg-gMZPM2eWE5TjzoLg .node.clickable{cursor:pointer;}#mermaid-svg-gMZPM2eWE5TjzoLg .arrowheadPath{fill:#333333;}#mermaid-svg-gMZPM2eWE5TjzoLg .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-gMZPM2eWE5TjzoLg .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-gMZPM2eWE5TjzoLg .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-gMZPM2eWE5TjzoLg .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-gMZPM2eWE5TjzoLg .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-gMZPM2eWE5TjzoLg .cluster text{fill:#333;}#mermaid-svg-gMZPM2eWE5TjzoLg .cluster span{color:#333;}#mermaid-svg-gMZPM2eWE5TjzoLg div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:\”trebuchet ms\”,verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-gMZPM2eWE5TjzoLg :root{–mermaid-font-family:\”trebuchet ms\”,verdana,arial,sans-serif;}客户端WebSocket连接Spring Boot服务消息处理器MySQL: 用户/关系MongoDB: 聊天记录Redis: 在线状态MinIO: 文件存储Kafka: 消息分发
二、核心功能实现
1. WebSocket配置增强版
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(chatWebSocketHandler(), "/websocket")
.setAllowedOrigins("*")
.addInterceptors(new AuthHandshakeInterceptor())
.withSockJS();
}
@Bean
public WebSocketHandler chatWebSocketHandler() {
return new ChatWebSocketHandler();
}
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
container.setMaxSessionIdleTimeout(600000L);
return container;
}
}
2. 消息处理器实现
public class ChatWebSocketHandler extends TextWebSocketHandler {
private static final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
String userId = getUserIdFromSession(session);
sessions.put(userId, session);
updateOnlineStatus(userId, true);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
try {
ChatMessage chatMessage = objectMapper.readValue(message.getPayload(), ChatMessage.class);
switch (chatMessage.getType()) {
case HEARTBEAT:
handleHeartbeat(session);
break;
case SINGLE_CHAT:
handleSingleChat(chatMessage);
break;
case GROUP_CHAT:
handleGroupChat(chatMessage);
break;
case READ_RECEIPT:
handleReadReceipt(chatMessage);
break;
case FILE_UPLOAD:
handleFileUpload(chatMessage);
break;
}
} catch (Exception e) {
log.error("消息处理异常", e);
}
}
private void handleHeartbeat(WebSocketSession session) {
try {
session.sendMessage(new TextMessage("{\\\\"type\\\\":\\\\"HEARTBEAT_RESPONSE\\\\"}"));
} catch (IOException e) {
log.error("心跳响应失败", e);
}
}
// 其他处理方法…
}
三、关键业务逻辑实现
1. 消息存储设计
MySQL表结构
CREATE TABLE `chat_message` (
`id` bigint NOT NULL AUTO_INCREMENT,
`msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
`sender_id` varchar(64) NOT NULL,
`receiver_id` varchar(64) NOT NULL,
`content` text,
`msg_type` tinyint NOT NULL COMMENT '1-文本 2-图片 3-视频',
`status` tinyint DEFAULT '0' COMMENT '0-未读 1-已读',
`created_at` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_msg_id` (`msg_id`),
KEY `idx_sender_receiver` (`sender_id`,`receiver_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
MongoDB文档结构
@Document(collection = "chat_messages")
public class ChatMessageDocument {
@Id
private String id;
private String msgId;
private String senderId;
private String receiverId;
private String content;
private MessageType msgType;
private MessageStatus status;
private Date createdAt;
private List<ReadReceipt> readReceipts;
// 嵌套文档
public static class ReadReceipt {
private String userId;
private Date readAt;
}
}
2. 消息分发流程
@Service
@RequiredArgsConstructor
public class MessageDispatcher {
private final KafkaTemplate<String, String> kafkaTemplate;
private final RedisTemplate<String, String> redisTemplate;
public void dispatch(ChatMessage message) {
// 存储消息
storeMessage(message);
// 实时推送
if (isUserOnline(message.getReceiverId())) {
realtimePush(message);
} else {
// 离线用户通过推送通知
pushNotification(message);
}
// 发往Kafka做后续处理
kafkaTemplate.send("chat-messages", message.getMsgId(), serialize(message));
}
private boolean isUserOnline(String userId) {
return redisTemplate.opsForValue().get("user:online:" + userId) != null;
}
private void realtimePush(ChatMessage message) {
WebSocketSession session = sessions.get(message.getReceiverId());
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(serialize(message)));
} catch (IOException e) {
log.error("消息推送失败", e);
}
}
}
}
四、高级功能实现
1. 心跳检测机制
@Scheduled(fixedRate = 30000)
public void checkHeartbeat() {
long now = System.currentTimeMillis();
sessions.forEach((userId, session) -> {
Long lastHeartbeat = heartbeatTimestamps.get(userId);
if (lastHeartbeat == null || now – lastHeartbeat > 60000) {
try {
session.close(CloseStatus.SESSION_NOT_RELIABLE);
sessions.remove(userId);
updateOnlineStatus(userId, false);
} catch (IOException e) {
log.error("关闭会话失败", e);
}
}
});
}
2. 消息已读回执
public void handleReadReceipt(ChatMessage message) {
// 更新MySQL中的消息状态
chatMessageMapper.updateStatusByMsgId(message.getMsgId(), MessageStatus.READ);
// 更新MongoDB中的阅读状态
Query query = Query.query(Criteria.where("msgId").is(message.getMsgId()));
Update update = new Update()
.push("readReceipts", new ReadReceipt(message.getSenderId(), new Date()))
.set("status", MessageStatus.READ);
mongoTemplate.updateFirst(query, update, ChatMessageDocument.class);
// 通知发送方消息已读
if (isUserOnline(message.getSenderId())) {
realtimePush(new ChatMessage(
MessageType.READ_RECEIPT,
message.getMsgId(),
message.getReceiverId(),
message.getSenderId()
));
}
}
五、性能优化方案
1. 消息批量处理
@KafkaListener(topics = "chat-messages", groupId = "message-processor")
public void processMessages(List<ConsumerRecord<String, String>> records) {
List<ChatMessage> messages = records.stream()
.map(record -> deserialize(record.value()))
.collect(Collectors.toList());
// 批量存储MySQL
chatMessageMapper.batchInsert(messages);
// 批量存储MongoDB
List<ChatMessageDocument> documents = messages.stream()
.map(this::convertToDocument)
.collect(Collectors.toList());
mongoTemplate.insertAll(documents);
}
2. Redis缓存优化
@Service
public class UserStatusService {
private final RedisTemplate<String, String> redisTemplate;
public boolean isOnline(String userId) {
return redisTemplate.opsForValue().get("user:online:" + userId) != null;
}
public void setOnline(String userId, boolean online) {
if (online) {
redisTemplate.opsForValue().set(
"user:online:" + userId,
"1",
5, TimeUnit.MINUTES);
} else {
redisTemplate.delete("user:online:" + userId);
}
}
public List<String> getOnlineUsers(List<String> userIds) {
return redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (String userId : userIds) {
connection.exists(("user:online:" + userId).getBytes());
}
return null;
}).stream()
.map(Object::toString)
.collect(Collectors.toList());
}
}
六、安全防护措施
1. OAuth2认证集成
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http
.authorizeHttpRequests(auth -> auth
.antMatchers("/websocket/**").authenticated()
.anyRequest().permitAll()
)
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt
.decoder(jwtDecoder())
)
)
.sessionManagement(session -> session
.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
);
return http.build();
}
@Bean
public JwtDecoder jwtDecoder() {
return NimbusJwtDecoder.withJwkSetUri(jwkSetUri).build();
}
2. WebSocket安全拦截器
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) {
String token = extractToken(request);
if (token == null) {
return false;
}
try {
Jwt jwt = jwtDecoder.decode(token);
attributes.put("userId", jwt.getSubject());
return true;
} catch (JwtException e) {
return false;
}
}
private String extractToken(ServerHttpRequest request) {
// 从请求头或参数中提取token
}
}
七、部署与监控
1. Docker Compose配置
version: '3.8'
services:
app:
build: .
ports:
– "8080:8080"
depends_on:
– redis
– mysql
– mongodb
– kafka
redis:
image: redis:6.2
ports:
– "6379:6379"
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: chat_db
ports:
– "3306:3306"
mongodb:
image: mongo:5.0
ports:
– "27017:27017"
kafka:
image: bitnami/kafka:2.8
ports:
– "9092:9092"
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: "yes"
zookeeper:
image: bitnami/zookeeper:3.7
ports:
– "2181:2181"
2. Prometheus监控配置
scrape_configs:
– job_name: 'chat-app'
metrics_path: '/actuator/prometheus'
static_configs:
– targets: ['app:8080']
labels:
service: 'chat-service'
– job_name: 'redis'
static_configs:
– targets: ['redis:9121']
– job_name: 'mysql'
static_configs:
– targets: ['mysql:9104']
– job_name: 'kafka'
static_configs:
– targets: ['kafka:7071']
通过以上实现方案,我们构建了一个功能完善、性能优越且安全可靠的实时聊天系统。该系统不仅支持基本的聊天功能,还提供了消息存储、已读回执、文件传输等高级特性,同时具备良好的扩展性和可维护性。
评论前必须登录!
注册