云计算百科
云计算领域专业知识百科平台

【C#高级】TCP服务器并发优化与错误处理改进实战

TCP服务器并发优化与请求-应答模式实战

前言

在工业视觉定位系统中,TCP服务器承担着多客户端通信的重要职责。本文以 TcpSvrService 类为例,深入解析如何实现:

  • 多客户端并发安全
  • 标准的请求-应答分离模式
  • 完善的错误处理与超时机制

一、架构概述

1.1 核心组件

组件说明
TcpSvrService 基于 SuperSocket 的 TCP 服务器
TcpRequestContext 请求上下文,封装请求数据和响应设置
ConcurrentDictionary 线程安全的并发任务管理
TaskCompletionSource 事件回调转异步等待

1.2 请求-应答流程

客户端发送请求 → 服务端创建上下文 → 外部获取请求 → 外部处理 → 设置应答 → 发送响应


二、核心设计:TcpRequestContext

请求上下文是实现请求-应答分离的关键:

public class TcpRequestContext
{
public string SessionId { get; } // 会话标识
public string RequestBody { get; } // 请求内容
public StringPackageInfo Package { get; } // 原始数据包

internal TaskCompletionSource<string> ResponseTcs { get; } = new();

/// <summary>
/// 设置响应内容,触发服务端发送应答
/// </summary>
public void SetResponse(string response)
{
ResponseTcs.TrySetResult(response);
}
}

设计要点:

  • ✅ 封装请求数据,外部可直接读取
  • ✅ SetResponse() 方法触发响应发送
  • ✅ ResponseTcs 使用 internal 修饰,外部无法绕过正常流程

三、并发安全设计

3.1 问题分析

原有设计的缺陷:

// ❌ 单例模式:多客户端时会相互覆盖
private TaskCompletionSource<object> _tcs;

public Task<object> WaitForNextPackageAsync()
{
_tcs = new TaskCompletionSource<object>(); // 后续调用会覆盖前一个
return _tcs.Task;
}

问题场景:

T1: 客户端A调用 → 创建 TCS_A
T2: 客户端B调用 → 创建 TCS_B(覆盖 TCS_A)❌
T3: 客户端A数据到达 → 错误地完成 TCS_B ❌

3.2 改进方案

使用 ConcurrentDictionary 按会话ID隔离:

// ✅ 字典管理,每个会话独立
private ConcurrentDictionary<string, TaskCompletionSource<TcpRequestContext>> _waitingTasks = new();
private ConcurrentDictionary<string, TcpRequestContext> _pendingRequests = new();
private const string GLOBAL_WAIT_KEY = "__GLOBAL_WAIT__";

3.3 等待方法实现

// 等待任意会话(向后兼容)
public Task<TcpRequestContext> WaitForNextPackageAsync()
{
var tcs = new TaskCompletionSource<TcpRequestContext>();
_waitingTasks.TryAdd(GLOBAL_WAIT_KEY, tcs);
return tcs.Task;
}

// 等待指定会话
public Task<TcpRequestContext> WaitForNextPackageAsync(string sessionId)
{
if (string.IsNullOrEmpty(sessionId))
throw new ArgumentException("SessionId cannot be null or empty");

var tcs = new TaskCompletionSource<TcpRequestContext>();
_waitingTasks.TryAdd(sessionId, tcs);
return tcs.Task;
}


四、请求-应答分离与超时机制

4.1 核心处理流程

private async ValueTask HandlePackageAsync(IAppSession session, StringPackageInfo package)
{
string sessionId = session.SessionID;
TcpRequestContext context = null;

try
{
// 1️⃣ 创建请求上下文
context = new TcpRequestContext(sessionId, package);
_pendingRequests.TryAdd(sessionId, context);

// 2️⃣ 触发预处理事件(可选)
if (OnPackageReceived != null)
await OnPackageReceived.Invoke(package);

// 3️⃣ 通知外部等待者
if (_waitingTasks.TryRemove(sessionId, out var sessionTcs))
sessionTcs.TrySetResult(context);
if (_waitingTasks.TryRemove(GLOBAL_WAIT_KEY, out var globalTcs))
globalTcs.TrySetResult(context);

// 4️⃣ 等待外部设置响应(带超时)
string response = await WaitForResponseAsync(context);

// 5️⃣ 发送应答
await session.SendAsync(Encoding.UTF8.GetBytes(response + TcpTerminator));
}
catch (Exception ex)
{
// 异常处理…
}
finally
{
_pendingRequests.TryRemove(sessionId, out _);
}
}

4.2 超时机制

/// <summary>
/// 响应超时时间(毫秒),默认30秒。设为0或负数表示不超时。
/// </summary>
public int ResponseTimeoutMs { get; set; } = 30000;

// 等待响应,带超时保护
private async Task<string> WaitForResponseAsync(TcpRequestContext context)
{
if (ResponseTimeoutMs > 0)
{
var responseTask = context.ResponseTcs.Task;
var timeoutTask = Task.Delay(ResponseTimeoutMs);
var completedTask = await Task.WhenAny(responseTask, timeoutTask);

if (completedTask == timeoutTask)
{
context.ResponseTcs.TrySetCanceled();
return "响应超时";
}
return await responseTask;
}
return await context.ResponseTcs.Task;
}


五、错误处理与资源清理

5.1 异常处理

catch (Exception ex)
{
// 完成等待任务,避免永久等待
if (_waitingTasks.TryRemove(sessionId, out var sessionTcs))
sessionTcs.TrySetException(ex);
if (_waitingTasks.TryRemove(GLOBAL_WAIT_KEY, out var globalTcs))
globalTcs.TrySetException(ex);

// 取消响应等待
context?.ResponseTcs.TrySetException(ex);

// 发送错误响应(嵌套保护)
try
{
await session.SendAsync(Encoding.UTF8.GetBytes($"处理错误: {ex.Message}" + TcpTerminator));
}
catch { /* 忽略发送失败 */ }
}

5.2 资源清理

public void DisConnect(CommunicationConfig config)
{
try
{
host?.StopAsync().Wait();
config.IsConnected = false;
}
finally
{
// 清理等待任务
foreach (var kvp in _waitingTasks)
kvp.Value.TrySetCanceled();
_waitingTasks.Clear();

// 清理待处理请求
foreach (var kvp in _pendingRequests)
kvp.Value.ResponseTcs.TrySetCanceled();
_pendingRequests.Clear();

host?.Dispose();
}
}


六、使用示例

6.1 基本使用:请求-应答模式

var tcpService = new TcpSvrService();
await tcpService.Connect(config);

// 等待客户端请求
var context = await tcpService.WaitForNextPackageAsync();

// 获取请求数据
string request = context.RequestBody;
Console.WriteLine($"收到请求: {request}");

// 处理请求,生成应答
string response = ProcessRequest(request);

// 设置应答,自动发送给客户端
context.SetResponse(response);

6.2 指定会话等待

// 等待特定客户端的请求
var context = await tcpService.WaitForNextPackageAsync("session-123");
string request = context.RequestBody;

// 处理后设置应答
context.SetResponse($"处理完成: {request}");

6.3 配置超时

// 设置响应超时时间
tcpService.ResponseTimeoutMs = 60000; // 60秒
tcpService.ResponseTimeoutMs = 0; // 不超时(谨慎使用)

6.4 错误处理

try
{
var context = await tcpService.WaitForNextPackageAsync(sessionId);
context.SetResponse("OK");
}
catch (OperationCanceledException)
{
Console.WriteLine("请求被取消");
}
catch (Exception ex)
{
Console.WriteLine($"处理失败: {ex.Message}");
}


七、设计对比

维度改进前改进后
并发支持 单例 TCS,多客户端覆盖 字典管理,会话隔离
请求-应答 混合处理 分离模式,清晰可控
超时保护 可配置超时
错误处理 无捕获,任务悬挂 完整处理,资源清理
向后兼容 ✅ 保留无参数版本

八、流程图

┌─────────────────────────────────────────────────────────┐
│ 客户端发送请求 │
└──────────────────────────┬──────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ HandlePackageAsync 接收数据包 │
│ 1. 创建 TcpRequestContext │
│ 2. 存入 _pendingRequests │
└──────────────────────────┬──────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 通知外部等待者 │
│ • 完成会话级等待任务 │
│ • 完成全局等待任务 │
└──────────────────────────┬──────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 等待外部调用 context.SetResponse() │
│ • 带超时保护 │
└──────────────────────────┬──────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 发送应答给客户端 │
└─────────────────────────────────────────────────────────┘


九、最佳实践

9.1 使用建议

场景推荐方式
单客户端 WaitForNextPackageAsync()
多客户端 WaitForNextPackageAsync(sessionId)
需要超时 配置 ResponseTimeoutMs
错误处理 使用 try-catch 包裹

9.2 注意事项

  • 必须调用 SetResponse:收到请求后必须设置响应,否则触发超时
  • 一问一答模式:当前设计适合请求-响应模式,不适合流式处理
  • 及时清理:不再使用时调用 DisConnect() 释放资源

  • 总结

    本文实现了一个生产级的 TCP 服务器,核心改进:

  • TcpRequestContext:封装请求-应答,逻辑清晰
  • 并发安全:ConcurrentDictionary + 会话隔离
  • 超时机制:防止请求永久等待
  • 完整的错误处理:异常捕获 + 资源清理
  • 这是典型的从原型到生产级代码的演进,关键是在并发、异常、资源管理上做到完善。


    版本: 2.0 更新内容: 新增 TcpRequestContext、请求-应答分离、超时机制

    ps:TaskCompletionSource的详细用法可以参考这篇文章: 利用 TaskCompletionSource 在 SuperSocket 中实现跨模块异步处理客户端消息

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 【C#高级】TCP服务器并发优化与错误处理改进实战
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!