云计算百科
云计算领域专业知识百科平台

协程库(模块的详解与代码分析)ioscheduler类(io+scheduler的结合)篇

文章目录

  • 为什么需要io+协程?
  • IO协程调度是什么,有什么功能?
  • sylar的IO协程调度模块基于什么?
  • IO协程调度为什么都包含一个三元组信息?
    • Fdcontext部分:
    • Eventcontext部分:
  • IO协程调度器因为TImer类对idle协程的改造
  • io+协程和协程调度器部分有什么不一样?
  • ioscheduler.h
  • ioscheduler.cpp
    • IOManager::GetThis():
      • 补充dynamic_cast和static_cast:
    • IOManager:FdContext::getEventContext(Event event):
    • IOManager::resetEventContext:
    • IOManager::triggerEvent:
    • IOManager的构造函数和析构函数:
      • epoll
      • Fcntl函数
    • IOManager::ContextResize():
    • 重点的IOManager::addevent函数:
    • IOManager::delEvent函数:
    • IOManager::cancelEvent函数:
    • IOManager::cancelAll():
    • IOManager::tickle():
    • IOManager::stopping():
    • IOManager::idle():
    • IOManager::onTimerInsertedAtFront():
    • main.cpp
    • 测试结果
    • 函数调用图

项目代码仓库

为什么需要io+协程?

在这里插入图片描述

IO协程调度是什么,有什么功能?

在这里插入图片描述 事实上很多的库都可以实现类似的工作,比如libevent,libuv,libev等,这些库被称为异步事件库或IO库,从网络上可以搜索到很多资料。深入浅出理解libevent——2万字总结_libev 堆-CSDN博客等,有的库不仅可以处理socketfd事件,还可以处理定时器事件和信号事件。这些事件库的实现原理基本类似,都是先将套接字设置为非阻塞状态,然后将套接字与回调函数绑定,接下来进入一个基于IO多路复用的事件循环,等待事件发生,然后调用对应的回调函数。

sylar的IO协程调度模块基于什么?

基于epoll实现,只支持linux平台。对每个fd,sylar支持两类事件,一类是可读事件,对应EPOLLIN,一类是可写事件,对应EPOLLOUT,sylar的事件枚举值直接继承子epoll。

当然epoll除了本身支持epollin和epollout两类事件外,还支持其他事件,比如epollrdhup(对端关闭),epollerr(错误事件),epollhup(挂起事件)。对于这些事件,sylar的做法是对其进行归类,分别对应到epollin和epollout中,也就是所有的事件都可以表示为可读或可写事件,甚至有的事件还可以同时表示可读或可写。比如epollerr事件发生,fd同时可读或可写。

IO协程调度为什么都包含一个三元组信息?

在这里插入图片描述

public:
// 内部枚举
// Event 是一个枚举类型,表示文件描述符上的事件类型
enum Event {
// 表示没有事件
NONE = 0x0,

// 表示读事件,对应于 epoll 的 EPOLLIN 事件
// READ == EPOLLIN
READ = 0x1,

// 表示写事件,对应于 epoll 的 EPOLLOUT 事件
// WRITE == EPOLLOUT
WRITE = 0x4
};

private:
// 用于描述一个文件描述的事件上下文
// FdContext 结构体用于存储每个文件描述符的事件上下文。每个文件描述符可以有两个事件上下文:read 和 write,分别对应读事件和写事件
struct FdContext {
// 描述一个具体事件的上下文,如读事件或写事件
struct EventContext {
// scheduler
// 关联的调度器
Scheduler* scheduler = nullptr;

// callback fiber
// 关联的回调线程(协程)
std::shared_ptr<Fiber> fiber;

// callback function
// 关联的回调函数
std::function<void()> cb;
};

// read event context
// read 和write表示读和写的上下文
EventContext read;

// write event context
EventContext write;
int fd = 0;

// events registered
// 当前注册的事件目前是没有事件,但可能变成 READ、WRITE 或二者的组合。
Event events = NONE;
std::mutex mutex;

// 根据事件类型获取相应的事件上下文(如读事件上下文或写事件上下文)
EventContext& getEventContext(Event event);

// 重置事件上下文
void resetEventContext(EventContext& ctx);

// 触发事件
void triggerEvent(Event event);
};

Fdcontext部分:

Fdcontext,首先记录了fd和event(读写组合、读或写、无事件),很正常以为我们io+协程的设计目的就是利用epoll的IO多路复用,让socket中的文件描述符绑定sylar中规定的事件可读或可写或组合的情况,所以在代码中我们可以看见Fdcontext有fd,event的成员变量,Fdcontext也叫文件描述符上下文具体用来存放fd和相应的读写事件

Eventcontext部分:

Eventcontext是为了在fd触发了event事件(读或写或读写组合)的时候具体让执行的系统调用如read(),write(),send()作为协程的入口函数进行任务的调度,目的是提高IO等待read(),write数据较多的时候可以先解决小任务,增加灵活性和提高效率。所以Eventcontext有函数对象和协程,协程调度器,目的是为了在触发事件后找到对应的调度器对象去运行协程或者函数对象绑定的入口函数(read(),write())。

IO协程调度器因为TImer类对idle协程的改造

IO协程调度器在idle时会epoll_wait所有注册的fd,如果有fd满足条件,epoll_wait返回,从私有数据中拿到fd的上下文信息,并且执行其中的回调函数。(实际是idle协程只负责收集所有已触发的fd的回调函数并将其加入调度器的任务队列,真正的执行时机是idle协程退出后,调度器在下一轮调度时执行)。

io+协程和协程调度器部分有什么不一样?

IO协程调度器支持取消事件。 取消事件表示不关心某个fd的某个事件了,如果某个fd的 可读或可写事件都被取消了,那这个fd会从调度器的epoll_wait中删除。 支持使用IO事件调度,针对套接字描述符,将描述符注册可读和可写事件的回到函数,当事件触发时执行对应的回调函数。多了一个外挂的定时器处理定时的任务主要是处理sleep、usleep等。

ioscheduler.h

#ifndef __SYLAR_IOMANAGER_H__
#define __SYLAR_IOMANAGER_H__

#include "scheduler.h"
#include "timer.h"

namespace sylar {
// work flow
// 1 register one event -> 2 wait for it to ready -> 3 schedule the callback -> 4 unregister the event -> 5 run the callback
// 1 注册事件 -> 2 等待事件 -> 3 事件触发调度回调 -> 4 注销事件回调后从epoll注销 -> 5 执行回调进入调度器中执行调度。

// 主要用于管理异步 IO 事件,并结合调度器和定时器功能进行高效的事件处理。该类基于事件驱动模型设计,适用于高性能的网络或文件描述符管理
// IOManager 类继承自 Scheduler 和 TimerManager,因此它具有调度任务和管理定时器的能力。
class IOManager: public Scheduler, public TimerManager {
public:
// 内部枚举
// Event 是一个枚举类型,表示文件描述符上的事件类型
enum Event {
// 表示没有事件
NONE = 0x0,

// 表示读事件,对应于 epoll 的 EPOLLIN 事件
// READ == EPOLLIN
READ = 0x1,

// 表示写事件,对应于 epoll 的 EPOLLOUT 事件
// WRITE == EPOLLOUT
WRITE = 0x4
};

private:
// 用于描述一个文件描述的事件上下文
// FdContext 结构体用于存储每个文件描述符的事件上下文。每个文件描述符可以有两个事件上下文:read 和 write,分别对应读事件和写事件
struct FdContext {
// 描述一个具体事件的上下文,如读事件或写事件
struct EventContext {
// scheduler
// 关联的调度器 用于安排回调的执行
Scheduler* scheduler = nullptr;

// callback fiber
// 协程对象,表示回调函数运行时的上下文。
std::shared_ptr<Fiber> fiber;

// callback function
// 关联的回调函数 事件触发时会执行该函数。
std::function<void()> cb;
};

// read event context
// read 和write表示读和写的上下文
EventContext read;

// write event context
EventContext write;
int fd = 0;

// events registered
// 当前注册的事件,表示当前文件描述符上注册的事件类型。它的值可以是 NONE、READ、WRITE 或者 READ | WRITE(组合事件)。这个变量用于标识哪些事件正在被监视和处理。
Event events = NONE;

// 用于保护 FdContext 数据的互斥锁。由于 IOManager 可能在多线程环境中运行,mutex 保证了在并发环境中对文件描述符上下文的安全访问,避免竞态条件
std::mutex mutex;

// 根据事件类型获取相应的事件上下文(如读事件上下文或写事件上下文)
// 根据事件类型获取相应的事件上下文(read 或 write)。这个方法根据传入的 event(如 READ 或 WRITE)返回对应的 EventContext
EventContext& getEventContext(Event event);

// 重置事件上下文
// 重置事件上下文的状态。这个方法会将事件上下文中的成员变量(如 scheduler、fiber 和 cb)重置为初始状态,通常在事件完成处理后调用,以准备好下一次事件的注册。
void resetEventContext(EventContext& ctx);

// 触发事件
// 触发事件,执行与事件相关的回调。调用此方法会根据事件类型(如 READ 或 WRITE)执行相应的回调函数。
void triggerEvent(Event event);
};
public:
// 允许设置线程数量、是否使用调用者线程以及名称。
// threads线程数量,use_caller是否将主线程或调度线程包含进去,name调度器的名字
// 定是否使用调用者线程来执行事件处理。默认值为 true,表示在 IOManager 中,调用者线程也会被用于执行 I/O 操作
IOManager(size_t threads = 1, bool use_caller = true, const std::string& name = "IOManager");
~IOManager();

// add one event at a time
// 事件管理方法
// 添加一个事件到文件描述符 fd 上,并关联一个回调函数 cb。
int addEvent(int fd, Event event, std::function<void()> cb = nullptr);

// delete event
// 删除文件描述符fd上的某个事件
// 删除某个文件描述符上的特定事件。取消该事件的监视。
bool delEvent(int fd, Event event);

// delete the event and trigger its callback
// 取消文件描述符上的某个事件,并触发其回调函数
bool cancelEvent(int fd, Event event);

// delete all events and trigger its callback
// 取消文件描述符 fd 上的所有事件,并触发所有回调函数。
bool cancelAll(int fd);

// 获取当前的 IOManager 实例
static IOManager* GetThis();

protected:
//通知调度器有任务调度
//写pipe让idle协程从epoll_wait退出,待idle协程yield之后Scheduler::run就可以调度其他任务.
void tickle() override;

//判断调度器是否可以停止
//判断条件是Scheduler::stopping()外加IOManager的m_pendingEventCount为0,表示没有IO事件可调度
bool stopping() override;

//实际是idle协程只负责收集所有已触发的fd的回调函数并将其加⼊调度器
//的任务队列,真正的执⾏时机是idle协程退出后,调度器在下⼀轮调度时执⾏
//这里也是scheduler的重写,当没有事件处理时,线程处于空闲状态时的处理逻辑。
void idle() override;

//因为Timer类的成员函数重写当有新的定时器插入到前面时的处理逻辑
void onTimerInsertedAtFront() override;

// 调整 IOManager 内部上下文(例如,文件描述符上下文)的大小。这可能用于动态增加或减少事件处理的容量
void contextResize(size_t size);

private:
//用于epoll的文件描述符。
// fd[0] read,fd[1] write
int m_epfd = 0;

//用于线程间通信的管道文件描述符,fd[0] 是读端,fd[1] 是写端。
int m_tickleFds[2];

//原子计数器,用于记录待处理的事件数量。使用atomic的好处是这个变量再进行加或-都是不会被多线程影响
// 原子变量,表示当前挂起的事件数量。使用 atomic 类型确保在多线程环境下的并发访问不会出现问题。
std::atomic<size_t> m_pendingEventCount = {0};

// 读写锁
std::shared_mutex m_mutex;

// store fdcontexts for each fd
//文件描述符上下文数组,用于存储每个文件描述符的 FdContext。
std::vector<FdContext*> m_fdContexts;
};
}
#endif

ioscheduler.cpp

以下都是关键代码部分:

IOManager::GetThis():

获取当前线程的调度器对象,然后将其动态转换为IOManager*类型,如果转换成功,表示当前线程的调度器对象确实是一个IOManager对象。否则,如果是转化的是指针类型返回nullptr,引用类型抛出std::bad_cast异常

IOManager* IOManager::GetThis() {
// dynamic_cast 用于将基类指针转换为派生类指针。在这里,我们将 Scheduler 类的 GetThis() 返回的指针转换为 IOManager*
// 如果当前调度器实例是 IOManager 类型,转换将成功。如果不是(例如,基类指针指向其他类型),dynamic_cast 将返回 nullptr,这保证了类型安全。
return dynamic_cast<IOManager*>(Scheduler::GetThis());
}

补充dynamic_cast和static_cast:

dynamic_cast:有运行时候的安全检查,不安全就直接如上面说的一样返回,必须基类需要有虚函数,确保类型安全时使用此关键词转化。

static_cast:无运行的检查,比如基类转换成子类在dynamic_cast中是不允许的,但是static_cast不检查所以无所谓,但是转化后有可能导致某些未定义的行为,如无法访问子类的部分成员。

IOManager:FdContext::getEventContext(Event event):

根据传入的事件event,返回对应事件上下文的引用。 assert是一种调式工具,它会在运行时检查这个条件是否为真。如果条件为假(即event既不是READ或WRIE),程序中断执行,并报告错误。

// 用于获取指定事件类型(如读事件或写事件)对应的事件上下文(EventContext)。FdContext 类中维护了 read 和 write 两个 EventContext,该方法根据传入的 event 类型返回相应的事件上下文
IOManager::FdContext::EventContext& IOManager::FdContext::getEventContext(Event event) {
assert(event == READ || event == WRITE);
switch(event) {
// READ 和 WRITE 事件类型会分别返回 read 或 write 对应的 EventContext。
case READ:
return read;
case WRITE:
return write;
}
// 如果传入的 event 既不是 READ 也不是 WRITE,则抛出 std::invalid_argument 异常,表示传入的事件类型无效。
// 这行代码可以防止传入无效事件类型时造成未定义行为,确保代码的健壮性。
throw std::invalid_argument("Unsupported event type");
}

补充: std::invalid_argument: 属于std::exception的派生类在<stdexcept>头文件中,简单来说就是传入的参数不符合预期的参数时抛出异常。

IOManager::resetEventContext:

重置EventContext事件的上下文,将其恢复到初始或者空的状态。主要作用是清理并重置传入的 EventContext 对象,使其不再与任何调度器、线程或回调函数相关联。

void IOManager::FdContext::resetEventContext(EventContext& ctx) {
ctx.scheduler = nullptr;
ctx.fiber.reset();
ctx.cb = nullptr;
}

IOManager::triggerEvent:

函数负责在指定的 IO 事件被触发时,执行相应的回调函数或线程,并且在执行完之后清理相关的事件上下文。

具体函数流程: 通过判断触发指定的event事件在Fdcontext中的events中存在对应的读或写或读写组合,没有assert就抛出异常了,有就从取反从events中删除,然后获取相应的EventContext具体的读或写事件对应的上下文,将fd绑定的具体读或写任务的回调协程或回调函数,放入到任务队列中等待调度器调度。

// no lock
// 用于触发并处理特定文件描述符(fd)上已经发生的事件。
void IOManager::FdContext::triggerEvent(IOManager::Event event) {
//确保event是中有指定的事件,否则程序中断。
assert(events & event);

// delete event
// 清理该事件,表示不再关注,也就是说,注册IO事件是一次性的,
//如果想持续关注某个Socket fd的读写事件,那么每次触发事件后都要重新添加
//因为不是使用了十六进制位,所以对标志位取反就是相当于将event从events中删除
// 通过assert(events & event)确保触发的事件(event)的确是之前注册过并感兴趣的,否则说明程序逻辑存在错误,直接终止程序执行。
// (events & ~event)则表示原来的事件集中移除指定事件
// 假设events原本为:0110(关注事件2、3)
// 假设event为:0010(事件2发生)
// 取反后:~0010 → 1101
// 进行与运算后:0110 & 1101 → 0100,成功移除了事件2,仅剩事件3
events = (Event)(events & ~event);

// trigger
// 获取当前触发事件(event)所对应的上下文对象(EventContext)
EventContext& ctx = getEventContext(event);

//这个过程就相当于scheduler文件中的main.cpp测试一样,把真正要执行的函数放入到任务队列中等线程取出后任务后,协程执行,执行完成后返回主协程继续,执行run方法取任务执行任务(不过可能是不同的线程的协程执行了)。
// 判断上下文对象存放的是一个回调函数还是一个协程
if(ctx.cb) {
// call ScheduleTask(std::function<void()>* f, int thr)
// 如果是回调函数 (std::function<void()> ctx.cb),则将该回调封装为调度任务,放入调度器的任务队列。
ctx.scheduler->scheduleLock(&ctx.cb);
} else {
// call ScheduleTask(std::shared_ptr<Fiber>* f, int thr)
// 如果是协程任务 (std::shared_ptr<Fiber> ctx.fiber),则直接将协程对象作为任务加入调度队列。
ctx.scheduler->scheduleLock(&ctx.fiber);
} // scheduleLock 是调度器的方法,作用是将任务安全地加入到调度器维护的任务队列中,并唤醒等待取任务的线程进行调度执行

// reset event context
// 一旦触发执行完毕,当前事件上下文(ctx)需恢复到初始状态
resetEventContext(ctx);
return;
}

IOManager的构造函数和析构函数:

epoll

防止你没学习过epoll:图文并茂讲解epoll原理,彻底弄懂epoll机制-CSDN博客

man epoll_event

在这里插入图片描述

IOManager::IOManager(size_t threads, bool use_caller, const std::string &name):
Scheduler(threads, use_caller, name), TimerManager() {
// create epoll fd
// 5000,epoll_create 的参数实际上在现代 Linux 内核中已经被忽略,最早版本的 Linux 中,这个参数用于指定 epoll 内部使用的事件表的大小。
m_epfd = epoll_create(5000);

//错误就终止程序
// 成功返回新创建的epoll实例的文件描述符(正整数),失败返回-1
assert(m_epfd > 0);

// create pipe
//创建管道的函数规定了m_tickleFds[0]是读端,[1]是写端
// 创建一个管道用于唤醒阻塞在epoll_wait的线程
// m_tickleFds[0]:读端,用于epoll监控。
// m_tickleFds[1]:写端,当有新任务到来时写入该端,唤醒阻塞的线程
int rt = pipe(m_tickleFds);

// 成功返回0失败返回-1
assert(!rt);

// add read event to epoll
//将管道的监听注册到epoll上
// struct epoll_event {
// uint32_t events; /* Epoll events (事件类型) */
// epoll_data_t data; /* User data variable (用户数据,一般为文件描述符) */
// };
epoll_event event;

// Edge Triggered,设置标志位,并且采用边缘触发和读事件。
// EPOLLIN:监听读事件。
// EPOLLET (Edge Triggered):边沿触发模式。
// 边沿触发意味着:仅当pipe从不可读变为可读时触发一次,若不一次性处理完毕,将不会再通知。
// 仅当文件描述符状态发生变化时触发通知。
// 状态变化的含义:
// 从不可读变为可读,或从不可写变为可写的瞬间。
// 边缘触发模式下,只有在状态变化瞬间(从不可读 → 可读,或从不可写 → 可写)时才通知一次。
// 如果事件发生后你没有一次性读取完缓冲区中的所有数据,那么即使缓冲区中还有剩余数据,epoll也不会再通知你。
// 边沿触发就像“门铃”,只在有人按门铃瞬间响一下。如果你开门时没把人接进来,后面再去看门铃,它不会再响。
// 与水平触发(LT)对比:
// 水平触发模式(默认模式),只要缓冲区中还有数据可读,就一直通知你(不断响铃),直到读完为止。
// EPOLLIN:监听读事件(可读即通知)。
// EPOLLET:事件边沿触发模式(变化瞬间通知一次,不会持续通知)。
event.events = EPOLLIN | EPOLLET;

// 指定event.data.fd为pipe的读端,确保epoll监控pipe读端的可读事件。
// epoll可以监听多个文件描述符(如socket、pipe),这里明确指定了要监听的是哪个文件描述符。
// epoll监听的是管道的读端,因为只有读端有数据可读时,才需要通知程序处理数据
event.data.fd = m_tickleFds[0];

// non-blocked
//修改管道文件描述符以非阻塞的方式,配合边缘触发。
// 设置pipe读端为非阻塞模式
// 防止在读取pipe内容时线程阻塞。
// 若无数据则直接返回-1,而非等待数据。
rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);

//每次需要判断rt是否成功
// 成功返回0失败返回-1
assert(!rt);

//将 m_tickleFds[0];作为读事件放入到event监听集合中
// 调用epoll_ctl函数,将上面定义好的事件(event)添加到epoll实例(m_epfd)中,开始监控m_tickleFds[0]
// 成功返回0失败返回-1
rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
assert(!rt);

//初始化了一个包含 32 个文件描述符上下文的数组
// 事件上下文数组大小初始化
// 预先分配大小为32个FD的事件上下文对象,减少运行时频繁内存分配的开销。
contextResize(32);

//启动 Scheduler,开启线程池,准备处理任务
start();
}

IOManager::~IOManager() {
// 关闭scheduler类中的线程池,让任务全部执行完后线程安全退出
stop();

// 关闭epoll的句柄
// 关闭epoll句柄后,操作系统会自动清理epoll实例相关资源,停止监听事件
close(m_epfd);

//关闭管道读端写端
close(m_tickleFds[0]);
close(m_tickleFds[1]);

//将fdcontext文件描述符一个个关闭
for(size_t i = 0; i < m_fdContexts.size(); ++i) {
if(m_fdContexts[i]) {
delete m_fdContexts[i];
}
}
}

在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

void handleEvents(int fd) {
while (true) {
// 定义了一个大小为4096字节的缓冲区,用于存储从文件描述符读取到的数据。
char buffer[4096];

// 使用recv()函数从给定的文件描述符 fd 中读取数据。
ssize_t bytesRead = recv(fd, buffer, sizeof(buffer), 0);
if (bytesRead == 1) {
// 表示当前处于非阻塞模式,此时缓冲区暂时没有数据可读(非阻塞模式下是正常现象),可以直接跳出循环,稍后再尝试。
// 在非阻塞模式下,如果当前没有数据可读,系统调用会立即返回-1,且errno设置为EAGAIN或EWOULDBLOCK。
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 没有更多数据可读
break;
} else {
// 处理其他错误
perror("recv");
break;
}
} else if (bytesRead == 0) {
// 当 recv() 返回值为 0 时,意味着对端已经关闭了连接,不会再有数据到来。
// 连接关闭
close(fd);
break;
} else {
// 处理读取的数据
processData(buffer, bytesRead);
}
}
}

在这里插入图片描述 在这里插入图片描述

Fcntl函数

fcntl() 全称 file control,用于控制和修改已打开文件描述符的属性。 它可以用于设置文件描述符状态标志(例如阻塞/非阻塞模式)、文件状态标志、文件锁等

函数原型:

#include <fcntl.h>

int fcntl(int fd, int cmd, ... /* arg */);

在这里插入图片描述 在这里插入图片描述 设置非阻塞模式(O_NONBLOCK) 最常见的用法之一,即将文件描述符设置为非阻塞:

int flags = fcntl(fd, F_GETFL, 0); // 获取当前标志
fcntl(fd, F_SETFL, flags | O_NONBLOCK); // 设置为非阻塞

在这里插入图片描述 在这里插入图片描述

IOManager::ContextResize():

主要作用是调整m_fdContexts数组的大小,并为新的文件描述符(fd)创建并初始化相应的Fdcontext对象。

// no lock
// 函数的用途是调整类内维护的FdContext数组(或容器)大小到指定的size。
void IOManager::contextResize(size_t size) {
if (size < m_fdContexts.size()) {
for (size_t i = size; i < m_fdContexts.size(); ++i) {
delete m_fdContexts[i]; // 释放多余的FdContext
m_fdContexts[i] = nullptr;
}
}

//调整m_fdContexts的大小
// 遍历 m_fdContexts 向量,初始化尚未初始化的 FdContext 对象
m_fdContexts.resize(size);

for(size_t i = 0; i < m_fdContexts.size(); ++i) {
if(m_fdContexts[i] == nullptr) {
m_fdContexts[i] = new FdContext();

// 将文件描述符的编号赋值给 fd
// 设置FdContext的成员fd为当前的索引i
m_fdContexts[i]->fd = i;
}
}
}

重点的IOManager::addevent函数:

在这里插入图片描述

// addEvent方法用于向IO管理器中注册一个事件(如读或写事件)
// fd:文件描述符(通常是socket)。
// event:事件类型(如读事件或写事件)。
// cb:当事件触发时执行的回调函数。
int IOManager::addEvent(int fd, Event event, std::function<void()> cb) {
// attemp to find FdContext
FdContext* fd_ctx = nullptr;

std::shared_lock<std::shared_mutex> read_lock(m_mutex);

// 如果容器m_fdContexts足够大,快速获取对应fd的上下文对象
if((int)m_fdContexts.size() > fd) {
fd_ctx = m_fdContexts[fd];
read_lock.unlock();
} else {
// 如果大小不足,释放读锁,申请独占写锁,调用contextResize(fd * 1.5)扩展容器,保证容器能够容纳新的fd。
read_lock.unlock();
std::unique_lock<std::shared_mutex> write_lock(m_mutex);
contextResize(fd * 1.5);
fd_ctx = m_fdContexts[fd];
}

// 锁定fd_ctx并检查是否已有事件
// fd_ctx->mutex是保护FdContext自身状态的互斥锁,确保多个线程不会同时修改。
std::lock_guard<std::mutex> lock(fd_ctx->mutex);

// the event has already been added
// 检查fd_ctx->events是否已经包含当前的事件:
// 如果已经包含,则表示重复添加事件,直接返回-1,表示失败。
// 如果不包含,继续往下执行。
if(fd_ctx->events & event) {
return 1;
}

// add new event
// 构建epoll事件并调用epoll_ctl
// 确定调用epoll_ctl的操作类型:
// 若fd_ctx已有事件,则使用EPOLL_CTL_MOD(修改)。
// 若还没有注册任何事件,则使用EPOLL_CTL_ADD(新增)。
int op = fd_ctx->events ? EPOLL_CTL_MOD: EPOLL_CTL_ADD;
epoll_event epevent;
epevent.events = EPOLLET | fd_ctx->events | event;
epevent.data.ptr = fd_ctx;

// 成功返回0失败返回-1
int rt = epoll_ctl(m_epfd, op, fd, &epevent);

if(rt) {
std::cerr << "addEvent::epoll_ctl failed: " << strerror(errno) << std::endl;
return 1;
}

++m_pendingEventCount;

// update fdcontext
// 更新fd_ctx内部的事件状态和回调
fd_ctx->events = (Event)(fd_ctx->events | event);

// update event context
// getEventContext(event) 会返回对应事件的引用:
// 比如事件为READ或WRITE,分别返回对应的EventContext结构
FdContext::EventContext& event_ctx = fd_ctx->getEventContext(event);

// 确保事件上下文未被占用(防御性编程)
assert(!event_ctx.scheduler && !event_ctx.fiber && !event_ctx.cb);

// 当前的事件被注册到特定的调度器(当前线程绑定的调度器)
event_ctx.scheduler = Scheduler::GetThis();

// 绑定回调函数(回调模式)或绑定协程(协程模式)
if(cb) {
// 如果传入的参数 cb 不为空,说明采用回调模式
event_ctx.cb.swap(cb);
} else {
// 如果未提供回调函数,默认使用协程模式:
event_ctx.fiber = Fiber::GetThis();
// 断言协程状态为 Fiber::RUNNING,说明当前一定处于协程运行状态下调用此函数。
assert(event_ctx.fiber->getState() == Fiber::RUNNING);
}
return 0;
}

IOManager::delEvent函数:

在这里插入图片描述

// delEvent 函数的作用是从事件管理器中移除一个指定文件描述符(fd)的特定事件(Event)。通常用于取消对某个文件描述符的读写或异常事件监听。
bool IOManager::delEvent(int fd, Event event) {
// attemp to find FdContext
//这里的步骤和上面的addevent添加事件类似
FdContext* fd_ctx = nullptr;

// 这里使用读锁(共享锁)安全访问m_fdContexts容器(通常是数组或vector),检查指定的fd是否存在对应的上下文FdContext。
// 若找到,则保存到fd_ctx指针中;否则直接返回false(表示无法删除)
std::shared_lock<std::shared_mutex> read_lock(m_mutex);
if((int)m_fdContexts.size() > fd) {
fd_ctx = m_fdContexts[fd];
read_lock.unlock();
} else {
read_lock.unlock();
//如果没查找到代表数组中没这个文件描述符直接,返回false;
return false;
}

//找到后添加互斥锁
std::lock_guard<std::mutex> lock(fd_ctx->mutex);

// the event doesn't exist
// 检查待删除事件是否存在
if(!(fd_ctx->events & event)) {
return false;
}

// delete the event
//因为这里要删除事件,对原有的事件状态取反就是删除原有的状态比如说传入参数是读事件,我们取反就是删除了这个读事件但可能还要写事件
// 使用位运算从原本的事件掩码中移除指定的event
Event new_events = (Event)(fd_ctx->events & ~event);

// 如果删除后事件还存在其他监听事件,则修改(EPOLL_CTL_MOD)监听的事件集;
// 否则,没有任何事件监听了,则从epoll监听集中删除(EPOLL_CTL_DEL)此fd。
int op = new_events ? EPOLL_CTL_MOD: EPOLL_CTL_DEL;

epoll_event epevent;
epevent.events = EPOLLET | new_events;

//这一步是为了在 epoll 事件触发时能够快速找到与该事件相关联的 FdContext 对象。
// 并将fd_ctx保存至data.ptr,便于epoll_wait时获取上下文
epevent.data.ptr = fd_ctx;

// 调用epoll_ctl更新epoll事件监听状态
int rt = epoll_ctl(m_epfd, op, fd, &epevent);

if(rt) {
std::cerr << "delEvent::epoll_ctl failed: " << strerror(errno) << std::endl;
return 1;
}

//减少了待处理的事件
m_pendingEventCount;

// update fdcontext
// 更新FdContext的事件掩码
fd_ctx->events = new_events;

// update event context
// 重置事件上下文EventContext
FdContext::EventContext& event_ctx = fd_ctx->getEventContext(event);

// 将原本存储在FdContext内的事件上下文(通常包含回调、协程信息等)清理重置,防止内存泄漏或误操作。
fd_ctx->resetEventContext(event_ctx);
return true;
}

IOManager::cancelEvent函数:

函数用于取消特定文件描述符上的指定事件(如读事件或写事件),并触发该事件的回调函数。

我们可以发现具体的操作好像这几个event事件函数做的都差不多,代码也差不多实际上确实差不多。

这里相比delEvent不同在于删除事件后,还需要将删除的事件直接交给trigger函数放入到协程调度器中进行触发。

// 取消并立即触发指定文件描述符(fd)上的特定事件。与delEvent不同的是,它会在取消事件的同时,主动调用与该事件关联的回调函数或协程,使事件处理逻辑立即执行,而不是等待事件实际触发。
bool IOManager::cancelEvent(int fd, Event event) {
// attemp to find FdContext
FdContext* fd_ctx = nullptr;

std::shared_lock<std::shared_mutex> read_lock(m_mutex);
if((int)m_fdContexts.size() > fd) {
fd_ctx = m_fdContexts[fd];
read_lock.unlock();
} else {
read_lock.unlock();
return false;
}

std::lock_guard<std::mutex> lock(fd_ctx->mutex);

// the event doesn't exist
if(!(fd_ctx->events & event)) {
return false;
}

// delete the event
Event new_events = (Event)(fd_ctx->events & ~event);
int op = new_events ? EPOLL_CTL_MOD: EPOLL_CTL_DEL;
epoll_event epevent;
epevent.events = EPOLLET | new_events;
epevent.data.ptr = fd_ctx;

int rt = epoll_ctl(m_epfd, op, fd, &epevent);

if(rt) {
std::cerr << "cancelEvent::epoll_ctl failed: " << strerror(errno) << std::endl;
return false;
}

m_pendingEventCount;

// update fdcontext, event context and trigger
//这个代码和上面那个delEvent一致好像就是最后的处理不同一个是重置,一个是调用事件的回调函数
// 立即触发事件回调
fd_ctx->triggerEvent(event);
return true;
}

IOManager::cancelAll():

主要功能是取消指定文件描述符(fd)上的所有事件,并且触发这些事件的回调。 和cancle的不同,完全将fd上的事件从epoll中移除,并且挨个的触发了响应的读事件或写事件确保,事件都能被执行。不然咋加了一个all呢?

// 用于取消指定文件描述符(fd)上所有已注册的事件监听,并且立即主动触发所有已注册事件对应的回调函数或协程逻辑。
bool IOManager::cancelAll(int fd) {
// attemp to find FdContext
FdContext* fd_ctx = nullptr;

std::shared_lock<std::shared_mutex> read_lock(m_mutex);
if((int)m_fdContexts.size() > fd) {
fd_ctx = m_fdContexts[fd];
read_lock.unlock();
} else {
read_lock.unlock();
return false;
}

std::lock_guard<std::mutex> lock(fd_ctx->mutex);

// none of events exist
if(!fd_ctx->events) {
return false;
}

// delete all events
int op = EPOLL_CTL_DEL;
epoll_event epevent;
epevent.events = 0;
epevent.data.ptr = fd_ctx;

// 所有事件清空
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if(rt) {
std::cerr << "IOManager::epoll_ctl failed: " << strerror(errno) << std::endl;
return false;
}

// update fdcontext, event context and trigger
// 逐个检查并触发已注册事件的回调
// 检测并主动触发所有已注册事件(如读事件、写事件)的回调函数或协程任务。
// 每触发一个事件的回调,都需要减少全局待处理事件计数器(m_pendingEventCount)。
if(fd_ctx->events & READ) {
fd_ctx->triggerEvent(READ);
m_pendingEventCount;
}

if(fd_ctx->events & WRITE) {
fd_ctx->triggerEvent(WRITE);
m_pendingEventCount;
}

// 确认最终fd_ctx->events掩码中事件已被全部清除。
assert(fd_ctx->events == 0);
return true;
}

IOManager::tickle():

重写了scheduler的tickle(); 作用是检测到有空闲线程时,通过写入一个字符到管(m_tickleFds[1]) 中,唤醒那些等待任务的线程。

// 用于唤醒当前IO管理器中处于空闲等待(idle)状态的线程。
void IOManager::tickle() {
// no idle threads
//这个函数在scheduler检查当前是否有线程处于空闲状态。如果没有空闲线程,函数直接返回,不执行后续操作。
// 若当前没有任何线程处于空闲(等待事件)的状态,则无需唤醒。
// 提前返回,避免无意义的唤醒调用。
if(!hasIdleThreads()) {
return;
}

//如果有空闲线程,函数会向管道 m_tickleFds[1] 写入一个字符 "T"。这个写操作的目的是向等待在 m_tickleFds[0](管道的另一端)的线程发送一个信号,通知它有新任务可以处理了。
// 用于唤醒处于阻塞等待状态(例如调用epoll_wait())的线程
// ssize_t write(int fd, const void *buf, size_t count);
// 将缓冲区buf中最多count字节的数据写入到文件描述符fd中。
// 成功时返回实际写入的字节数(应与count相同),失败则返回-1。
int rt = write(m_tickleFds[1], "T", 1);
assert(rt == 1);
}

IOManager::stopping():

在这里插入图片描述

bool IOManager::stopping() {
uint64_t timeout = getNextTimer();

// no timers left and no pending events left with the Scheduler::stopping()
return timeout == ~0ull && m_pendingEventCount == 0 && Scheduler::stopping();
}

IOManager::idle():

这里是Scheduler的idle,IOmanager对idle的一个重写;

通常在没有任务处理时运行(或者即使当前没有任务处理,线程也会在 idle() 中持续休眠并等待新的任务。保证了在所有任务完成之前,调度器不会退出),等待和处理 I/O 事件。

在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

// 本质是一个运行于Fiber(协程)或独立线程上的事件循环函数,负责监视并处理IO事件与定时任务。
// 该函数利用了Linux的高效I/O复用机制(epoll),并结合超时机制与协程调度,构建一个异步高效的事件驱动模型。
void IOManager::idle() {
//定义了 epoll_wait 能同时处理的最大事件数。
static const uint64_t MAX_EVENTS = 256;

// <epoll_event[]>:
// 表示管理的是一个动态数组,而不是单个对象。
// 因此,unique_ptr会调用delete[]释放数组内存
// 使用 std::unique_ptr 动态分配了一个大小为 MAX_EVENTS 的 epoll_event 数组,用于存储从 epoll_wait 获取的事件
std::unique_ptr<epoll_event[]> events(new epoll_event[MAX_EVENTS]);

while(true) {
if(debug) {
std::cout << "IOManager::idle(),run in thread: " << Thread::GetThreadId() << std::endl;
}

// 如果IOManager准备停止(stopping()返回true),则退出循环并结束idle()运行
if(stopping()) {
if(debug) {
std::cout << "name = " << getName() << " idle exits in thread: " << Thread::GetThreadId() << std::endl;
}
break;
}

// blocked at epoll_wait
// int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// 监控一个或多个文件描述符(fd)上的事件。
// 如果指定的文件描述符发生了事件(如读、写或错误),它会返回并填充events数组。
// 若无事件发生,则在timeout毫秒后返回。
// 如果timeout设为-1,会无限期阻塞直到有事件发生。
/*
epfd:
epoll 实例的文件描述符,由 epoll_create 或 epoll_create1 创建。
此处为变量 m_epfd。
events:
指向用户分配的数组,内存用于存储返回的事件。
此处为智能指针 events.get(),指向 epoll_event 类型数组。
maxevents:
允许返回的最大事件数量,表示数组最大长度。
此处为 MAX_EVENTS (比如256)。
timeout:
等待超时时间,以毫秒为单位。
如果 timeout = -1,表示一直阻塞直到有事件发生。
如果 timeout = 0,表示立即返回(非阻塞)。
如果 timeout > 0,表示最多阻塞这么多毫秒。*/

// 返回值含义:
// > 0 表示触发事件的数量。
// = 0 表示超时,未有事件发生。
// < 0 出错,需检查errno确定错误原因。

// 存放epoll_wait调用的返回值(触发事件的数量或错误码)
int rt = 0;

// 无限循环直至epoll_wait成功返回或发生非信号中断错误
while(true) {
// 将next_timeout限制在最大5000ms(5秒)内
static const uint64_t MAX_TIMEOUT = 5000;

//获取下一个超时的定时器
uint64_t next_timeout = getNextTimer();

//获取下一个定时器的超时时间,并将其与 MAX_TIMEOUT 取较小值,避免等待时间过长。
next_timeout = std::min(next_timeout, MAX_TIMEOUT);

// std::unique_ptr通过get()方法返回其管理的原始指针(裸指针)
// 注意:此处必须提供C风格裸指针。
// C++智能指针无法直接隐式转换为原始指针,因此必须显式调用get()
//epoll_wait陷入阻塞,等待tickle信号的唤醒,
//并且使用了定时器堆中最早超时的定时器作为epoll_wait超时时间。
rt = epoll_wait(m_epfd, events.get(), MAX_EVENTS, (int)next_timeout);

// EINTR -> retry
// EINTR全称为 "Interrupted system call"(被中断的系统调用),是在Linux/Unix环境中非常常见的一种错误返回值。
// 当程序调用某些系统函数(例如epoll_wait()、select()、read()等)时,若调用过程中收到了信号(例如SIGALRM、SIGINT、SIGTERM等),系统调用可能会被中断,并立即返回-1,同时设置errno = EINTR。
// 注意:EINTR并非真正的错误,而是一种中断,提示程序“当前调用未完成,需要重新尝试”。
if(rt < 0 && errno == EINTR) {
continue;
} else {
break;
}
}

// collect all timers overdue
// 处理到期的定时任务
//用于存储超时的回调函数。
std::vector<std::function<void()>> cbs;

//用来获取所有超时的定时器回调,并将它们添加到 cbs 向量中
listExpiredCb(cbs);
if(!cbs.empty()) {
for(const auto& cb: cbs) {
// 将定时器回调调度到协程/任务队列中异步执行。
scheduleLock(cb);
}
cbs.clear();
}

// collect all events ready
// 处理epoll_wait返回的所有I/O事件
// 循环处理此次调用epoll_wait返回的rt个事件
for(int i = 0; i < rt; ++i) {
// 获取第 i 个 epoll_event,用于处理该事件。
epoll_event& event = events[i];

// tickle event
//检查当前事件是否是 tickle 事件(即用于唤醒空闲线程的事件)。
// 当其他线程添加了新任务或新监听事件时,需要立刻通知(唤醒)阻塞线程重新处理
// tickle事件(通常是一个管道或eventfd)用于唤醒阻塞在epoll_wait的线程。
// 采用边缘触发模式(EPOLLET),必须将管道内数据全部读出,否则下次不会再通知。
if(event.data.fd == m_tickleFds[0]) {
uint8_t dummy[256];

// edge triggered -> exhaust
// epoll_wait侦测到可读事件后返回,如果不及时将管道内的数据读走,下次调用epoll_wait还会持续返回(因为管道内还有数据),造成无效唤醒(busy loop)。
// 因此必须在接收到事件通知后一次性将管道内数据全部读取完毕。
// 对于一个管道,如果管道里有多个字节数据(可能是多次写入),epoll_wait只会在管道中有数据可读时触发一次事件。
// 为了确保每次事件触发都能“消费掉”管道中的所有数据,必须将管道中积累的数据一次性读完。否则,下次epoll_wait被调用时,可能无法再次触发tickle事件。

// ssize_t read(int fd, void *buf, size_t count);
// fd:文件描述符(file descriptor),表示要读取的文件、管道、socket 等的标识符。
// buf:指向缓冲区的指针,存储读取的数据。
// count:期望读取的字节数(buffer大小)。
// 返回值(ssize_t,有符号整型):
// > 0:成功读取的字节数。
// = 0:已到达文件末尾(EOF)。
// < 0:读取失败,出错信息存储在errno中
while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0);
continue;
}

// other events
//通过 event.data.ptr 获取与当前事件关联的 FdContext 指针 fd_ctx,该指针包含了与文件描述符相关的上下文信息。
// 普通事件处理逻辑:
// 获取 fd 的上下文,并加锁:
FdContext* fd_ctx = (FdContext*)event.data.ptr;
std::lock_guard<std::mutex> lock(fd_ctx->mutex);

// convert EPOLLERR or EPOLLHUP to -> read or write event
//如果当前事件是错误或挂起(EPOLLERR 或 EPOLLHUP),则将其转换为可读或可写事件(EPOLLIN 或 EPOLLOUT),以便后续处理。
// EPOLLERR:表示 fd 上发生了错误(例如 socket 出错)。
// EPOLLHUP:表示对端关闭连接(例如 socket 被对方关闭)
if(event.events & (EPOLLERR | EPOLLHUP)) {
// fd_ctx->events:该 fd 当前用户注册监听的事件集合(READ/WRITE)。
// EPOLLIN | EPOLLOUT:表示 “可读或可写” 两种事件。
// 只转化 用户关心的事件,所以要与 fd_ctx->events 做一个 按位与运算
event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events;
}

// events happening during this turn of epoll_wait
//确定实际发生的事件类型(读取、写入或两者)。
int real_events = NONE;

// EPOLLIN 和 EPOLLOUT 是 Linux epoll 事件机制中的事件标志,用于表示你想要“监听什么类型的事件”,或者“某个文件描述符上发生了什么事件”。
// EPOLLIN 表示:这个 fd 上现在“可读”,也就是:
// 有数据可读(对于 socket/管道)
// 文件可读
// 客户端连接已就绪(对于监听 socket)
// EPOLLOUT 表示:这个 fd 上现在“可写”,也就是:
// 发送缓冲区有足够空间可以写入数据
// 判断 epoll 是否返回了读事件
if(event.events & EPOLLIN) {
// 如果 epoll_wait() 返回的 event 包含 EPOLLIN(可读),说明这个 fd 当前可以读了。
// 把 real_events 标记上 READ(框架内部自定义的枚举)。
real_events |= READ;
}

// 判断 epoll 是否返回了写事件
if(event.events & EPOLLOUT) {
real_events |= WRITE;
}

// 检查这次返回的事件是否是我们注册监听的
// 如果 real_events 中的事件我们并没有监听过,就跳过,不处理(例如系统返回了写事件,但我们没监听写)。
if((fd_ctx->events & real_events) == NONE) {
continue;
}

// delete the events that have already happened
// 删除已经触发的事件,更新 epoll 监听状态
// 例如我们监听了 READ + WRITE,但现在只触发了 READ,那么 left_events = WRITE。
//这里进行取反就是计算剩余未发送的的事件
int left_events = (fd_ctx->events & ~real_events);
int op = left_events ? EPOLL_CTL_MOD: EPOLL_CTL_DEL;

//如果left_event没有事件了那么就只剩下边缘触发了events设置了
event.events = EPOLLET | left_events;

// 构造新的事件设置,并提交更新
//根据之前计算的操作(op),调用 epoll_ctl 更新或删除 epoll 监听,如果失败,打印错误并继续处理下一个事件。
int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
if(rt2) {
std::cerr << "idle::epoll_ctl failed: " << strerror(errno) << std::endl;
continue;
}

// schedule callback and update fdcontext and event context
//触发事件,事件的执行
if(real_events & READ) {
fd_ctx->triggerEvent(READ);
m_pendingEventCount;
}

if(real_events & WRITE) {
fd_ctx->triggerEvent(WRITE);
m_pendingEventCount;
}
}

//当前线程的协程主动让出控制权,调度器可以选择执行其他任务或再次进入 idle 状态。
Fiber::GetThis()->yield();
}
}

IOManager::onTimerInsertedAtFront():

函数的作用是在定时器被插入到最前面时,触发tickle事件,唤醒阻塞的epoll_wait回收超时的定时任务(回调cb和协程)放入协程调度器中等待调度。

// 当一个定时器被插入到定时器队列的最前面时,通知(唤醒)IOManager 的 epoll 线程,重新评估等待时间。
// onTimerInsertedAtFront() 是一个钩子,用于在插入最早定时器时立即唤醒 epoll,使得定时器精确触发。
void IOManager::onTimerInsertedAtFront() {
// 唤醒可能被阻塞的 epoll_wait 调用
tickle();
}

main.cpp

#include "ioscheduler.h"
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <cstring>
#include <cerrno>

// 1. 构造服务器地址 159.75.118.34:80
// 2. 创建 TCP socket(非阻塞)
// 3. 调用 connect() 发起连接
// 4. epoll 注册:
// ├─ WRITE → func2():连接建立时发请求
// └─ READ → func() :收到响应时读取数据
// 5. IOManager(基于 epoll)异步调度
// 6. 网络通信完成

using namespace sylar;

// 接收缓冲区
char recv_data[4096];

// 是一个 HTTP 请求报文,表示向服务器请求主页
// GET / HTTP/1.0\\r\\n ← 请求行:GET方法,请求"/",协议HTTP/1.0
// \\r\\n ← 空行:表示没有额外请求头,请求结束
const char data[] = "GET / HTTP/1.0\\r\\n\\r\\n";

// 全局 socket fd
int sock;

void func() {
// ssize_t recv(int sockfd, void *buf, size_t len, int flags);
// sockfd: socket 文件描述符(int)
// buf: 接收缓冲区(char 数组)
// 希望读取的最大字节数(最多读 4096 字节)
// flags 参数,设置为 0 表示默认行为(阻塞读、无特殊标志)
// 返回值:
// >0 : 成功读取的字节数(可能少于 4096)
// =0 : 对端已关闭连接(FIN),表示连接结束
// <0 : 发生错误,需通过 errno 判断原因
// 从 sock 表示的 TCP 连接中尝试读取最多 4096 字节的数据
// 读取到的数据会被存入 recv_data 指向的缓冲区
// 调用会阻塞当前线程/协程,直到至少有一个字节可以读(前提是 socket 是阻塞模式);
// 如果 socket 是非阻塞,可能会立即返回 -1 并设置 errno = EAGAIN 或 EWOULDBLOCK
ssize_t n = recv(sock, recv_data, 4096, 0);
if (n > 0) {
recv_data[n] = '\\0'; // 添加终止符
std::cout << "Received:\\n" << recv_data << std::endl;
} else if (n == 0) {
std::cout << "Connection closed by peer\\n";
} else {
std::cerr << "recv failed: " << strerror(errno) << std::endl;
}
}

void func2() {
// ssize_t send(int sockfd, const void *buf, size_t len, int flags);
// sockfd: 目标 socket 的文件描述符,表示一个已连接的 TCP 连接
// buf: 指向要发送的数据缓冲区的指针
// len: 要发送的字节数,这里是 sizeof("GET / HTTP/1.0\\r\\n\\r\\n")
// flags: flags 参数,设为 0 表示默认发送行为(阻塞发送、无特殊选项)
// 返回值:
// 实际成功写入到 socket 缓冲区的字节数。
// 成功时:> 0,表示发送了多少字节;
// 失败时:< 0,表示出错,需要检查 errno
ssize_t n = send(sock, data, sizeof(data), 0);
if (n < 0) {
std::cerr << "send failed: " << strerror(errno) << std::endl;
} else {
std::cout << "Sent " << n << " bytes" << std::endl;
}
}

int main(int argc, char const* argv[]) {
IOManager manager(2);

// 创建 socket
// int socket(int domain, int type, int protocol);
// AF_INET —— 地址族(domain)
// 表示使用 IPv4 协议;
// 如果你想用 IPv6,可以写 AF_INET6;
// 对于本地通信(如进程间通信),可以使用 AF_UNIX
// SOCK_STREAM —— 套接字类型(type)
// 表示使用 面向连接的 TCP 协议
// 0 —— 协议(protocol)
// 一般设置为 0,让系统自动选择默认协议:
// 如果你选了 SOCK_STREAM,就默认是 TCP;
// 如果你选了 SOCK_DGRAM,就默认是 UDP
// 返回值:一个整数,表示 socket 的文件描述符(fd)
// 成功:返回一个正整数(文件描述符),用于后续的:connect()、send()、recv() 等操作;
// 失败:返回 -1,并设置 errno 来指示错误原因(如文件描述符耗尽、权限问题等)
sock = socket(AF_INET, SOCK_STREAM, 0);

// sockaddr_in:IPv4 专用的 socket 地址结构
sockaddr_in server;

// server.sin_family = AF_INET;
// 指定地址族为 IPv4
server.sin_family = AF_INET;

// 设置目标端口为 80(HTTP 默认端口)。
// htons:将主机字节序转换为网络字节序(大端)
server.sin_port = htons(8080);

// 将 IP 地址字符串 "159.75.118.34" 转换为整数,并赋值给 s_addr 字段。
// inet_addr 返回的是网络字节序的 IPv4 地址。
server.sin_addr.s_addr = inet_addr("127.0.0.1");
// server.sin_addr.s_addr = inet_addr("10.1.12.15");

// 设置 socket 为非阻塞模式
fcntl(sock, F_SETFL, O_NONBLOCK);

// 发起连接(非阻塞 connect)
// 因为 socket 是非阻塞的,这通常会立即返回 -1,errno = EINPROGRESS,表示“连接正在建立中”
int rt = connect(sock, (struct sockaddr*)&server, sizeof(server));

if (rt == 0) {
std::cout << "Connect immediately succeeded (rare)\\n";
func2(); // 如果立即连接成功,直接调用写回调
} else if (rt < 0 && errno == EINPROGRESS) {
std::cout << "Connecting…\\n";
manager.addEvent(sock, IOManager::WRITE, &func2); // 等连接完成再写
} else {
std::cerr << "connect error: " << strerror(errno) << std::endl;
close(sock);
return 1;
}

// manager.addEvent(sock, IOManager::WRITE, &func2);
// 注册读事件(无论写是否成功)
manager.addEvent(sock, IOManager::READ, &func);

std::cout << "event has been posted\\n\\n";
return 0;
}

测试结果

如果你没有运行任何 HTTP 服务器,你可以临时起一个:

python3 -m http.server 8080

然后再次运行你的程序,就能收到 HTTP 响应,不再是 Connection refused

这个命令的作用是:在本地 启动一个 HTTP 服务器,它会监听本地的 8080 端口,并准备接收客户端的连接请求。

server.sin_addr.s_addr = inet_addr("127.0.0.1");
server.sin_port = htons(8080);

表示:客户端要连接本机的 127.0.0.1:8080

如果这时本地根本没有任何程序监听这个端口(8080),那么:

操作系统一看到客户端来连接 127.0.0.1:8080,

就发现:没人监听这个端口啊!

所以就立刻返回一个错误:

connect error: Connection refused

表示连接请求被“系统”拒绝了。 在这里插入图片描述

在这里插入图片描述 在这里插入图片描述

函数调用图

在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

之后我会持续更新,如果喜欢我的文章,请记得一键三连哦,点赞关注收藏,你的每一个赞每一份关注每一次收藏都将是我前进路上的无限动力 !!!↖(▔▽▔)↗感谢支持!

赞(0)
未经允许不得转载:网硕互联帮助中心 » 协程库(模块的详解与代码分析)ioscheduler类(io+scheduler的结合)篇
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!