第17章 优于select的epoll

17.1 epoll理解及应用

select不管怎么优化,应用程序也无法同时接入上百个客户端。所以select不太适合以Web服务器端开发为主流的现代开发环境,所以要学习Linux下的epoll。

1.基于select的I/O复用技术速度慢的原因

  • 调用select函数后常见的针对所有文件描述符的循环语句
  • 每次调用select函数时都需要向该函数传递监视对象信息

select函数并不是把发生变化的文件描述符集中起来,而是监视监视对象fd_set变量的变化所以要循环扫描,而且调用select函数前还要复制并保存原有信息,每次调用select函数都要传入监视对象信息。

循环并不是降低性能最大的问题,而是每次向select传递监视对象信息会对性能产生巨大影响。 套接字是由操作系统管理,监视监视套接字变化需要操作系统的帮助这会造成巨大的开销,解决办法:

仅向操作系统传递一次监视对象信息,监视范围或内容发生变化时只通知发生变化的事项

操作系统支持的前提下可以实现,Linux支持的是epoll,Windows支持的是IOCP。

2.select的优点

  • 服务端接入者少
  • 程序应具有兼容性

3.实现epoll时必要的函数和结构体

epoll优点:

  • 无需编写以监视状态变化为目的的针对所有文件描述符的循环语句
  • 调用对应于select函数的epoll_wait函数时无需每次传递监视对象信息

epoll_create:创建保存epoll文件描述符的空间 epoll_ctl:向空间注册并注销文件描述符 epoll_wait:与select函数类似,等待文件描述符发生变化

select为了保存监视目标的文件描述符,有fd_set变量,而epoll方式是操作系统负责保存监视对象文件描述符,需要向操作系统申请创建保存文件描述符的空间,使用epoll_create

select通过fd_set变量查看监视对象的状态变化,而epoll通过epoll_event将发生变化的文件描述符单独集中在一起。

struct epoll_event
{
    __uint32_t events;
    epoll_data_t data;
}

typedef union epoll_data
{
    void * ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
}epoll_data_t;

声明足够大的epoll_event结构体数组后,传递给epoll_wait函数,发生变化的文件描述符信息将被填入该数组。无需向select函数那样针对所有文件描述符进行循环。

4.epoll_create

#include<sys/epoll.h>
int epoll_create(int size);
成功时返回epoll文件描述符,失败返回-1

epoll_create函数创建的文件描述符保存空间称为“epoll例程”,通过参数传递的size只是给操作系统的参考,并不会最终决定epoll例程大小。

Linux2.6.8之后的内核完全忽略传入的参数size,但是本书还未到达2.6.8所以必须传递

epoll_create创建的资源与套接字相同,也由操作系统管理,终止时也需要close。

5.epoll_ctl

生成例程后要向里面注册监视对象文件描述符

#include<sys/epoll.h>
int epoll_ctl(int epfd,int op,int fd,struct epoll_event * event);
成功返回0,失败时返回-1
epfd        用于注册监视对象的epoll例程的文件描述符
op          用于指定监视对象的添加、删除或更改等操作
fd          需要注册的监视对象文件描述符
event       监视对象的事件类型

第二个参数的选项:

  • EPOLL_CTL_ADD:将文件描述符注册到epoll例程
  • EPOLL_CTL_DEL:从epoll例程中删除文件描述符
  • EPOLL_CTL_MOD:更改注册的文件描述符的关注事件
epoll_ctl(A,EPOLL_CTL_ADD,B,C);

含义:epoll例程A的注册文件描述符B,该文件描述符主要是为了监视参数C中事件

当用EPOLL_CTL_DEL删除文件描述符时最后一个参数可以填写NULL,Linux2.6.9之前的内核不允许传递NULL。虽然被忽略到但也要传递epoll_event地址值。

不是说epoll_event结构体用来保存发生事件的文件描述符集合吗。但是epoll_event也可以注册文件描述符,用于注册关注的事件。

struct epoll_event event;
...
event.events=EPOLLIN;//发生要读取数据的情况时
event.data.fd=sockfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&event)
...

上述代码将sockfd注册到epoll例程epfd中,并在需要读取数据的情况下产生相应事件。 可以指定的事件类型

  • EPOLLIN:需要读取数据的情况
  • EPOLLOUT:输出缓冲为空,可以立即发送数据的情况
  • EPOLLPRI:收到OOB数据的情况
  • EPOLLRDHUP:断开连接或半关闭的情况,这在边沿触发方式非常有用。
  • EPOLLERR:发生错误的情况
  • EPOLLET:以边沿触发的方式得到事件通知
  • EPOLLONESHOT:发生一次事件后,相应的文件描述符不再收到事件通知给,因此需要向epoll_ctl函数的第二个参数传输EPOLL_CTL_MOD,再次设置事件。

上述参数可以用位或运算传递多个参数。

6.epoll_wait

最后调用该函数,等待事件发生

#include<sys/epoll.h>
int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);
成功时返回发生事件的文件描述符,失败时返回-1
epfd         表示监视事件发生的epoll例程的文件描述符
events       保存发生事件的文件描述符集合的结构体的地址值
maxevents    第二个参数可以保存的最大事件数
timeout      以1/1000秒为单位的等待时间,传递-1时,一直等到事件发生

第二个参数所指的缓冲需要动态分配

int event_cnt;
struct epoll_event * ep_events;
...
ep_events = malloc(sizeof(struct epoll_event)*EPOLL_SIZE)
...
event_cnt = epoll_wait(epfd,ep_events,EPOLL_SIZE,-1);
...

7.基于epoll的回声服务器端

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>

#define BUF_SIZE 100
#define EPOLL_SIZE 50//预定义发生事件最大保存数
void error_handling(char *buf);

int main(int argc, char *argv[])
{
	int serv_sock, clnt_sock;
	struct sockaddr_in serv_adr, clnt_adr;
	socklen_t adr_sz;
	int str_len, i;
	char buf[BUF_SIZE];

	struct epoll_event *ep_events;
	struct epoll_event event;
	int epfd, event_cnt;

	if(argc!=2) {
		printf("Usage : %s <port>\n", argv[0]);
		exit(1);
	}

	serv_sock=socket(PF_INET, SOCK_STREAM, 0);//创建TCP服务端套接字
	memset(&serv_adr, 0, sizeof(serv_adr));//结构体内部全部设置为0
	serv_adr.sin_family=AF_INET;//设置地址族
	serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);//可接受本主机任意IP消息
	serv_adr.sin_port=htons(atoi(argv[1]));//设置端口
	
	if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)//给服务端套接字注册地址
		error_handling("bind() error");
	if(listen(serv_sock, 5)==-1)//设置为可监听状态
		error_handling("listen() error");

	epfd=epoll_create(EPOLL_SIZE);//创建保存文件描述符的例程
	ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);//接收事件发生文件描述符的结构体的空间

	event.events=EPOLLIN;//发生可读取数据的事件
	event.data.fd=serv_sock;	//监视服务端套接字
	epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);//注册到epfd例程

	while(1)
	{
		event_cnt=epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);//等待监视的文件描述符事件发生
		if(event_cnt==-1)
		{
			puts("epoll_wait() error");
			break;
		}

		for(i=0; i<event_cnt; i++)//处理发生事件的文件描述符
		{
			if(ep_events[i].data.fd==serv_sock)//服务端套接字发生事件说明有新的连接请求
			{
				adr_sz=sizeof(clnt_adr);
				clnt_sock=
					accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
				event.events=EPOLLIN;//发生可读取数据的情况
				event.data.fd=clnt_sock;
				epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
				printf("connected client: %d \n", clnt_sock);
			}
			else
			{
					str_len=read(ep_events[i].data.fd, buf, BUF_SIZE);
					if(str_len==0)    // close request!
					{
						epoll_ctl(
							epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);//删除掉注册的文件描述符
						close(ep_events[i].data.fd);
						printf("closed client: %d \n", ep_events[i].data.fd);
					}
					else
					{
						write(ep_events[i].data.fd, buf, str_len);    // echo!
					}
	
			}
		}
	}
	close(serv_sock);
	close(epfd);
	return 0;
}

void error_handling(char *buf)
{
	fputs(buf, stderr);
	fputc('\n', stderr);
	exit(1);
}

17.2 条件触发和边沿触发

1.条件触发和边沿触发的区别在于发生事件的时间点

假设有一个文件描述符是关注缓冲区是否有可读数据 条件触发: 只要缓冲区有数据,就会一直通知该事件(注册到发生变化的文件描述符) 边沿触发: 输入缓冲区收到数据时仅注册一次事件。就算缓冲区还有数据也不会再注册该事件。

2.掌握条件触发的事件特性

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>

#define BUF_SIZE 4
#define EPOLL_SIZE 50
void error_handling(char *buf);

int main(int argc, char *argv[])
{
	int serv_sock, clnt_sock;
	struct sockaddr_in serv_adr, clnt_adr;
	socklen_t adr_sz;
	int str_len, i;
	char buf[BUF_SIZE];

	struct epoll_event *ep_events;
	struct epoll_event event;
	int epfd, event_cnt;

	if(argc!=2) {
		printf("Usage : %s <port>\n", argv[0]);
		exit(1);
	}

	serv_sock=socket(PF_INET, SOCK_STREAM, 0);
	memset(&serv_adr, 0, sizeof(serv_adr));
	serv_adr.sin_family=AF_INET;
	serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
	serv_adr.sin_port=htons(atoi(argv[1]));
	
	if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
		error_handling("bind() error");
	if(listen(serv_sock, 5)==-1)
		error_handling("listen() error");

	epfd=epoll_create(EPOLL_SIZE);
	ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);

	event.events=EPOLLIN;
	event.data.fd=serv_sock;	
	epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);

	while(1)
	{
		event_cnt=epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
		if(event_cnt==-1)
		{
			puts("epoll_wait() error");
			break;
		}

		puts("return epoll_wait");//y验证是否发生事件
		for(i=0; i<event_cnt; i++)
		{
			if(ep_events[i].data.fd==serv_sock)
			{
				adr_sz=sizeof(clnt_adr);
				clnt_sock=accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
				event.events=EPOLLIN;
				event.data.fd=clnt_sock;
				epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
				printf("connected client: %d \n", clnt_sock);
			}
			else
			{
					str_len=read(ep_events[i].data.fd, buf, BUF_SIZE);
					if(str_len==0)    // close request!
					{
						epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
						close(ep_events[i].data.fd);
						printf("closed client: %d \n", ep_events[i].data.fd);
					}
					else
					{
						write(ep_events[i].data.fd, buf, str_len);    // echo!
					}
			}
		}
	}
	close(serv_sock);
	close(epfd);
	return 0;
}

void error_handling(char *buf)
{
	fputs(buf, stderr);
	fputc('\n', stderr);
	exit(1);
}

运行一次代码很可以看出,发一条消息发生了多次事件,因为接收缓冲区最多接受4字节,每接受一次就触发一次事件。 那么如何改成边沿触发:event.events=EPOLLIN|EPOLLET

前面章节的select模型使用的是条件触发方式。

3.边缘触发的服务器端实现中必知的两点

  • 通过errno变量验证错误原因
  • 为了完成非阻塞I/O,更该套接字特性

套接字相关函数一般通过返回-1通知发生了错误,但是不知道错误产生原因,所以Linux声明了如下全局变量:

int errno;//需要引进error.h头文件

每次函数发生错误时都返回不同的值。 例如:

read函数发现输入缓冲中没有数据可读返回-1,errno中会保存EAGAIN常量。

如何把套接字改成非阻塞状态

#include<fcntl.h>
int fcntl(int filedes,int cmd,...);
成功时返回cmd参数相关值,失败时返回-1
filedes        属性更改目标的文件描述符
cmd            表示函数调用的目的

如果向第二个参数传递F_GETFL,可以获得第一个参数所指的文件描述符属性,反之如果传递F_SETFL,可以更改文件描述符属性。 把套接字改成非阻塞模式

int flag = fcntl(fd,F_GETFL,0);
fcntl(fd,F_SETFL,flag|O_NONBLOCK);

当调用read和write时,无论是否存在数据,都不会阻塞。

4.实现边缘触发的回声服务器端

为何需要用errno确认错误原因?

边缘触发中,接收数据仅注册一次事件,因为这种特性一旦发生输入相关的事件,就应该读取输入缓冲中全部数据,因此需要先验证缓冲区是否为空,read返回-1,errno中的值为EAGAIN,说明没有数据可读。 为何把套接字改成非阻塞模式? 边沿触发中,以阻塞方式工作的read和write可能会引起服务端长时间停顿,所以边缘触发情况下一定要采用非阻塞。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>

#define BUF_SIZE 4
#define EPOLL_SIZE 50
void setnonblockingmode(int fd);
void error_handling(char *buf);

int main(int argc, char *argv[])
{
	int serv_sock, clnt_sock;
	struct sockaddr_in serv_adr, clnt_adr;
	socklen_t adr_sz;
	int str_len, i;
	char buf[BUF_SIZE];

	struct epoll_event *ep_events;
	struct epoll_event event;
	int epfd, event_cnt;

	if(argc!=2) {
		printf("Usage : %s <port>\n", argv[0]);
		exit(1);
	}

	serv_sock=socket(PF_INET, SOCK_STREAM, 0);
	memset(&serv_adr, 0, sizeof(serv_adr));
	serv_adr.sin_family=AF_INET;
	serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
	serv_adr.sin_port=htons(atoi(argv[1]));
	
	if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
		error_handling("bind() error");
	if(listen(serv_sock, 5)==-1)
		error_handling("listen() error");

	epfd=epoll_create(EPOLL_SIZE);
	ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);

	setnonblockingmode(serv_sock);
	event.events=EPOLLIN;
	event.data.fd=serv_sock;	
	epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);

	while(1)
	{
		event_cnt=epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
		if(event_cnt==-1)
		{
			puts("epoll_wait() error");
			break;
		}

		puts("return epoll_wait");
		for(i=0; i<event_cnt; i++)
		{
			if(ep_events[i].data.fd==serv_sock)
			{
				adr_sz=sizeof(clnt_adr);
				clnt_sock=accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
				setnonblockingmode(clnt_sock);
				event.events=EPOLLIN|EPOLLET;
				event.data.fd=clnt_sock;
				epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
				printf("connected client: %d \n", clnt_sock);
			}
			else
			{
					while(1)//要把数据全都读完才能退出
					{
						str_len=read(ep_events[i].data.fd, buf, BUF_SIZE);
						if(str_len==0)    // close request!
						{
							epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
							close(ep_events[i].data.fd);
							printf("closed client: %d \n", ep_events[i].data.fd);
							break;
						}
						else if(str_len<0)
						{
							if(errno==EAGAIN)//缓冲区没数据退出
								break;
						}
						else
						{
							write(ep_events[i].data.fd, buf, str_len);    // echo!
						}
				}
			}
		}
	}
	close(serv_sock);
	close(epfd);
	return 0;
}

void setnonblockingmode(int fd)
{
	int flag=fcntl(fd, F_GETFL, 0);
	fcntl(fd, F_SETFL, flag|O_NONBLOCK);
}
void error_handling(char *buf)
{
	fputs(buf, stderr);
	fputc('\n', stderr);
	exit(1);
}

5.条件触发和边缘触发孰优孰劣?

边缘触发可以分离接收数据和处理数据的时间点。

图中运行流程:

  • 服务端分别从客户端A、B、C接收数据
  • 服务端按照A、B、C的顺序重新组合收到的数据
  • 组合的数据将发给任意主机

要想完成上述过程,如果程序按以下流程执行并不是难事

  • 客户端按照A、B、C的顺序连接服务器端,并依次向服务端发送数据
  • 需要接受数据的客户端应在客户端A、B、C之前连接到服务器端并等待

但是现实情况往往是:

  • 客户端C和B正在向服务端发送数据,但是A还没连接到服务器
  • 客户端A、B、C乱序发送数据
  • 服务器端已收到数据,但要接收数据的目标还没连接到服务端

条件触发无法区分数据接收和处理吗? 是可以的,但是在输入缓冲接收到数据的情况下,如果不读取,则每次调用epoll_wait函数都会产生相应事件,事件数还会累加,服务器能承受的住吗? 现实就是不可能。 条件触发和边缘触发的区别应该从服务器端实现模型角度谈论,因此考虑谁快谁慢并没有意义。


本文章使用limfx的vscode插件快速发布