云计算百科
云计算领域专业知识百科平台

SPICE源码分析(八):Red Worker与主服务器(Reds)实现分析

本文分析SPICE服务器的两个核心组件:负责图形处理的RedWorker和负责连接管理的主服务器Reds。

背景与目标

SPICE服务器采用多线程架构:

  • 主线程(Reds):处理连接管理、认证、Agent通信、输入和音频通道
  • Worker线程(RedWorker):专门处理图形密集操作,每个QXL设备一个

这种设计将耗时的图形处理与连接管理分离,提高整体响应性。

整体架构

在这里插入图片描述

上图展示了SPICE服务器的完整线程架构。这种设计的核心思想是将低延迟敏感的操作(如输入处理、连接管理)放在主线程,而将CPU密集型操作(如图像压缩、视频编码)放在独立的Worker线程中。

主线程职责

主线程运行在QEMU的主事件循环中,负责:

  • 连接生命周期管理:监听端口、接受新连接、处理SSL握手和SASL认证
  • MainChannel处理:协议协商、能力交换、客户端状态管理
  • InputsChannel处理:键盘鼠标事件需要低延迟响应
  • 音频通道处理:PlaybackChannel和RecordChannel,音频也是延迟敏感的
  • Agent通信:通过VDIPort设备与Guest Agent交互
  • 迁移协调:管理VM热迁移过程中的状态转移

Worker线程职责

每个QXL虚拟显卡设备对应一个Worker线程,这允许多显示器配置下的并行处理:

  • DisplayChannel处理:图形命令的解析、压缩和发送
  • CursorChannel处理:光标图像和位置更新
  • QXL命令处理:从Ring Buffer读取并处理QXL命令
  • 图像压缩:根据图像特征选择并执行压缩算法
  • 视频流管理:检测视频区域并进行视频编码

Reds(主服务器)实现

核心数据结构

RedsState是整个SPICE服务器的核心状态结构,包含了服务器运行所需的所有全局状态。

// reds-private.h
struct RedsState {
// ===== 服务器配置 =====
RedServerConfig *config; // 运行时配置
// 包括端口、加密、压缩策略等

// ===== 事件循环集成 =====
SpiceCoreInterfaceInternal core; // QEMU提供的事件循环接口
// 定时器、文件描述符监视等

// ===== 网络监听 =====
RedServerListeners *listeners; // 监听器列表
// 支持同时监听普通和SSL端口

// ===== 通道管理 =====
MainChannel *main_channel; // 主通道(协议协商)
InputsChannel *inputs_channel; // 输入通道(键盘鼠标)
GList *channels; // 所有注册的通道列表
// 包括Display、Cursor、Sound等

// ===== 客户端管理 =====
GList *clients; // 已连接的客户端列表
int num_clients; // 客户端数量
// 用于快速检查是否有客户端

// ===== VDAgent通信 =====
SpiceCharDeviceInstance *vdagent; // Agent字符设备实例
red::shared_ptr<RedCharDeviceVDIPort> agent_dev; // Agent设备管理器
int agent_attached; // Agent是否连接
uint32_t agent_state; // Agent状态机

// ===== QXL显卡设备 =====
GList *qxl_instances; // QXL设备实例列表
// 每个QXL对应一个显示器

// ===== 鼠标模式 =====
SpiceMouseMode mouse_mode; // 当前鼠标模式
// SERVER或CLIENT

// ===== VM迁移 =====
int mig_target; // 是否是迁移目标
int mig_seamless_connect; // 是否无缝迁移
RedsMigTargetClients mig_target_clients; // 迁移中的客户端

// ===== 性能统计 =====
SpiceStatFile *stat_file; // 统计文件
// 可通过工具查看运行时统计

// ===== 其他 =====
uint32_t mm_time; // 多媒体时间戳
// 用于音视频同步
bool allow_multiple_clients; // 是否允许多客户端
};

RedsState在架构中的位置:

RedsState ├── config (配置) ├── core (事件循环) ├── main_channel → MainChannel ├── inputs_channel → InputsChannel ├── channels → [DisplayChannel, CursorChannel, …] ├── clients → [RedClient1, RedClient2, …] └── qxl_instances → [QXL0 → Worker0, QXL1 → Worker1, …]

服务器初始化

SPICE服务器初始化分为两个阶段:spice_server_new创建配置,spice_server_init启动服务。

// reds.cpp
// 阶段1:创建服务器实例和默认配置
SpiceServer *spice_server_new(void)
{
// ===== 分配主状态结构 =====
// g_new0确保内存被清零
RedsState *reds = g_new0(RedsState, 1);

// ===== 初始化默认配置 =====
reds->config = g_new0(RedServerConfig, 1);

// 视频流策略:FILTER(只编码检测为视频的区域)
// 这是最优的默认值,平衡性能和画质
reds->config->streaming_video = SPICE_STREAM_VIDEO_FILTER;

// 图像压缩:AUTO_GLZ(自动选择QUIC或GLZ)
// QUIC适合渐变图像,GLZ适合人工图像
reds->config->image_compression = SPICE_IMAGE_COMPRESSION_AUTO_GLZ;

// 音频压缩:启用OPUS编码
reds->config->playback_compression = TRUE;

// Agent功能:全部启用
reds->config->agent_mouse = TRUE; // Agent鼠标模式
reds->config->agent_copypaste = TRUE; // 剪贴板共享
reds->config->agent_file_xfer = TRUE; // 文件拖放

// ===== 初始化视频编解码器列表 =====
// 动态数组,后续可以添加多种编解码器
reds->config->video_codecs = g_array_new(FALSE, FALSE,
sizeof(RedVideoCodec));

// ===== 初始化内部Core接口 =====
// 包装QEMU提供的事件循环接口
reds_init_core_interface(reds);

// ===== 注册到全局服务器列表 =====
// 支持多服务器实例(多个VM)
// 需要加锁保护全局列表
pthread_mutex_lock(&global_reds_lock);
servers = g_list_prepend(servers, reds);
pthread_mutex_unlock(&global_reds_lock);

return reds;
}

// 阶段2:启动服务,创建核心通道
int spice_server_init(SpiceServer *reds, SpiceCoreInterface *core)
{
// ===== 保存QEMU提供的Core接口 =====
// Core接口包含定时器、watch等事件循环功能
reds_set_core_interface(reds, core);

// ===== 创建核心通道 =====
// MainChannel:协议协商、能力交换、迁移协调
// 是所有客户端必须首先连接的通道
reds->main_channel = new MainChannel(reds);

// InputsChannel:键盘鼠标输入
// 需要低延迟,运行在主线程
reds->inputs_channel = new InputsChannel(reds);

// ===== 初始化Agent通信 =====
// Agent运行在Guest OS中,提供增强功能
// 如:剪贴板共享、分辨率调整等
reds_agent_init(reds);

// ===== 初始化统计 =====
#ifdef RED_STATISTICS
// 创建共享内存统计文件
// 可通过外部工具读取运行时性能数据
reds->stat_file = stat_file_new(REDS_MAX_STAT_NODES);
#endif

return 0;
}

为什么分两阶段初始化?

  • 配置阶段:spice_server_new后可以调用spice_server_set_xxx系列函数修改配置
  • 启动阶段:spice_server_init后配置生效,开始监听连接
  • spice_server_new() ↓ spice_server_set_port(5900) spice_server_set_image_compression(LZ4) spice_server_set_streaming_video(ALL) ↓ spice_server_init(core) // 配置锁定,服务启动

    连接监听

    连接监听是服务器与客户端建立通信的第一步,采用异步事件驱动模型。

    // reds.cpp
    // 创建并启动监听Socket
    int spice_server_open_listen_socket(SpiceServer *reds, int port, int flags)
    {
    // ===== 创建TCP Socket =====
    // AF_INET: IPv4, SOCK_STREAM: TCP协议
    int listen_socket = socket(AF_INET, SOCK_STREAM, 0);

    // ===== 设置SO_REUSEADDR =====
    // 允许服务器重启后立即重用端口
    // 否则需要等待TIME_WAIT状态超时(约2分钟)
    int on = 1;
    setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

    // ===== 绑定到指定端口 =====
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port); // 网络字节序
    addr.sin_addr.s_addr = INADDR_ANY; // 监听所有网卡

    bind(listen_socket, (struct sockaddr *)&addr, sizeof(addr));

    // ===== 开始监听 =====
    // SOMAXCONN: 系统允许的最大等待连接队列长度
    listen(listen_socket, SOMAXCONN);

    // ===== 注册到事件循环 =====
    // 当有新连接时,reds_accept_connection会被调用
    // 这是SPICE的核心设计:异步非阻塞I/O
    reds->core.watch_add(&reds->core, listen_socket, SPICE_WATCH_EVENT_READ,
    reds_accept_connection, reds);

    return listen_socket;
    }

    // 新连接到来时的回调
    static void reds_accept_connection(int fd, int event, void *opaque)
    {
    RedsState *reds = (RedsState *)opaque;

    // ===== 接受新连接 =====
    struct sockaddr_in peer_addr;
    socklen_t peer_len = sizeof(peer_addr);
    int socket = accept(fd, (struct sockaddr *)&peer_addr, &peer_len);

    if (socket < 0) {
    return; // 接受失败,可能是临时错误
    }

    // ===== 配置Socket选项 =====
    // 非阻塞:配合事件循环使用,不能阻塞主线程
    red_socket_set_non_blocking(socket, TRUE);

    // TCP_NODELAY:禁用Nagle算法
    // 减少小数据包的发送延迟,对交互式应用很重要
    red_socket_set_no_delay(socket, TRUE);

    // ===== 创建RedStream =====
    // RedStream是对Socket的封装,支持:
    // – 普通TCP
    // – SSL/TLS加密
    // – SASL认证
    RedStream *stream = red_stream_new(reds, socket);

    // ===== 创建链接上下文 =====
    // 保存连接建立过程中的临时状态
    RedLinkInfo *link = g_new0(RedLinkInfo, 1);
    link->reds = reds;
    link->stream = stream;

    // ===== 启动协议握手 =====
    // 异步读取客户端发送的SpiceLinkMess
    // 包含:版本信息、请求的通道类型、能力标志等
    reds_handle_new_link(link);
    }

    连接建立流程:

    在这里插入图片描述

    通道链接

    SPICE使用多通道设计,每种功能(显示、输入、音频)使用独立的TCP连接。所有连接必须先建立MainChannel,然后才能连接其他通道。

    // reds.cpp
    // 处理客户端通道连接请求
    static void reds_handle_link(RedLinkInfo *link)
    {
    RedsState *reds = link->reds;
    SpiceLinkMess *link_mess = link->link_mess;

    // ===== 版本检查 =====
    // SPICE协议有主版本和次版本
    // 主版本不同表示协议不兼容
    if (link_mess->major_version != SPICE_VERSION_MAJOR) {
    reds_link_error(link, SPICE_LINK_ERR_VERSION_MISMATCH);
    return;
    }

    // ===== MainChannel特殊处理 =====
    // MainChannel是第一个建立的通道
    // 用于协议协商、能力交换、分配connection_id
    if (link_mess->channel_type == SPICE_CHANNEL_MAIN) {
    reds_handle_main_link(reds, link);
    return; // 创建新客户端
    }

    // ===== 验证connection_id =====
    // 其他通道必须携带MainChannel分配的ID
    // 这确保所有通道属于同一个客户端会话
    RedClient *client = reds_find_client_by_link_id(
    reds, link_mess->connection_id);

    if (!client) {
    // 无效ID,可能是攻击或客户端Bug
    reds_link_error(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
    return;
    }

    // ===== 查找目标通道 =====
    // channel_type: 通道类型(Display=1, Inputs=2, …)
    // channel_id: 通道实例ID(多显示器时有多个Display)
    RedChannel *channel = reds_find_channel(
    reds, link_mess->channel_type, link_mess->channel_id);

    if (!channel) {
    // 请求的通道不存在
    // 可能是服务器配置不支持,或通道尚未创建
    reds_link_error(link, SPICE_LINK_ERR_CHANNEL_NOT_AVAILABLE);
    return;
    }

    // ===== 建立通道客户端连接 =====
    // 将TCP连接(stream)绑定到通道
    // 创建对应的ChannelClient实例
    // link_mess->caps: 客户端声明的能力标志
    channel->on_connect(client, link->stream,
    link_mess->connection_id != 0, // 是否是迁移
    &link_mess->caps);

    // 连接成功,释放临时结构
    reds_link_free(link);
    }

    为什么使用多通道设计?

  • 并行处理:不同类型的数据可以并行传输
  • QoS控制:可以为不同通道设置不同优先级
  • 独立流控:一个通道拥塞不影响其他通道
  • 灵活性:客户端可以选择只连接需要的通道
  • 通道注册

    通道注册机制使服务器能够动态管理通道,并在通道可用时通知客户端。

    // reds.cpp
    // 注册新通道到服务器
    void reds_register_channel(RedsState *reds, RedChannel *channel)
    {
    spice_assert(reds);

    // ===== 添加到全局通道列表 =====
    // g_list_prepend: O(1)操作,添加到列表头部
    // 通道创建后必须注册,否则客户端无法连接
    reds->channels = g_list_prepend(reds->channels, channel);

    // ===== 通知MainChannel =====
    // MainChannel需要知道所有可用通道
    // 用于回复客户端的通道查询
    if (reds->main_channel) {
    reds->main_channel->registered_new_channel(channel);
    }

    // ===== 通知已连接的客户端 =====
    // 如果有客户端已经连接,告诉他们新通道可用
    // 客户端收到后可以选择连接这个新通道
    GList *l;
    for (l = reds->clients; l; l = l->next) {
    RedClient *client = (RedClient *)l->data;

    // 获取客户端的MainChannelClient
    MainChannelClient *mcc = client->get_main();

    if (mcc) {
    // 发送SPICE_MSG_MAIN_CHANNELS_LIST或类似消息
    // 通知客户端有新通道
    mcc->push_init_channel(channel);
    }
    }
    }

    // 查找指定类型和ID的通道
    RedChannel *reds_find_channel(RedsState *reds, uint32_t type, uint32_t id)
    {
    GList *l;

    // ===== 遍历通道列表 =====
    // 简单的线性查找
    // 通道数量通常很少(<10),不需要哈希表
    for (l = reds->channels; l; l = l->next) {
    RedChannel *channel = (RedChannel *)l->data;

    // ===== 匹配类型和ID =====
    // type: SPICE_CHANNEL_MAIN, SPICE_CHANNEL_DISPLAY等
    // id: 通道实例编号(多显示器时有多个Display通道)
    if (channel->type() == type && channel->id() == id) {
    return channel;
    }
    }

    return NULL; // 未找到
    }

    通道注册时机:

    服务器启动 │ ├─ spice_server_init() │ ├─ new MainChannel() → reds_register_channel() │ └─ new InputsChannel() → reds_register_channel() │ ├─ QXL设备初始化(VM启动时) │ ├─ red_qxl_init() │ │ ├─ new DisplayChannel() → reds_register_channel() │ │ └─ new CursorChannel() → reds_register_channel() │ └─ 音频设备初始化 ├─ new PlaybackChannel() → reds_register_channel() └─ new RecordChannel() → reds_register_channel()

    通道查找场景:

    场景typeid
    主通道 SPICE_CHANNEL_MAIN 0
    单显示器 SPICE_CHANNEL_DISPLAY 0
    第二显示器 SPICE_CHANNEL_DISPLAY 1
    输入 SPICE_CHANNEL_INPUTS 0
    音频播放 SPICE_CHANNEL_PLAYBACK 0

    RedWorker 实现

    核心数据结构

    RedWorker封装了Worker线程的所有状态,每个QXL设备对应一个Worker实例。

    // red-worker.cpp
    struct RedWorker {
    // ===== 线程管理 =====
    pthread_t thread; // Worker线程句柄
    QXLInstance *qxl; // 关联的QXL设备
    // 通过qxl访问设备接口和命令环
    SpiceWatch *dispatch_watch; // 调度器FD的事件监视器
    // 当主线程发送消息时触发
    SpiceCoreInterfaceInternal core; // 线程专用的事件循环接口
    // 与主线程使用不同的实例

    unsigned int event_timeout; // 事件等待超时(毫秒)
    // INF_EVENT_WAIT表示无限等待

    // ===== 通道管理 =====
    DisplayChannel *display_channel; // 显示通道
    // 处理图形命令和图像传输
    CursorChannel *cursor_channel; // 光标通道
    // 处理光标图像和位置

    // ===== 命令轮询 =====
    uint32_t display_poll_tries; // 显示命令轮询重试计数
    uint32_t cursor_poll_tries; // 光标命令轮询重试计数
    gboolean was_blocked; // 上次循环是否因管道满而阻塞
    // 轮询机制用于减少CPU空转:
    // 命令环为空时,先重试几次,然后等待事件通知

    // ===== 内存管理 =====
    RedMemSlotInfo mem_slots; // QXL内存槽信息
    // 用于将Guest地址转换为Host地址

    // ===== 处理追踪 =====
    uint32_t process_display_generation; // 处理"代数"
    // 每轮处理递增,用于避免重复处理

    // ===== 性能统计 =====
    RedStatNode stat; // 统计根节点
    RedStatCounter wakeup_counter; // 唤醒次数
    RedStatCounter command_counter; // 处理的命令数
    RedStatCounter full_loop_counter; // 完整处理循环数
    RedStatCounter total_loop_counter; // 总循环数

    // ===== 驱动能力 =====
    bool driver_cap_monitors_config; // 驱动是否支持显示器配置

    // ===== 调试功能 =====
    RedRecord *record; // 命令录制器
    // 可录制QXL命令用于回放调试

    // ===== 事件循环 =====
    GMainLoop *loop; // GLib主循环
    // Worker线程的事件分发基础
    };

    Worker线程的工作模式:

    GMainLoop运行 │ ├─ 收到Dispatcher消息 → 处理主线程请求(连接、断开等) │ ├─ 定时器触发 → 处理QXL命令环 │ └─ 读取命令 → 处理命令 → 发送到客户端 │ └─ 网络可写 → 发送队列中的数据

    Worker创建

    Worker的创建是SPICE多线程架构的关键步骤,每个QXL设备对应一个独立的Worker线程。

    // red-qxl.cpp
    // 当QXL设备被添加到QEMU时调用
    // 这通常发生在VM启动或热插拔显卡时
    void red_qxl_init(RedsState *reds, QXLInstance *qxl)
    {
    // ===== 创建QXL状态结构 =====
    // QXLState是SPICE对QXL设备的私有数据
    // 存储在qxl->st中,供后续使用
    QXLState *qxl_state = g_new0(QXLState, 1);
    qxl->st = qxl_state;

    // ===== 设置关联 =====
    qxl_state->reds = reds; // 指向主服务器
    qxl_state->id = qxl->id; // 设备ID(0, 1, 2…)

    // ===== 创建Dispatcher =====
    // Dispatcher是主线程和Worker线程之间的消息队列
    // WORKER_MESSAGES_NUM是消息类型的数量
    // 内部使用socketpair实现
    qxl_state->dispatcher = dispatcher_new(WORKER_MESSAGES_NUM);

    // ===== 创建Worker实例 =====
    // Worker包含线程状态和相关通道
    qxl_state->worker = red_worker_new(qxl);

    // ===== 注册到服务器 =====
    // 支持多显卡配置
    reds->qxl_instances = g_list_prepend(reds->qxl_instances, qxl);

    // ===== 启动Worker线程 =====
    // 从这里开始,Worker线程独立运行
    // 通过Dispatcher与主线程通信
    pthread_create(&qxl_state->worker->thread, NULL,
    red_worker_main, qxl_state->worker);
    }

    // 创建Worker实例(在主线程执行)
    RedWorker *red_worker_new(QXLInstance *qxl)
    {
    RedWorker *worker = g_new0(RedWorker, 1);

    worker->qxl = qxl;

    // ===== 初始化事件超时 =====
    // INF_EVENT_WAIT表示无限等待
    // 有命令或连接时会被唤醒
    worker->event_timeout = INF_EVENT_WAIT;

    // ===== 初始化内存槽 =====
    // QXL使用"内存槽"机制访问Guest内存
    // Guest地址 → 内存槽 → Host地址
    // 这是虚拟化的关键:隔离Guest和Host地址空间
    red_memslot_info_init(&worker->mem_slots,
    qxl_get_memslot_generation(qxl), // 槽代数(热插拔时递增)
    qxl_get_id(qxl), // 设备ID
    qxl_get_n_memslots(qxl), // 槽数量
    qxl_get_memslot_slot_gen_bits(qxl),// 代数位数
    qxl_get_memslot_addr_gen_bits(qxl),// 地址位数
    qxl_get_memslot_internal_mask(qxl));// 内部掩码

    // ===== 创建DisplayChannel =====
    // qxl->id作为通道ID,支持多显示器
    // &worker->core: Worker线程的事件循环接口
    worker->display_channel = new DisplayChannel(
    qxl_get_server(qxl), qxl->id, &worker->core);

    // ===== 创建CursorChannel =====
    // 光标通道与显示通道配对
    // 处理鼠标光标的图像和位置
    worker->cursor_channel = new CursorChannel(
    qxl_get_server(qxl), qxl->id, &worker->core);

    return worker;
    }

    Worker创建流程:

    在这里插入图片描述

    为什么每个QXL对应一个Worker?

  • 并行处理:多显示器可以并行压缩和发送图像
  • 隔离性:一个显示器的处理不阻塞其他显示器
  • 简单性:每个Worker只需处理一个QXL设备
  • 可扩展:易于支持更多显示器
  • Worker主循环

    Worker线程的入口函数,负责初始化线程环境并运行事件循环。

    // red-worker.cpp
    // Worker线程入口点
    static void *red_worker_main(void *arg)
    {
    RedWorker *worker = (RedWorker *)arg;

    // ===== 设置线程名称 =====
    // 方便调试和监控(如top -H、ps -T)
    pthread_setname_np(pthread_self(), "SPICE Worker");

    // ===== 创建GLib主循环 =====
    // GLib提供跨平台的事件循环实现
    // 参数:NULL=默认上下文,FALSE=不是正在运行
    worker->loop = g_main_loop_new(NULL, FALSE);

    // ===== 注册调度器事件源 =====
    // 监听Dispatcher的接收端FD
    // 当主线程发送消息时,这个FD变得可读
    SpiceWatch *watch = red_watch_add(
    &worker->core,
    dispatcher_get_recv_fd(worker->qxl->st->dispatcher), // socketpair的一端
    SPICE_WATCH_EVENT_READ, // 监听可读事件
    red_worker_dispatch, // 回调函数
    worker // 用户数据
    );
    worker->dispatch_watch = watch;

    // ===== 运行事件循环 =====
    // 这个调用会阻塞,直到g_main_loop_quit被调用
    // 循环中会分发:
    // – 调度器消息(来自主线程)
    // – 定时器事件(命令处理定时器)
    // – 网络事件(客户端连接的读写)
    g_main_loop_run(worker->loop);

    // ===== 清理 =====
    g_main_loop_unref(worker->loop);

    return NULL;
    }

    Worker线程生命周期:

    在这里插入图片描述

    命令处理

    命令处理是Worker线程的核心工作:从QXL命令环读取Guest发出的图形命令并处理。

    // red-worker.cpp
    // 处理显示命令,返回处理的命令数量
    static int red_process_display(RedWorker *worker, int *ring_is_empty)
    {
    QXLCommandExt ext_cmd; // 扩展命令结构,包含命令数据和元信息
    int n = 0; // 处理的命令计数

    // ===== 检查设备状态 =====
    // QXL设备可能被暂停(如VM暂停)
    if (!red_qxl_is_running(worker->qxl)) {
    *ring_is_empty = TRUE;
    return n;
    }

    // ===== 更新统计和代数 =====
    stat_inc_counter(worker->total_loop_counter, 1);

    // process_display_generation用于避免重复处理
    // 每轮递增,Drawable会记录创建时的代数
    worker->process_display_generation++;

    *ring_is_empty = FALSE;

    // ===== 主处理循环 =====
    // 两个退出条件:
    // 1. 客户端管道满(背压)
    // 2. 命令环为空
    while (worker->display_channel->max_pipe_size() <= MAX_PIPE_SIZE) {
    // ===== 从命令环获取命令 =====
    // QXL命令环是Guest和SPICE之间的共享内存队列
    if (!red_qxl_get_command(worker->qxl, &ext_cmd)) {
    *ring_is_empty = TRUE;

    // ===== 轮询重试机制 =====
    // 命令环可能暂时为空但很快会有新命令
    // 先轮询几次,减少不必要的等待
    if (worker->display_poll_tries < CMD_RING_POLL_RETRIES) {
    // 设置短超时,很快再次检查
    worker->event_timeout = MIN(worker->event_timeout,
    CMD_RING_POLL_TIMEOUT);
    } else if (worker->display_poll_tries == CMD_RING_POLL_RETRIES &&
    !red_qxl_req_cmd_notification(worker->qxl)) {
    // 请求中断通知,但Guest刚好放入了命令
    // 继续处理
    continue;
    }
    worker->display_poll_tries++;
    return n;
    }

    // ===== 命令录制(调试功能) =====
    // 可以录制所有QXL命令用于回放调试
    if (worker->record) {
    red_record_qxl_command(worker->record, &worker->mem_slots, ext_cmd);
    }

    stat_inc_counter(worker->command_counter, 1);
    worker->display_poll_tries = 0; // 重置轮询计数

    // ===== 命令分发 =====
    switch (ext_cmd.cmd.type) {
    case QXL_CMD_DRAW: {
    // ===== 绘图命令 =====
    // 最常见的命令:填充、复制、透明混合等
    // 将QXL格式转换为SPICE内部格式
    auto red_drawable = red_drawable_new(
    worker->qxl, &worker->mem_slots,
    ext_cmd.group_id, ext_cmd.cmd.data,
    ext_cmd.flags);

    if (red_drawable) {
    // 处理绘图:构建命令树,检测视频流等
    // std::move避免不必要的拷贝
    display_channel_process_draw(
    worker->display_channel,
    std::move(red_drawable),
    worker->process_display_generation);
    }
    break;
    }

    case QXL_CMD_UPDATE: {
    // ===== 更新命令 =====
    // 强制刷新指定区域,通常用于:
    // – 视频播放(直接写入framebuffer的场景)
    // – 某些特殊绘图操作
    auto update_cmd = red_update_cmd_new(
    worker->qxl, &worker->mem_slots,
    ext_cmd.group_id, ext_cmd.cmd.data);

    if (update_cmd) {
    display_channel_process_update(
    worker->display_channel, std::move(update_cmd));
    }
    break;
    }

    case QXL_CMD_MESSAGE: {
    // ===== 消息命令 =====
    // Guest驱动发送的调试消息
    // 输出到日志,帮助调试驱动问题
    auto message = red_message_new(
    worker->qxl, &worker->mem_slots,
    ext_cmd.group_id, ext_cmd.cmd.data);

    if (message) {
    // 释放Guest资源
    red_qxl_release_resource(worker->qxl, message->release_info_ext);
    }
    break;
    }

    case QXL_CMD_SURFACE: {
    // ===== Surface命令 =====
    // 创建或销毁Surface
    // Surface是绘图的目标画布
    red_process_surface_cmd(worker, &ext_cmd, FALSE);
    break;
    }

    default:
    spice_warning("bad command type %d", ext_cmd.cmd.type);
    }

    n++;
    }

    // 循环因管道满而退出
    worker->was_blocked = TRUE;
    return n;
    }

    命令处理流程图: 在这里插入图片描述

    调度器消息处理

    调度器消息是主线程向Worker线程发送请求的机制,Worker通过事件循环异步处理这些消息。

    // red-worker.cpp
    // Dispatcher FD可读时的回调(有消息到达)
    static void red_worker_dispatch(int fd, int event, void *opaque)
    {
    RedWorker *worker = (RedWorker *)opaque;
    Dispatcher *dispatcher = worker->qxl->st->dispatcher;

    // ===== 处理到达的消息 =====
    // dispatcher_handle会:
    // 1. 从socketpair读取消息
    // 2. 根据消息类型调用对应处理函数
    // 3. 发送ACK(如果是同步消息)
    dispatcher_handle(dispatcher);
    }

    // ===== 消息类型定义 =====
    // 这些消息类型覆盖了主线程需要通知Worker的所有场景
    enum {
    // —– 基本控制 —–
    WORKER_MESSAGE_WAKEUP, // 唤醒Worker处理命令
    WORKER_MESSAGE_OOM, // 内存不足警告
    WORKER_MESSAGE_READY, // Worker初始化完成确认

    // —– 连接管理 —–
    WORKER_MESSAGE_DISPLAY_CONNECT, // 新客户端连接Display通道
    WORKER_MESSAGE_DISPLAY_DISCONNECT, // 客户端断开Display通道
    WORKER_MESSAGE_DISPLAY_MIGRATE, // 迁移相关

    // —– 设备控制 —–
    WORKER_MESSAGE_START, // 启动处理(VM启动/恢复)
    WORKER_MESSAGE_STOP, // 停止处理(VM暂停)
    WORKER_MESSAGE_LOADVM_COMMANDS, // 加载VM快照中的命令

    // —– 配置更新 —–
    WORKER_MESSAGE_MONITORS_CONFIG_ASYNC, // 显示器配置变更
    WORKER_MESSAGE_DRIVER_UNLOAD, // 驱动卸载通知

    WORKER_MESSAGES_NUM, // 消息类型总数
    };

    // ===== 处理显示通道连接 =====
    // 当客户端连接DisplayChannel时,主线程发送此消息
    static void handle_display_connect(void *opaque, void *payload)
    {
    RedWorker *worker = (RedWorker *)opaque;
    WorkerMessageDisplayConnect *msg = (WorkerMessageDisplayConnect *)payload;

    // ===== 在Worker线程上下文中处理连接 =====
    // 这很重要:DisplayChannel的所有操作都在Worker线程执行
    // 避免了主线程和Worker线程竞争通道数据结构
    worker->display_channel->on_connect(
    msg->client, // 客户端对象
    msg->stream, // TCP连接流
    msg->migration, // 是否是迁移连接
    msg->caps); // 客户端能力

    // 释放深拷贝的能力数据
    g_free(msg->caps);
    }

    消息处理架构:

    步骤线程操作
    1 主线程 客户端连接DisplayChannel
    2 主线程 调用dispatcher_send(DISPLAY_CONNECT)
    3 消息通过socketpair传输
    4 Worker线程 red_worker_dispatch()被触发
    5 Worker线程 dispatcher_handle()读取消息类型
    6 Worker线程 查找处理函数handlers[DISPLAY_CONNECT]
    7 Worker线程 执行handle_display_connect()
    8 Worker线程 写入ACK,主线程继续

    为什么连接要在Worker线程处理?

    方案优点缺点
    主线程处理 简单 需要加锁保护通道数据
    Worker线程处理 无锁、安全 需要跨线程消息

    SPICE选择Worker线程处理,因为DisplayChannel的所有数据结构都由Worker线程独占访问,无需同步开销。

    主线程与Worker通信

    Dispatcher机制

    主线程和Worker线程之间通过Dispatcher进行安全的跨线程通信。Dispatcher基于Unix socketpair实现,提供了同步和异步两种消息发送模式:

    同步模式(ACK_SYNC):发送者发送消息后阻塞等待,直到Worker线程处理完成并返回确认。这用于需要确保操作完成才能继续的场景,如客户端连接建立。

    异步模式(ACK_ASYNC):发送者发送消息后立即返回,不等待处理完成。这用于不需要立即反馈的场景,如唤醒Worker处理新命令。

    // dispatcher.cpp
    // Dispatcher类实现线程间的消息传递
    class Dispatcher {
    public:
    // ===== 同步消息发送 =====
    // 发送消息并阻塞等待Worker处理完成
    // 用于:客户端连接、断开等需要确保完成的操作
    void send_message(uint32_t type, void *payload);

    // ===== 异步消息发送 =====
    // 发送消息后立即返回,不等待
    // 用于:唤醒、通知等不需要确认的操作
    void send_message_async(uint32_t type, void *payload);

    // ===== 事件循环集成 =====
    // 返回接收端FD,用于注册到事件循环
    int get_recv_fd();

    // ===== 消息处理 =====
    // 从队列读取并处理一个消息
    bool handle_message();

    private:
    int send_fd; // socketpair发送端
    int recv_fd; // socketpair接收端
    DispatcherMessage handlers[MAX_MESSAGES]; // 消息处理器数组
    };

    // 同步发送消息的实现
    void Dispatcher::send_message(uint32_t type, void *payload)
    {
    // ===== 写入消息到socketpair =====
    // 消息格式简单:[类型][载荷指针]
    uint32_t msg[2] = { type, (uint32_t)payload };
    write(send_fd, msg, sizeof(msg));

    // ===== 等待处理完成确认 =====
    // Worker处理完消息后会写入一个字节的ACK
    // 这里阻塞读取,实现同步语义
    uint8_t ack;
    read(recv_fd, &ack, 1);
    // 此时可以安全地认为Worker已经处理完成
    }

    为什么使用socketpair而不是锁?

  • 简单性:消息传递比共享内存+锁更容易理解和调试
  • 事件驱动:socketpair的FD可以直接用于epoll/poll
  • 无死锁:消息队列不会产生复杂的锁依赖
  • 性能:对于SPICE的使用场景,socketpair开销可以接受
  • 跨线程操作示例

    以DisplayChannel连接为例,展示完整的跨线程操作流程。

    // ===== 主线程:发起连接请求 =====
    // 这个函数在主线程执行,当客户端请求连接DisplayChannel时调用
    void red_qxl_display_connect(QXLInstance *qxl, RedClient *client,
    RedStream *stream, int migration,
    RedChannelCapabilities *caps)
    {
    // ===== 构造消息载荷 =====
    // 注意:这是栈上变量,但内容会被复制到消息队列
    WorkerMessageDisplayConnect msg = {
    .client = client, // 客户端对象指针(主线程创建)
    .stream = stream, // TCP连接流(主线程创建)
    .migration = migration, // 是否迁移连接

    // ===== 深拷贝能力数据 =====
    // 能力数据在栈上,必须拷贝一份
    // Worker线程处理时原始数据可能已失效
    .caps = red_channel_capabilities_dup(caps),
    };

    // ===== 发送消息到Worker线程 =====
    // dispatcher_send_message是同步调用:
    // 1. 写入消息到socketpair
    // 2. 阻塞等待Worker处理完成
    // 3. 收到ACK后返回
    //
    // 返回时,Worker已经完成连接处理
    // DisplayChannelClient已经创建并开始工作
    dispatcher_send_message(qxl->st->dispatcher,
    WORKER_MESSAGE_DISPLAY_CONNECT,
    &msg);
    // ===== 执行到这里时,连接已在Worker线程完成 =====
    }

    完整调用链:

    为什么用同步消息?

    对于连接操作,使用同步消息(阻塞等待)是有意义的:

  • 确保完成:主线程需要知道连接是否成功
  • 顺序保证:后续操作依赖连接已建立
  • 简化错误处理:同步返回可以立即处理错误
  • // 同步 vs 异步对比
    // 同步(用于连接、断开等)
    dispatcher_send_message(...); // 阻塞直到完成
    // 此时可以安全地进行后续操作

    // 异步(用于唤醒、通知等)
    dispatcher_send_message_async(...); // 立即返回
    // Worker稍后处理,不保证顺序

    VM迁移支持

    迁移流程

    在这里插入图片描述

    SPICE支持虚拟机热迁移(Live Migration),允许在不中断用户会话的情况下将虚拟机从一台物理主机迁移到另一台。迁移过程中,SPICE需要协调客户端平滑地从源服务器切换到目标服务器。

    迁移过程的关键步骤:

  • 迁移启动:源端调用reds_mig_started()标记迁移开始,此时服务器进入迁移模式,开始收集需要迁移的状态数据。

  • 通知客户端:源端通过MainChannel向客户端发送SPICE_MSG_MAIN_MIGRATE_BEGIN消息,告知目标服务器的地址和端口。

  • 状态保存:源端收集各个通道的状态信息,包括显示缓存、图像字典等,准备发送给目标端。

  • 客户端切换:客户端收到迁移消息后,保存当前通道状态,断开与源端的连接,然后连接到目标服务器。

  • 状态恢复:目标端接收客户端连接后,调用reds_mig_completed()完成迁移,恢复通道状态,继续提供服务。

  • 整个过程对用户来说应该是透明的,最多只会感受到短暂的画面停顿。

    迁移数据准备

    VM迁移时需要将SPICE会话状态从源主机传输到目标主机,这需要序列化各种状态数据。

    // main-channel.cpp
    // ===== 触发迁移数据发送 =====
    // 当迁移开始时,将迁移数据项添加到管道
    static void main_channel_push_migrate_data_item(MainChannel *main_chan)
    {
    // 添加一个管道项,稍后会触发序列化
    // RED_PIPE_ITEM_TYPE_MAIN_MIGRATE_DATA是一个标记
    // 实际数据在发送时才生成
    main_chan->pipes_add_type(RED_PIPE_ITEM_TYPE_MAIN_MIGRATE_DATA);
    }

    // ===== 序列化迁移数据 =====
    // 当管道项被发送时调用,将状态序列化为网络格式
    static void marshall_migrate_data(SpiceMarshaller *m, RedChannelClient *rcc)
    {
    // ===== 写入数据头 =====
    // Magic用于验证数据完整性
    // Version用于处理版本兼容性
    SpiceMigrateDataHeader header;
    header.magic = SPICE_MIGRATE_DATA_MAIN_MAGIC; // 0x… 特定值
    header.version = SPICE_MIGRATE_DATA_MAIN_VERSION; // 当前版本号

    // SpiceMarshaller是SPICE的序列化工具
    // 将数据收集到缓冲区,稍后一次性发送
    spice_marshaller_add(m, (uint8_t *)&header, sizeof(header));

    // ===== 写入主通道状态 =====
    SpiceMigrateDataMain data;

    // 填充需要迁移的状态数据:
    // – 多媒体时间戳(音视频同步)
    // – 代理状态(clipboard、文件传输等)
    // – 鼠标模式
    // – 等等…

    spice_marshaller_add(m, (uint8_t *)&data, sizeof(data));
    }

    迁移数据类型:

    通道迁移数据内容
    MainChannel 多媒体时间戳(mm_time)、代理状态、鼠标模式
    DisplayChannel Surface列表、图像缓存(ImageCache)、GLZ字典、视频流状态
    InputsChannel 键盘LED状态、修饰键状态
    SoundChannel 音量设置

    迁移数据流转:

    步骤源主机目标主机
    1 reds_mig_started() 开始迁移
    2 收集各通道状态
    3 marshall_migrate_data() 序列化
    4 通过QEMU迁移流传输 接收迁移数据
    5 解析并恢复状态
    6 断开旧连接 客户端重新连接
    7 reds_mig_completed() 完成

    为什么需要传输图像缓存?

    图像缓存(ImageCache)存储了客户端已缓存的图像ID。如果不迁移,目标主机会认为客户端没有任何缓存,导致重新发送所有图像,浪费带宽和时间。

    总结

    SPICE服务器的两个核心组件分工明确:

    Reds(主服务器)

    职责实现
    连接管理 监听、认证、客户端生命周期
    通道注册 管理所有通道,通知客户端
    Agent通信 VDIPort设备,鼠标注入
    迁移协调 迁移数据准备和恢复
    主线程通道 Main、Inputs、Sound

    RedWorker

    职责实现
    图形处理 解析QXL命令,构建命令树
    显示通道 DisplayChannel管理和数据发送
    光标通道 CursorChannel处理
    图像压缩 QUIC/LZ/GLZ/JPEG编码
    视频流 检测和编码视频区域

    这种设计实现了:

  • 解耦:连接管理与图形处理分离
  • 并行:多个QXL设备可并行处理
  • 响应性:图形处理不阻塞输入响应
  • 可扩展:易于添加新的通道类型
  • 赞(0)
    未经允许不得转载:网硕互联帮助中心 » SPICE源码分析(八):Red Worker与主服务器(Reds)实现分析
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!