云计算百科
云计算领域专业知识百科平台

基于Spring Boot和WebSocket的实时聊天系统

一、项目架构设计

1. 技术栈组成

| 组件 | 用途 | 版本 |
| --- | --- | --- |
| Spring Boot | 基础框架 | 2.7.x |
| javax.websocket | WebSocket实现 | JSR-356 |
| Mybatis-Plus | 数据持久化 | 3.5.x |
| Redis | 缓存/消息队列 | 6.2.x |
| MongoDB | 聊天记录存储 | 5.0.x |
| MinIO | 文件存储 | 8.0.x |
| Kafka | 消息分发 | 2.8.x |
| OAuth2 | 认证授权 | 2.5.x |

2. 系统架构图

客户端 → WebSocket连接 → Spring Boot服务 → 消息处理器

消息处理器 → MySQL: 用户/关系
消息处理器 → MongoDB: 聊天记录
消息处理器 → Redis: 在线状态
消息处理器 → MinIO: 文件存储
消息处理器 → Kafka: 消息分发

二、核心功能实现

1. WebSocket配置增强版

/**
 * Spring WebSocket configuration for the chat endpoint.
 *
 * <p>Registers the chat handler at {@code /websocket} with a SockJS fallback and a handshake
 * interceptor, and exposes a container factory bean that sets message buffer sizes and the
 * session idle timeout.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    /**
     * Maps the chat handler to {@code /websocket} and attaches the auth handshake interceptor.
     *
     * @param registry registry supplied by Spring to collect handler mappings
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatWebSocketHandler(), "/websocket")
                // Origin *patterns* instead of setAllowedOrigins("*"): with SockJS enabled the
                // CORS configuration allows credentials, and Spring 5.3+ rejects the literal
                // "*" origin in that combination with an IllegalArgumentException.
                // NOTE(review): "*" still admits every origin; restrict it for production.
                .setAllowedOriginPatterns("*")
                .addInterceptors(new AuthHandshakeInterceptor())
                .withSockJS();
    }

    /**
     * Creates the handler that processes chat frames.
     *
     * @return a new {@code ChatWebSocketHandler}
     */
    @Bean
    public WebSocketHandler chatWebSocketHandler() {
        return new ChatWebSocketHandler();
    }

    /**
     * Configures the servlet WebSocket container.
     *
     * @return factory bean with 8192 text/binary message buffer sizes and a 600000 ms
     *     (10 minute) idle timeout
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        container.setMaxSessionIdleTimeout(600000L);
        return container;
    }
}

2. 消息处理器实现

public class ChatWebSocketHandler extends TextWebSocketHandler {

private static final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void afterConnectionEstablished(WebSocketSession session) {
String userId = getUserIdFromSession(session);
sessions.put(userId, session);
updateOnlineStatus(userId, true);
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
try {
ChatMessage chatMessage = objectMapper.readValue(message.getPayload(), ChatMessage.class);

switch (chatMessage.getType()) {
case HEARTBEAT:
handleHeartbeat(session);
break;
case SINGLE_CHAT:
handleSingleChat(chatMessage);
break;
case GROUP_CHAT:
handleGroupChat(chatMessage);
break;
case READ_RECEIPT:
handleReadReceipt(chatMessage);
break;
case FILE_UPLOAD:
handleFileUpload(chatMessage);
break;
}
} catch (Exception e) {
log.error("消息处理异常", e);
}
}

private void handleHeartbeat(WebSocketSession session) {
try {
session.sendMessage(new TextMessage("{\\\\"type\\\\":\\\\"HEARTBEAT_RESPONSE\\\\"}"));
} catch (IOException e) {
log.error("心跳响应失败", e);
}
}

// 其他处理方法…
}

三、关键业务逻辑实现

1. 消息存储设计

MySQL表结构

-- Relational table holding one row per chat message.
CREATE TABLE `chat_message` (
-- Auto-increment surrogate primary key.
`id` bigint NOT NULL AUTO_INCREMENT,
-- Message identifier; uniqueness is enforced by uk_msg_id below.
`msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
`sender_id` varchar(64) NOT NULL,
`receiver_id` varchar(64) NOT NULL,
-- Message body; nullable.
`content` text,
-- Type code; per the column comment: 1 = text, 2 = image, 3 = video.
`msg_type` tinyint NOT NULL COMMENT '1-文本 2-图片 3-视频',
-- Read flag; per the column comment: 0 = unread (default), 1 = read.
`status` tinyint DEFAULT '0' COMMENT '0-未读 1-已读',
`created_at` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_msg_id` (`msg_id`),
-- Composite index on (sender_id, receiver_id).
KEY `idx_sender_receiver` (`sender_id`,`receiver_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

MongoDB文档结构

/**
 * MongoDB document for a chat message, stored in the {@code chat_messages} collection.
 *
 * <p>NOTE(review): no accessors are visible for the private fields; presumably they are
 * generated (e.g. by Lombok) or were elided from the snippet — confirm.
 */
@Document(collection = "chat_messages")
public class ChatMessageDocument {
    @Id
    private String id;
    private String msgId;
    private String senderId;
    private String receiverId;
    private String content;
    private MessageType msgType;
    private MessageStatus status;
    private Date createdAt;
    /** Read receipts recorded for this message. */
    private List<ReadReceipt> readReceipts;

    /** Embedded document recording which user read the message and when. */
    public static class ReadReceipt {
        private String userId;
        private Date readAt;

        /** No-argument constructor, kept so the type stays instantiable without arguments. */
        public ReadReceipt() {
        }

        /**
         * Creates a receipt for one reader.
         *
         * <p>The read-receipt handling code in this document constructs receipts as
         * {@code new ReadReceipt(userId, date)}, which requires this constructor.
         *
         * @param userId id of the user who read the message
         * @param readAt time the message was read
         */
        public ReadReceipt(String userId, Date readAt) {
            this.userId = userId;
            this.readAt = readAt;
        }
    }
}

2. 消息分发流程

/**
 * Stores a chat message, delivers it to the receiver, and publishes it to Kafka.
 *
 * <p>NOTE(review): {@code sessions}, {@code log}, {@code storeMessage},
 * {@code pushNotification} and {@code serialize} are referenced but not declared in the
 * visible part of this class — confirm where they come from.
 */
@Service
@RequiredArgsConstructor
public class MessageDispatcher {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final RedisTemplate<String, String> redisTemplate;

    /**
     * Persists the message, attempts real-time delivery, and forwards it to the
     * {@code chat-messages} topic keyed by message id.
     *
     * @param message the message to dispatch
     */
    public void dispatch(ChatMessage message) {
        // Persist the message
        storeMessage(message);

        // The Redis flag can be set while no open session is available here (lookup miss,
        // closed session, send failure). In that case fall back to the notification path
        // instead of silently skipping delivery.
        boolean delivered = isUserOnline(message.getReceiverId()) && realtimePush(message);
        if (!delivered) {
            // Offline or unreachable users get a push notification
            pushNotification(message);
        }

        // Publish to Kafka for downstream processing
        kafkaTemplate.send("chat-messages", message.getMsgId(), serialize(message));
    }

    /**
     * Checks the presence flag in Redis.
     *
     * @param userId user to look up
     * @return {@code true} when the key {@code user:online:<userId>} has a value
     */
    private boolean isUserOnline(String userId) {
        return redisTemplate.opsForValue().get("user:online:" + userId) != null;
    }

    /**
     * Sends the serialized message over the receiver's WebSocket session.
     *
     * @param message the message to push
     * @return {@code true} if the message was written to an open session; {@code false} if
     *     there was no open session or the send failed
     */
    private boolean realtimePush(ChatMessage message) {
        WebSocketSession session = sessions.get(message.getReceiverId());
        if (session == null || !session.isOpen()) {
            return false;
        }
        try {
            session.sendMessage(new TextMessage(serialize(message)));
            return true;
        } catch (IOException e) {
            log.error("消息推送失败", e);
            return false;
        }
    }
}

四、高级功能实现

1. 心跳检测机制

@Scheduled(fixedRate = 30000)
public void checkHeartbeat() {
long now = System.currentTimeMillis();
sessions.forEach((userId, session) -> {
Long lastHeartbeat = heartbeatTimestamps.get(userId);
if (lastHeartbeat == null || now lastHeartbeat > 60000) {
try {
session.close(CloseStatus.SESSION_NOT_RELIABLE);
sessions.remove(userId);
updateOnlineStatus(userId, false);
} catch (IOException e) {
log.error("关闭会话失败", e);
}
}
});
}

2. 消息已读回执

/**
 * Marks a message as read in MySQL and MongoDB, then pushes a read-receipt message if the
 * user returned by {@code message.getSenderId()} is online.
 *
 * <p>NOTE(review): {@code message.getSenderId()} is used both as the reader stored in the
 * receipt and as the user who gets notified; one of the two is probably meant to be the
 * receiver id — confirm against the intended semantics of the incoming message.
 *
 * @param message the message carrying the id of the message that was read
 */
public void handleReadReceipt(ChatMessage message) {
    final String msgId = message.getMsgId();

    // MySQL: flip the status column to READ
    chatMessageMapper.updateStatusByMsgId(msgId, MessageStatus.READ);

    // MongoDB: append a receipt entry and set the status on the matching document
    Query byMsgId = Query.query(Criteria.where("msgId").is(msgId));
    Update markRead = new Update()
            .push("readReceipts", new ReadReceipt(message.getSenderId(), new Date()))
            .set("status", MessageStatus.READ);
    mongoTemplate.updateFirst(byMsgId, markRead, ChatMessageDocument.class);

    // Nothing to push when that user is not online
    if (!isUserOnline(message.getSenderId())) {
        return;
    }

    ChatMessage receiptNotice = new ChatMessage(
            MessageType.READ_RECEIPT,
            msgId,
            message.getReceiverId(),
            message.getSenderId());
    realtimePush(receiptNotice);
}

五、性能优化方案

1. 消息批量处理

/**
 * Batch consumer for the {@code chat-messages} topic (consumer group
 * {@code message-processor}).
 *
 * <p>Deserializes every record value, batch-inserts the messages through the MySQL mapper,
 * then converts them to documents and inserts those into MongoDB.
 *
 * @param records the batch of Kafka records delivered to the listener
 */
@KafkaListener(topics = "chat-messages", groupId = "message-processor")
public void processMessages(List<ConsumerRecord<String, String>> records) {
    List<ChatMessage> batch = records.stream()
            .map(ConsumerRecord::value)
            .map(payload -> deserialize(payload))
            .collect(Collectors.toList());

    // MySQL first
    chatMessageMapper.batchInsert(batch);

    // Then MongoDB, using the document form of the same messages
    List<ChatMessageDocument> mongoDocs = batch.stream()
            .map(chatMessage -> convertToDocument(chatMessage))
            .collect(Collectors.toList());
    mongoTemplate.insertAll(mongoDocs);
}

2. Redis缓存优化

/**
 * Tracks user presence in Redis under keys of the form {@code user:online:<userId>}.
 */
@Service
public class UserStatusService {

    private static final String ONLINE_KEY_PREFIX = "user:online:";

    private final RedisTemplate<String, String> redisTemplate;

    /**
     * Creates the service. The original declared the final field without any constructor,
     * which leaves it uninitialized.
     *
     * @param redisTemplate template used for all presence reads and writes
     */
    public UserStatusService(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Reports whether a presence key exists for the user.
     *
     * @param userId user to look up
     * @return {@code true} when the presence key has a value
     */
    public boolean isOnline(String userId) {
        return redisTemplate.opsForValue().get(ONLINE_KEY_PREFIX + userId) != null;
    }

    /**
     * Sets or clears the presence key.
     *
     * @param userId user whose status changes
     * @param online {@code true} writes the key with a 5 minute expiry; {@code false}
     *     deletes it
     */
    public void setOnline(String userId, boolean online) {
        if (online) {
            redisTemplate.opsForValue().set(
                    ONLINE_KEY_PREFIX + userId,
                    "1",
                    5, TimeUnit.MINUTES);
        } else {
            redisTemplate.delete(ONLINE_KEY_PREFIX + userId);
        }
    }

    /**
     * Returns the subset of {@code userIds} that currently have a presence key, in input
     * order, using one pipelined round trip.
     *
     * <p>The original mapped each pipelined EXISTS result through {@code toString()}, so it
     * returned the strings "true"/"false" rather than user ids.
     *
     * <p>NOTE(review): keys are checked as raw UTF-8 bytes, which matches keys written
     * through {@code opsForValue()} only if the template's key serializer is a UTF-8 string
     * serializer (as with {@code StringRedisTemplate}) — confirm the template configuration.
     *
     * @param userIds candidate user ids; {@code null} or empty yields an empty list
     * @return ids of the users that are online
     */
    public List<String> getOnlineUsers(List<String> userIds) {
        List<String> onlineUsers = new java.util.ArrayList<>();
        if (userIds == null || userIds.isEmpty()) {
            return onlineUsers;
        }

        List<Object> results = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (String userId : userIds) {
                connection.exists(
                        (ONLINE_KEY_PREFIX + userId).getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            // Pipelined callbacks must return null; results are collected by the template.
            return null;
        });

        // Results come back in command order, one per queried user id.
        int count = Math.min(userIds.size(), results.size());
        for (int i = 0; i < count; i++) {
            if (indicatesExists(results.get(i))) {
                onlineUsers.add(userIds.get(i));
            }
        }
        return onlineUsers;
    }

    /** Interprets an EXISTS reply, accepting both Boolean and numeric forms. */
    private static boolean indicatesExists(Object reply) {
        if (reply instanceof Boolean) {
            return (Boolean) reply;
        }
        return reply instanceof Number && ((Number) reply).longValue() > 0;
    }
}

六、安全防护措施

1. OAuth2认证集成

/**
 * Builds the HTTP security filter chain.
 *
 * <p>Requests matching {@code /websocket/**} must be authenticated and all other requests
 * are permitted; JWTs are decoded with {@link #jwtDecoder()}; session creation is stateless.
 *
 * @param http the security builder supplied by Spring
 * @return the built filter chain
 * @throws Exception if configuring or building the chain fails
 */
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
    // Authorization rules
    http.authorizeHttpRequests(rules -> {
        rules.antMatchers("/websocket/**").authenticated();
        rules.anyRequest().permitAll();
    });

    // JWT resource server
    http.oauth2ResourceServer(resourceServer ->
            resourceServer.jwt(jwtConfig -> jwtConfig.decoder(jwtDecoder())));

    // No server-side HTTP session
    http.sessionManagement(sessions ->
            sessions.sessionCreationPolicy(SessionCreationPolicy.STATELESS));

    return http.build();
}

/**
 * Creates the JWT decoder from the JWK set at {@code jwkSetUri}.
 *
 * @return a Nimbus-based decoder built for that JWK set URI
 */
@Bean
public JwtDecoder jwtDecoder() {
    JwtDecoder decoder = NimbusJwtDecoder
            .withJwkSetUri(jwkSetUri)
            .build();
    return decoder;
}

2. WebSocket安全拦截器

/**
 * Handshake interceptor that authenticates a WebSocket upgrade request with a JWT and stores
 * the token subject as the {@code userId} session attribute.
 *
 * <p>NOTE(review): {@code jwtDecoder} is referenced but not declared in the visible part of
 * this class, and the class is instantiated elsewhere with a no-argument constructor —
 * confirm how the decoder is supplied.
 */
public class AuthHandshakeInterceptor implements HandshakeInterceptor {

    private static final String BEARER_PREFIX = "Bearer ";

    /** Query parameter consulted when no header is present — assumed name, TODO confirm. */
    private static final String TOKEN_PARAM = "token";

    /**
     * Validates the token before the handshake proceeds.
     *
     * @param request the upgrade request
     * @param response the upgrade response
     * @param wsHandler the target handler
     * @param attributes attributes copied into the WebSocket session; receives {@code userId}
     * @return {@code true} to continue the handshake; {@code false} when the token is
     *     missing, cannot be decoded, or has no subject
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request,
                                   ServerHttpResponse response,
                                   WebSocketHandler wsHandler,
                                   Map<String, Object> attributes) {
        String token = extractToken(request);
        if (token == null) {
            return false;
        }

        try {
            Jwt jwt = jwtDecoder.decode(token);
            String subject = jwt.getSubject();
            if (subject == null) {
                // Without a subject there is no user id to key the session on.
                return false;
            }
            attributes.put("userId", subject);
            return true;
        } catch (JwtException e) {
            return false;
        }
    }

    /**
     * Required by {@code HandshakeInterceptor}; the original omitted it. Nothing to do after
     * the handshake.
     */
    @Override
    public void afterHandshake(ServerHttpRequest request,
                               ServerHttpResponse response,
                               WebSocketHandler wsHandler,
                               Exception exception) {
        // Intentionally empty.
    }

    /**
     * Extracts the token from the {@code Authorization: Bearer} header, falling back to the
     * {@code token} query parameter.
     *
     * @param request the upgrade request
     * @return the token, or {@code null} when neither source provides a non-empty value
     */
    private String extractToken(ServerHttpRequest request) {
        String header = request.getHeaders().getFirst("Authorization");
        if (header != null
                && header.regionMatches(true, 0, BEARER_PREFIX, 0, BEARER_PREFIX.length())) {
            String headerToken = header.substring(BEARER_PREFIX.length()).trim();
            if (!headerToken.isEmpty()) {
                return headerToken;
            }
        }

        String query = request.getURI().getQuery();
        if (query == null) {
            return null;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && TOKEN_PARAM.equals(pair.substring(0, eq))) {
                String paramToken = pair.substring(eq + 1);
                if (!paramToken.isEmpty()) {
                    return paramToken;
                }
            }
        }
        return null;
    }
}

七、部署与监控

1. Docker Compose配置

# Compose stack for the chat application and its backing services.
# NOTE(review): no MinIO service is defined here although the article lists MinIO for file
# storage — confirm whether it is provided elsewhere.
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8080:8080"
    depends_on:
      - redis
      - mysql
      - mongodb
      - kafka

  redis:
    image: redis:6.2
    ports:
      - "6379:6379"

  mysql:
    image: mysql:8.0
    environment:
      # Hard-coded root password: acceptable for local use only.
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: chat_db
    ports:
      - "3306:3306"

  mongodb:
    image: mongo:5.0
    ports:
      - "27017:27017"

  kafka:
    image: bitnami/kafka:2.8
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
    # Kafka is configured to connect to zookeeper:2181, so start ZooKeeper first.
    depends_on:
      - zookeeper

  zookeeper:
    image: bitnami/zookeeper:3.7
    ports:
      - "2181:2181"

2. Prometheus监控配置

# Prometheus scrape jobs for the chat application and its backing services.
# NOTE(review): the redis/mysql/kafka targets use ports 9121/9104/7071, presumably metrics
# exporters running alongside those services; no exporters are defined in the compose file
# shown in this article — confirm.
scrape_configs:
  - job_name: 'chat-app'
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['app:8080']
        labels:
          service: 'chat-service'

  - job_name: 'redis'
    static_configs:
      - targets: ['redis:9121']

  - job_name: 'mysql'
    static_configs:
      - targets: ['mysql:9104']

  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka:7071']

通过以上实现方案,我们构建了一个功能完善、性能优越且安全可靠的实时聊天系统。该系统不仅支持基本的聊天功能,还提供了消息存储、已读回执、文件传输等高级特性,同时具备良好的扩展性和可维护性。

赞(0)
未经允许不得转载:网硕互联帮助中心 » 基于Spring Boot和WebSocket的实时聊天系统
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!