TCP服务器并发优化与请求-应答模式实战
前言
在工业视觉定位系统中,TCP服务器承担着多客户端通信的重要职责。本文以 TcpSvrService 类为例,深入解析如何实现:
- 多客户端并发安全
- 标准的请求-应答分离模式
- 完善的错误处理与超时机制
一、架构概述
1.1 核心组件
| TcpSvrService | 基于 SuperSocket 的 TCP 服务器 |
| TcpRequestContext | 请求上下文,封装请求数据和响应设置 |
| ConcurrentDictionary | 线程安全的并发任务管理 |
| TaskCompletionSource | 事件回调转异步等待 |
1.2 请求-应答流程
客户端发送请求 → 服务端创建上下文 → 外部获取请求 → 外部处理 → 设置应答 → 发送响应
二、核心设计:TcpRequestContext
请求上下文是实现请求-应答分离的关键:
public class TcpRequestContext
{
public string SessionId { get; } // 会话标识
public string RequestBody { get; } // 请求内容
public StringPackageInfo Package { get; } // 原始数据包
internal TaskCompletionSource<string> ResponseTcs { get; } = new();
/// <summary>
/// 设置响应内容,触发服务端发送应答
/// </summary>
public void SetResponse(string response)
{
ResponseTcs.TrySetResult(response);
}
}
设计要点:
- ✅ 封装请求数据,外部可直接读取
- ✅ SetResponse() 方法触发响应发送
- ✅ ResponseTcs 使用 internal 修饰,外部无法绕过正常流程
三、并发安全设计
3.1 问题分析
原有设计的缺陷:
// ❌ 单例模式:多客户端时会相互覆盖
private TaskCompletionSource<object> _tcs;
public Task<object> WaitForNextPackageAsync()
{
_tcs = new TaskCompletionSource<object>(); // 后续调用会覆盖前一个
return _tcs.Task;
}
问题场景:
T1: 客户端A调用 → 创建 TCS_A
T2: 客户端B调用 → 创建 TCS_B(覆盖 TCS_A)❌
T3: 客户端A数据到达 → 错误地完成 TCS_B ❌
3.2 改进方案
使用 ConcurrentDictionary 按会话ID隔离:
// ✅ 字典管理,每个会话独立
private ConcurrentDictionary<string, TaskCompletionSource<TcpRequestContext>> _waitingTasks = new();
private ConcurrentDictionary<string, TcpRequestContext> _pendingRequests = new();
private const string GLOBAL_WAIT_KEY = "__GLOBAL_WAIT__";
3.3 等待方法实现
// 等待任意会话(向后兼容)
public Task<TcpRequestContext> WaitForNextPackageAsync()
{
var tcs = new TaskCompletionSource<TcpRequestContext>();
_waitingTasks.TryAdd(GLOBAL_WAIT_KEY, tcs);
return tcs.Task;
}
// 等待指定会话
public Task<TcpRequestContext> WaitForNextPackageAsync(string sessionId)
{
if (string.IsNullOrEmpty(sessionId))
throw new ArgumentException("SessionId cannot be null or empty");
var tcs = new TaskCompletionSource<TcpRequestContext>();
_waitingTasks.TryAdd(sessionId, tcs);
return tcs.Task;
}
四、请求-应答分离与超时机制
4.1 核心处理流程
private async ValueTask HandlePackageAsync(IAppSession session, StringPackageInfo package)
{
string sessionId = session.SessionID;
TcpRequestContext context = null;
try
{
// 1️⃣ 创建请求上下文
context = new TcpRequestContext(sessionId, package);
_pendingRequests.TryAdd(sessionId, context);
// 2️⃣ 触发预处理事件(可选)
if (OnPackageReceived != null)
await OnPackageReceived.Invoke(package);
// 3️⃣ 通知外部等待者
if (_waitingTasks.TryRemove(sessionId, out var sessionTcs))
sessionTcs.TrySetResult(context);
if (_waitingTasks.TryRemove(GLOBAL_WAIT_KEY, out var globalTcs))
globalTcs.TrySetResult(context);
// 4️⃣ 等待外部设置响应(带超时)
string response = await WaitForResponseAsync(context);
// 5️⃣ 发送应答
await session.SendAsync(Encoding.UTF8.GetBytes(response + TcpTerminator));
}
catch (Exception ex)
{
// 异常处理…
}
finally
{
_pendingRequests.TryRemove(sessionId, out _);
}
}
4.2 超时机制
/// <summary>
/// 响应超时时间(毫秒),默认30秒。设为0或负数表示不超时。
/// </summary>
public int ResponseTimeoutMs { get; set; } = 30000;
// 等待响应,带超时保护
private async Task<string> WaitForResponseAsync(TcpRequestContext context)
{
if (ResponseTimeoutMs > 0)
{
var responseTask = context.ResponseTcs.Task;
var timeoutTask = Task.Delay(ResponseTimeoutMs);
var completedTask = await Task.WhenAny(responseTask, timeoutTask);
if (completedTask == timeoutTask)
{
context.ResponseTcs.TrySetCanceled();
return "响应超时";
}
return await responseTask;
}
return await context.ResponseTcs.Task;
}
五、错误处理与资源清理
5.1 异常处理
catch (Exception ex)
{
// 完成等待任务,避免永久等待
if (_waitingTasks.TryRemove(sessionId, out var sessionTcs))
sessionTcs.TrySetException(ex);
if (_waitingTasks.TryRemove(GLOBAL_WAIT_KEY, out var globalTcs))
globalTcs.TrySetException(ex);
// 取消响应等待
context?.ResponseTcs.TrySetException(ex);
// 发送错误响应(嵌套保护)
try
{
await session.SendAsync(Encoding.UTF8.GetBytes($"处理错误: {ex.Message}" + TcpTerminator));
}
catch { /* 忽略发送失败 */ }
}
5.2 资源清理
public void DisConnect(CommunicationConfig config)
{
try
{
host?.StopAsync().Wait();
config.IsConnected = false;
}
finally
{
// 清理等待任务
foreach (var kvp in _waitingTasks)
kvp.Value.TrySetCanceled();
_waitingTasks.Clear();
// 清理待处理请求
foreach (var kvp in _pendingRequests)
kvp.Value.ResponseTcs.TrySetCanceled();
_pendingRequests.Clear();
host?.Dispose();
}
}
六、使用示例
6.1 基本使用:请求-应答模式
var tcpService = new TcpSvrService();
await tcpService.Connect(config);
// 等待客户端请求
var context = await tcpService.WaitForNextPackageAsync();
// 获取请求数据
string request = context.RequestBody;
Console.WriteLine($"收到请求: {request}");
// 处理请求,生成应答
string response = ProcessRequest(request);
// 设置应答,自动发送给客户端
context.SetResponse(response);
6.2 指定会话等待
// 等待特定客户端的请求
var context = await tcpService.WaitForNextPackageAsync("session-123");
string request = context.RequestBody;
// 处理后设置应答
context.SetResponse($"处理完成: {request}");
6.3 配置超时
// 设置响应超时时间
tcpService.ResponseTimeoutMs = 60000; // 60秒
tcpService.ResponseTimeoutMs = 0; // 不超时(谨慎使用)
6.4 错误处理
try
{
var context = await tcpService.WaitForNextPackageAsync(sessionId);
context.SetResponse("OK");
}
catch (OperationCanceledException)
{
Console.WriteLine("请求被取消");
}
catch (Exception ex)
{
Console.WriteLine($"处理失败: {ex.Message}");
}
七、设计对比
| 并发支持 | 单例 TCS,多客户端覆盖 | 字典管理,会话隔离 |
| 请求-应答 | 混合处理 | 分离模式,清晰可控 |
| 超时保护 | 无 | 可配置超时 |
| 错误处理 | 无捕获,任务悬挂 | 完整处理,资源清理 |
| 向后兼容 | – | ✅ 保留无参数版本 |
八、流程图
┌─────────────────────────────────────────────────────────┐
│ 客户端发送请求 │
└──────────────────────────┬──────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ HandlePackageAsync 接收数据包 │
│ 1. 创建 TcpRequestContext │
│ 2. 存入 _pendingRequests │
└──────────────────────────┬──────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 通知外部等待者 │
│ • 完成会话级等待任务 │
│ • 完成全局等待任务 │
└──────────────────────────┬──────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 等待外部调用 context.SetResponse() │
│ • 带超时保护 │
└──────────────────────────┬──────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 发送应答给客户端 │
└─────────────────────────────────────────────────────────┘
九、最佳实践
9.1 使用建议
| 单客户端 | WaitForNextPackageAsync() |
| 多客户端 | WaitForNextPackageAsync(sessionId) |
| 需要超时 | 配置 ResponseTimeoutMs |
| 错误处理 | 使用 try-catch 包裹 |
9.2 注意事项
总结
本文实现了一个生产级的 TCP 服务器,核心改进:
这是典型的从原型到生产级代码的演进,关键是在并发、异常、资源管理上做到完善。
版本: 2.0 更新内容: 新增 TcpRequestContext、请求-应答分离、超时机制
ps:TaskCompletionSource的详细用法可以参考这篇文章: 利用 TaskCompletionSource 在 SuperSocket 中实现跨模块异步处理客户端消息
网硕互联帮助中心





评论前必须登录!
注册