[C++项目] Web Server(6):epoll 实现 I/O 多路复用

poll虽然对selcet的部分缺陷做出了改进:

  • 取消了文件描述符fd的最大数量限制
  • fd集合不能重用的问题

但是依然有着共同的缺陷:

  • 每次调用时都需要将fd集合从用户态拷贝至内核态
  • 内核态每次需要主动遍历集合所有fd才能知道需要检测哪些文件
  • 返回时无法得知具体是哪几个fd收到了数据,每次也都需要遍历集合中所有的fd

epoll则进一步解决了这些缺陷:

  • 它直接在内核中构造结构体用于存储fd集合,省去了每次的拷贝
  • 它不再使用线性的数组方式储存fd,而是采用了红黑树成员rbr来储存,红黑树节点上注册有回调函数,事件到来后执行回调函数
  • 将返回的结果存放在双向链表成员rd_list中,从而使得用户可以直接得到获得数据的文件而无需遍历

关键API

创建epoll实例

#include <sys/epoll.h>
// 创建一个新的epoll实例。在内核中创建了一个结构体类型的数据
// 这个结构体中有两个比较重要的数据,一个是需要检测的文件描述符的信息(红黑树)
// 还有一个是就绪列表,存放检测到数据发送改变的文件描述符信息(双向链表)。
int epoll_create(int size);
  - 参数:
    size : 目前没有意义了。随便写一个数,必须大于0
  - 返回值:
    -1 : 失败
    >0 : 文件描述符,用于操作epoll实例

epoll实例管理

typedef union epoll_data {
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

struct epoll_event {
  uint32_t events ;  /* Epoll events */
  epoll_data_t data; /* User data variable */
};

常见的Epoll events:
  - EPOLLIN
  - EPOLLOUT
  - EPOLLERR
  - EPOLLET //设置为ET模式

//对epoll实例进行管理:添加文件描述符信息,删除信息,修改信息
int epoll_ctl(int epfd, int op,int fd, struct epoll_event *event) ;
- 参数:
  - epfd: epoll实例对应的文件描述符
  - op: 要进行什么操作
    - EPOLL_CTL_ADD: 添加
    - EPOLL_CTL_MOD: 修改
    - EPOLL_CTL_DEL:删除
  - fd: 要检测的文件描述符
  - event: 检测文件描述符什么事情

检测epoll

// 检测函数
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- 参数:
  - epfd : epoll实例对应的文件描述符
  - events : 传出参数,保存了发送了变化的文件描述符的信息
  - maxevents : 第二个参数结构体数组的大小
  - timeout : 阻塞时间
    - 0: 不阻塞
    - -1 : 阻塞,直到检测到fd数据发生变化,解除阻塞
    - > 0: 阻塞的时长(毫秒)

- 返回值:
  - 成功,返回发送变化的文件描述符的个数> 0
  - 失败-1

epoll的工作模式

工作模式是通过epoll event来进行设置的,见上述对于epoll event的介绍

LT模式(水平触发)

LT (level-triggered)是缺省的工作方式,并且同时支持block和no-block socket。

在这种做法中,内核告诉你一个文件描述符是否就绪了 ,然后你可以对这个就绪的fd进行IO操作。

如果你不对该文件描述作任何操作,或是未完全读完,即只要读缓冲区还有数据,内核还是会在下一次调用时继续通知你。

LT模式举例

假设委托内核检测读事件 -> 则内核检测fd的读缓冲区
读缓冲区有数据 -> epoll检测到了会给用户通知
a. 用户不读数据,数据一直在缓冲区,epoll会一直通知
b. 用户只读了一部分数据,epoll会通知
C. 缓冲区的数据读完了,不通知

ET模式(边沿触发)

ET (edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。

然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了。

但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

ET模式在很大程度上减少了epoll 事件被重复触发的次数,因此效率要比LT模式高。

但是 epoll 工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。这是由于在该模式下读取数据时必须读完每个文件的数据,因为内核不会再次通知。而读操作如果不设置未未阻塞,则会导致其他任务无法运行。

ET模式举例

假设委托内核检测读事件 -> 则内核检测fd的读缓冲区
读缓冲区有数据 -> epoll检测到了会给用户通知
a.用户不读数据,数据一致在缓冲区中,epoll下次检测的时候就不通知了
b.用户只读了一部分数据,epoll不通知
C.缓冲区的数据读完了,不通知

EPOLLONESHOT事件

即使可以使用ET模式,一个socket上的某个事件还是可能被触发多次。

比如一个线程在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读 (EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。

一个socket连接在任一时刻都只被一个线程处理, 可以使用epollEPOLLONESHOT事件实现。

对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。

这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。

但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。

服务器端代码

LT 模式

#include <stdio.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>

int main(){
    int lfd = socket(PF_INET,SOCK_STREAM,0);

    struct sockaddr_in saddr;
    saddr.sin_port = htons(9999);
    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = INADDR_ANY;

    bind(lfd,(struct sockaddr *)&saddr,sizeof(saddr));

    listen(lfd, 8);

    // 创建epoll实例
    int epfd = epoll_create(100);

    // 创建检测事件
    struct epoll_event e;
    e.data.fd = lfd;
    e.events = EPOLLIN;

    // 设置检测文件
    epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&e);

    struct epoll_event revents[1024];
    while (1)
    {
        int ret = epoll_wait(epfd,revents,1024,-1);
        if(ret==-1){
            perror("select");
            exit(-1);
        }
        else if(ret==0) continue;
        else if(ret>0){
            for(int i =0;i<ret;i++){
                if(revents[i].data.fd == lfd){
                    struct sockaddr_in caddr;
                    int len = sizeof(caddr);
                    int cfd = accept(lfd,(struct sockaddr *)&caddr, &len);

                    e.data.fd = cfd;
                    e.events = EPOLLIN | EPOLLOUT;  //同时关注读写事件
                    epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&e);
                }
                else {
                    if(revents[i].events & EPOLLIN){
                       char buf[5] = {0};
                        int len = read(revents[i].data.fd,buf,sizeof(buf));
                        if(len==-1){
                            perror("read");
                            exit(-1);
                        }
                        else if(len==0){
                            printf("客户端 %d 关闭\n",revents[i].data.fd);
                            close(revents[i].data.fd);
                            epoll_ctl(epfd,EPOLL_CTL_DEL,revents[i].data.fd,NULL);
                        }
                        else if(len>0){
                            printf("读取数据:%s\n",buf);
                            write(revents[i].data.fd,buf,strlen(buf)+1);
                        } 
                    }
                    else if(revents[i].events & EPOLLOUT){
                        continue;
                    }
                }
            }
        }
        
    }
    close(lfd);
    close(epfd);
}

这种情况下,每次未读取完的数据仍旧会通知用户,每次接着读取,因此可以作用于阻塞的套接字

ET模式

#include <stdio.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>

int main(){
    int lfd = socket(PF_INET,SOCK_STREAM,0);

    struct sockaddr_in saddr;
    saddr.sin_port = htons(9999);
    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = INADDR_ANY;

    bind(lfd,(struct sockaddr *)&saddr,sizeof(saddr));

    listen(lfd, 8);

    // 创建epoll实例
    int epfd = epoll_create(100);

    // 创建检测事件
    struct epoll_event e;
    e.data.fd = lfd;
    e.events = EPOLLIN;

    // 设置检测文件
    epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&e);

    struct epoll_event revents[1024];
    while (1)
    {
        int ret = epoll_wait(epfd,revents,1024,-1);
        if(ret==-1){
            perror("select");
            exit(-1);
        }
        else if(ret==0) continue;
        else if(ret>0){
            for(int i =0;i<ret;i++){
                if(revents[i].data.fd == lfd){
                    struct sockaddr_in caddr;
                    int len = sizeof(caddr);
                    int cfd = accept(lfd,(struct sockaddr *)&caddr, &len);

                    //设置文件描述符非阻塞
                    int flag = fcntl(cfd,F_GETFL);
                    flag |= O_NONBLOCK;
                    fcntl(cfd,F_SETFL,flag);

                    e.data.fd = cfd;
                    e.events = EPOLLIN | EPOLLET;  // 设置为边沿触发 ET模式
                    epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&e);
                }
                else {
                    if(revents[i].events & EPOLLIN){

                        // 循环读取出所有数据
                        char buf[5] = {0};
                        int len = 0;
                        while ((len = read(revents[i].data.fd,buf,sizeof(buf)))>0)
                        {
                            printf("读取数据:%s\n",buf);
                            write(revents[i].data.fd,buf,strlen(buf)+1);
                        }
                        if(len==-1){
                            // 非阻塞模式下 读取完所有数据后也会返回-1 此时错误号为EAGAIN
                            if(errno == EAGAIN){
                                printf("data over\n");
                            }
                            else{
                                perror("read");
                                exit(-1);
                            }
                            
                        }
                        else if(len==0){
                            printf("客户端 %d 关闭\n",revents[i].data.fd);
                            close(revents[i].data.fd);
                            epoll_ctl(epfd,EPOLL_CTL_DEL,revents[i].data.fd,NULL);
                        }
                    }
                    else if(revents[i].events & EPOLLOUT){
                        continue;
                    }
                }
            }
        }
        
    }
    close(lfd);
    close(epfd);
}

先为文件描述符设置阻塞属性,然后将检测事件设置为ET模式

读取时循环读取文件中的所有数据