[Zaver Source Code Analysis] A Non-blocking HTTP Server Built on epoll
Zaver is an HTTP server project from about a decade ago, written entirely in C. The code is simple; it is a learning-grade wheel rather than an industrial one. Revisiting my articles from two years ago, I found some holes left unfilled: the application layer of computer networking (HTTP and friends) in "Computer Networking Fundamentals: A Bottom-Up Approach", and the missing epoll demo for the low-level Linux API in "Linux Operating System: Network Programming". In zaver, the thread pool, logging, and epoll are all minimal Nginx-style wrappers, so the learning cost is low, yet the architecture is more complete than a typical demo's. That makes it just the right small epoll wheel to fill those gaps.
Forked repository: https://github.com/EdenMoxe/zaver
Logging
A handful of macros suffice to implement logging to standard error; the original listing is collapsed here, and the full definitions are in the repository.
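As a placeholder for the missing listing, here is a minimal sketch of what such macros typically look like; the clean_errno/log_err/log_info/check/check_exit bodies below are assumptions in the classic fprintf + __FILE__/__LINE__ style, not the verbatim zaver source:

```c
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>

/* Hypothetical reconstruction of the logging macros; see the repository for the real file. */
#define clean_errno() (errno == 0 ? "None" : strerror(errno))

#define log_err(M, ...) \
    fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", \
            __FILE__, __LINE__, clean_errno(), ##__VA_ARGS__)

#define log_info(M, ...) \
    fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)

/* check: log the message when the condition fails, then carry on */
#define check(A, M, ...) \
    do { if (!(A)) { log_err(M, ##__VA_ARGS__); errno = 0; } } while (0)

/* check_exit: like check, but the failure is fatal */
#define check_exit(A, M, ...) \
    do { if (!(A)) { log_err(M, ##__VA_ARGS__); exit(1); } } while (0)
```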
```c
log_info("%d has ten %d", 10, 1);
// Output:
// 10 has ten 1
```
Part 1: Core Data Structures
This part covers the thread pool, the priority queue, the timer, and the linked-list design.
Thread Pool
The thread pool architecture is apparent from the header:

```c
#ifdef __cplusplus
extern "C" {                   /* C linkage guard for C++ includers */
#endif
typedef struct zv_task_s {     /* task list node */
    void (*func)(void *);
    void *arg;
    struct zv_task_s *next;
} zv_task_t;

typedef struct {
    pthread_mutex_t lock;      /* mutex guarding the pool */
    pthread_cond_t cond;       /* wakes workers when the queue changes */
    pthread_t *threads;        /* worker thread handles */
    zv_task_t *head;           /* task list head (dummy head) */
    int thread_count;          /* number of threads */
    int queue_size;            /* number of queued tasks */
    int shutdown;              /* shutdown mode: 1 = immediate, 2 = graceful (drain the queue first) */
    int started;               /* number of running workers */
} zv_threadpool_t;

/* error codes */
typedef enum {
    zv_tp_invalid          = -1,
    zv_tp_lock_fail        = -2,
    zv_tp_already_shutdown = -3,
    zv_tp_cond_broadcast   = -4,
    zv_tp_thread_fail      = -5,
} zv_threadpool_error_t;

zv_threadpool_t *threadpool_init(int thread_num);
int threadpool_add(zv_threadpool_t *pool, void (*func)(void *), void *arg);
int threadpool_destroy(zv_threadpool_t *pool, int graceful);

#ifdef __cplusplus
}
#endif
```
The implementations follow.
threadpool_init
```c
zv_threadpool_t *threadpool_init(int thread_num) { /* full body in threadpool.c */ }
```
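The body is elided above. Given the struct fields, init has to allocate the pool, set up the dummy head, initialize the lock and condition variable, and spawn thread_num workers running threadpool_worker (defined next). A sketch under those assumptions, paraphrased rather than copied from threadpool.c:

```c
/* Sketch of threadpool_init, inferred from the header above; not the verbatim source.
 * Assumes threadpool_worker is declared earlier in the file. */
zv_threadpool_t *threadpool_init(int thread_num) {
    if (thread_num <= 0) {
        log_err("thread_num must be positive");
        return NULL;
    }
    zv_threadpool_t *pool = (zv_threadpool_t *)calloc(1, sizeof(zv_threadpool_t));
    if (pool == NULL) {
        return NULL;
    }
    pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_num);
    pool->head = (zv_task_t *)calloc(1, sizeof(zv_task_t));    /* dummy head, next == NULL */
    if (pool->threads == NULL || pool->head == NULL ||
        pthread_mutex_init(&(pool->lock), NULL) != 0 ||
        pthread_cond_init(&(pool->cond), NULL) != 0) {
        free(pool->threads);
        free(pool->head);
        free(pool);
        return NULL;
    }
    int i;
    for (i = 0; i < thread_num; i++) {
        if (pthread_create(&(pool->threads[i]), NULL, threadpool_worker, (void *)pool) != 0) {
            threadpool_destroy(pool, 0);    /* immediately shut down whatever already started */
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }
    return pool;
}
```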
Defining the worker

A worker occupies one thread and consumes tasks from the task list. At heart it is a loop that can be woken by the condition variable or broken out of by the shutdown flag:
```c
static void *threadpool_worker(void *arg) {   /* arg is the pool itself: task queue, threads, flags */
    if (arg == NULL) {
        log_err("arg should be type zv_threadpool_t*");
        return NULL;
    }
    zv_threadpool_t *pool = (zv_threadpool_t *)arg;
    zv_task_t *task;
    while (1) {
        pthread_mutex_lock(&(pool->lock));
        /* Wait on condition variable, check for spurious wakeups. */
        while ((pool->queue_size == 0) && !(pool->shutdown)) {   /* queue empty and not shutting down */
            pthread_cond_wait(&(pool->cond), &(pool->lock));
        }
        if (pool->shutdown == immediate_shutdown) {
            break;
        } else if ((pool->shutdown == graceful_shutdown) && pool->queue_size == 0) {
            break;
        }
        task = pool->head->next;              /* grab the next task */
        if (task == NULL) {
            pthread_mutex_unlock(&(pool->lock));
            continue;
        }
        pool->head->next = task->next;        /* pop it off the task list */
        pool->queue_size--;
        pthread_mutex_unlock(&(pool->lock));
        (*(task->func))(task->arg);           /* run the task on this worker */
        /* TODO: memory pool */
        free(task);                           /* release the task node */
    }
    pool->started--;
    pthread_mutex_unlock(&(pool->lock));      /* the shutdown breaks above leave the loop with the lock held */
    pthread_exit(NULL);
    return NULL;
}
```
threadpool_add
The function pointer is put on the queue:

```c
int threadpool_add(zv_threadpool_t *pool, void (*func)(void *), void *arg) {
    int rc, err = 0;
    if (pool == NULL || func == NULL) {
        log_err("pool == NULL or func == NULL");
        return -1;
    }
    /* Note: failing to acquire the lock does not return an error code, it blocks.
     * pthread_mutex_lock only returns non-zero for an uninitialized mutex or a
     * relock of a non-recursive one. */
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        log_err("pthread_mutex_lock");
        return -1;
    }
    if (pool->shutdown) {
        err = zv_tp_already_shutdown;
        goto out;
    }
    // TODO: use a memory pool
    zv_task_t *task = (zv_task_t *)malloc(sizeof(zv_task_t));
    if (task == NULL) {
        log_err("malloc task fail");
        err = zv_tp_invalid;    /* a malloc failure must not report success */
        goto out;
    }
    task->func = func;
    task->arg = arg;
    task->next = pool->head->next;   /* insert at the head: tasks run in LIFO order */
    pool->head->next = task;         /* now on the task queue */
    pool->queue_size++;
    rc = pthread_cond_signal(&(pool->cond));   /* wake one worker */
    check(rc == 0, "pthread_cond_signal");
out:
    if (pthread_mutex_unlock(&pool->lock) != 0) {
        log_err("pthread_mutex_unlock");
        return -1;
    }
    return err;
}
```
threadpool_destroy
The only subtlety: destruction also has to compete for the lock. Once it holds the lock it broadcasts on the condition variable; every worker's wait is then signalled (though they are all still queued on the mutex), and as each one reacquires the lock it either breaks out of its loop immediately or first drains the remaining tasks, depending on whether the exit is graceful:
```c
int threadpool_free(zv_threadpool_t *pool) {
    if (pool == NULL || pool->started > 0) {
        return -1;
    }
    if (pool->threads) {    /* free the thread handle array */
        free(pool->threads);
    }
    zv_task_t *old;
    /* pool->head is a dummy head */
    while (pool->head->next) {    /* free the remaining task list */
        old = pool->head->next;
        pool->head->next = pool->head->next->next;
        free(old);
    }
    return 0;
}

int threadpool_destroy(zv_threadpool_t *pool, int graceful) {
    int err = 0;
    if (pool == NULL) {
        log_err("pool == NULL");
        return zv_tp_invalid;
    }
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        return zv_tp_lock_fail;
    }
    do {
        /* set the shutdown flag of the pool and wake up all threads */
        if (pool->shutdown) {
            err = zv_tp_already_shutdown;
            pthread_mutex_unlock(&(pool->lock));   /* don't leave with the lock held */
            break;
        }
        pool->shutdown = (graceful) ? graceful_shutdown : immediate_shutdown;
        if (pthread_cond_broadcast(&(pool->cond)) != 0) {
            err = zv_tp_cond_broadcast;
            pthread_mutex_unlock(&(pool->lock));
            break;
        }
        if (pthread_mutex_unlock(&(pool->lock)) != 0) {
            err = zv_tp_lock_fail;
            break;
        }
        int i;
        for (i = 0; i < pool->thread_count; i++) {
            if (pthread_join(pool->threads[i], NULL) != 0) {   /* join the workers one by one */
                err = zv_tp_thread_fail;
            }
            log_info("thread %08x exit", (uint32_t) pool->threads[i]);
        }
    } while (0);

    if (!err) {
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->cond));
        threadpool_free(pool);
    }
    return err;
}
```
Thread pool benchmark

zaver's thread pool test is a summation: 999 tasks each add their index to a shared counter, and the result must equal 1 + 2 + ... + 999 = 499500:

```c
pthread_mutex_t lock;
size_t sum = 0;

static void sum_n(void *arg) {
    size_t n = (size_t)arg;
    int rc;
    rc = pthread_mutex_lock(&lock);
    check_exit(rc == 0, "pthread_mutex_lock error");
    sum += n;
    rc = pthread_mutex_unlock(&lock);
    check_exit(rc == 0, "pthread_mutex_unlock error");
}

int main() {
    int rc;
    check_exit(pthread_mutex_init(&lock, NULL) == 0, "lock init error");
    log_info("%d has ten %d", 10, 1);
    zv_threadpool_t *tp = threadpool_init(THREAD_NUM);
    check_exit(tp != NULL, "threadpool_init error");
    size_t i;
    for (i = 1; i < 1000; i++) {
        rc = threadpool_add(tp, sum_n, (void *)i);
        check_exit(rc == 0, "threadpool_add error");
    }
    check_exit(threadpool_destroy(tp, 1) == 0, "threadpool_destroy error");
    check_exit(sum == 499500, "sum error");
    printf("pass thread_pool_test\n");
    return 0;
}
```
Priority Queue

Note

Unlike Nginx, which manages its timers with a red-black tree, zaver's timer is built on a priority queue (a binary heap). Both structures can quickly find the nearest expiry, but every heap insertion may reshuffle the structure, so its adjustment cost is higher than the red-black tree's. This section walks through the complete C implementation; if you have no priority-queue background, the heap-ordering details may be heavy going, so first see my earlier write-up, Data Structures and Algorithms (6): Priority Queues, or just follow the inline comments below:
```c
//priority_queue.h definitions:
typedef int (*zv_pq_comparator_pt)(void *pi, void *pj);

typedef struct {
    void **pq;        /* element array; pq[1] is the root, slot 0 is unused */
    size_t nalloc;    /* number of elements currently in the heap */
    size_t size;      /* capacity of the array */
    zv_pq_comparator_pt comp;
} zv_pq_t;

//priority_queue.c implementation:
//init: pre-allocate size+1 void* slots (slot 0 stays unused)
int zv_pq_init(zv_pq_t *zv_pq, zv_pq_comparator_pt comp, size_t size) {
    zv_pq->pq = (void **)malloc(sizeof(void *) * (size + 1));
    if (!zv_pq->pq) {
        log_err("zv_pq_init: malloc failed");
        return -1;
    }
    zv_pq->nalloc = 0;    /* no elements yet */
    zv_pq->size = size + 1;
    zv_pq->comp = comp;
    return ZV_OK;
}

//the heap is empty when it holds no elements
int zv_pq_is_empty(zv_pq_t *zv_pq) {
    return (zv_pq->nalloc == 0) ? 1 : 0;
}

//heap size is the number of stored elements
size_t zv_pq_size(zv_pq_t *zv_pq) {
    return zv_pq->nalloc;
}

//the top of a min-heap is the minimum element
void *zv_pq_min(zv_pq_t *zv_pq) {
    if (zv_pq_is_empty(zv_pq)) {
        return NULL;
    }
    return zv_pq->pq[1];
}

//when space runs short (or becomes excessive), move to a new array and free the old one
static int resize(zv_pq_t *zv_pq, size_t new_size) {
    if (new_size <= zv_pq->nalloc) {
        log_err("resize: new_size too small");
        return -1;
    }
    void **new_ptr = (void **)malloc(sizeof(void *) * new_size);
    if (!new_ptr) {
        log_err("resize: malloc failed");
        return -1;
    }
    memcpy(new_ptr, zv_pq->pq, sizeof(void *) * (zv_pq->nalloc + 1));
    free(zv_pq->pq);
    zv_pq->pq = new_ptr;
    zv_pq->size = new_size;
    return ZV_OK;
}

//swap two node pointers, which swaps the two nodes
static void exch(zv_pq_t *zv_pq, size_t i, size_t j) {
    void *tmp = zv_pq->pq[i];
    zv_pq->pq[i] = zv_pq->pq[j];
    zv_pq->pq[j] = tmp;
}

//a newly inserted element may need to bubble up: swim
static void swim(zv_pq_t *zv_pq, size_t k) {
    while (k > 1 && zv_pq->comp(zv_pq->pq[k], zv_pq->pq[k/2])) {
        exch(zv_pq, k, k/2);
        k /= 2;
    }
}

//after deleting the minimum at the top, the element swapped in must sift down: sink,
//which restores the minimum to the top
static size_t sink(zv_pq_t *zv_pq, size_t k) {
    size_t j;
    size_t nalloc = zv_pq->nalloc;
    while (2*k <= nalloc) {    /* a left child exists */
        j = 2*k;
        if (j < nalloc && zv_pq->comp(zv_pq->pq[j+1], zv_pq->pq[j])) j++;   /* pick the smaller child */
        if (!zv_pq->comp(zv_pq->pq[j], zv_pq->pq[k])) break;                /* heap property holds: stop */
        exch(zv_pq, j, k);
        k = j;
    }
    return k;
}

//take the minimum element off the top
int zv_pq_delmin(zv_pq_t *zv_pq) {
    if (zv_pq_is_empty(zv_pq)) {
        return ZV_OK;
    }
    exch(zv_pq, 1, zv_pq->nalloc);    /* move the minimum to the end, out of the heap */
    zv_pq->nalloc--;
    sink(zv_pq, 1);                   /* re-sink the new top */
    if (zv_pq->nalloc > 0 && zv_pq->nalloc <= (zv_pq->size - 1)/4) {  /* under a quarter of the slots in use */
        if (resize(zv_pq, zv_pq->size / 2) < 0) {                     /* shrink to save memory */
            return -1;
        }
    }
    return ZV_OK;
}

int zv_pq_insert(zv_pq_t *zv_pq, void *item) {
    if (zv_pq->nalloc + 1 == zv_pq->size) {    /* about to hit the size cap: double it */
        if (resize(zv_pq, zv_pq->size * 2) < 0) {
            return -1;
        }
    }
    zv_pq->pq[++zv_pq->nalloc] = item;    /* append, then swim it up */
    swim(zv_pq, zv_pq->nalloc);
    return ZV_OK;
}

//exposes sink
int zv_pq_sink(zv_pq_t *zv_pq, size_t i) {
    return (int)sink(zv_pq, i);
}
```
Priority queue benchmark

The priority queue benchmark is, naturally, a min-heap sort: keep deleting the minimum from the top and compare element by element against the ascending resultdata. If everything matches one-to-one, the ordering holds and the queue construction passes. The testdata and resultdata arrays are in the source repository and are omitted here to save space:
```c
static int comp(void *i, void *j) {
    /* min-heap: return 1 when i should sit closer to the top than j */
    size_t si = (size_t)i;
    size_t sj = (size_t)j;
    return (si < sj) ? 1 : 0;
}

size_t testdata[] = {...};    //large array, see the repository above
size_t resultdata[] = {...};  //large array, see the repository above

int main() {
    zv_pq_t pq;
    int rc;
    rc = zv_pq_init(&pq, comp, ZV_PQ_DEFAULT_SIZE);
    check_exit(rc == 0, "zv_pq_init error");
    rc = zv_pq_is_empty(&pq);
    check_exit(rc == 1, "zv_pq_is_empty error");
    size_t sz;
    sz = zv_pq_size(&pq);
    check_exit(sz == 0, "zv_pq_size");
    void *min;
    min = zv_pq_min(&pq);
    check_exit(min == NULL, "zv_pq_min");
    rc = zv_pq_delmin(&pq);
    check_exit(rc == 0, "zv_pq_delmin error");
    int n = sizeof(testdata)/sizeof(size_t);
    int i;
    for (i = 0; i < n; i++) {
        rc = zv_pq_insert(&pq, (void *)testdata[i]);
        check_exit(rc == 0, "zv_pq_insert error");
        check_exit(zv_pq_size(&pq) == (size_t)i+1, "zv_pq_size error");
    }
    i = 0;
    while (!zv_pq_is_empty(&pq)) {
        min = zv_pq_min(&pq);
        check_exit(min != NULL, "zv_pq_min error");
        check_exit((size_t)min == (size_t)resultdata[i], "zv_pq_min error, min=%zu, rd[i]=%zu", (size_t)min, (size_t)resultdata[i]);
        i++;
        rc = zv_pq_delmin(&pq);
        check_exit(rc == 0, "zv_pq_delmin error");
    }
    check_exit(i == n, "size not match");
    printf("pass priority_queue_test\n");
    return 0;
}
```
Timer

The zv_timer_node struct defines the event's expiry key, its handler, and the request it belongs to:
```c
typedef int (*timer_handler_pt)(zv_http_request_t *rq);   /* per-request timeout handler */

typedef struct zv_timer_node_s {
    size_t key;                 /* expiry time of the event */
    int deleted;                /* set to 1 when the remote client has closed */
    timer_handler_pt handler;   /* request handler */
    zv_http_request_t *rq;      /* the HTTP request */
} zv_timer_node;
```
The timer operations themselves (initialization, insertion, expiry handling) are thin layers over the priority queue above; the full listing is in the repository.
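To make the shape concrete, here is an illustrative sketch of a timer built on zv_pq_t. The names zv_timer, zv_current_msec, zv_time_update, timer_comp, zv_add_timer, and zv_handle_expire_timers mirror the zv_ convention but are assumptions here, not the verbatim timer.c:

```c
/* Illustrative sketch, not zaver's verbatim timer code. */
#include <sys/time.h>

static zv_pq_t zv_timer;
static size_t zv_current_msec;    /* cached current time in milliseconds */

static void zv_time_update(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    zv_current_msec = (size_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
}

static int timer_comp(void *ti, void *tj) {
    /* min-heap order: the earliest expiry key floats to the top */
    return ((zv_timer_node *)ti)->key < ((zv_timer_node *)tj)->key ? 1 : 0;
}

void zv_add_timer(zv_http_request_t *rq, size_t timeout_ms, timer_handler_pt handler) {
    zv_timer_node *node = (zv_timer_node *)malloc(sizeof(zv_timer_node));
    check(node != NULL, "zv_add_timer: malloc");
    zv_time_update();
    node->key = zv_current_msec + timeout_ms;    /* absolute expiry time */
    node->deleted = 0;
    node->handler = handler;
    node->rq = rq;
    zv_pq_insert(&zv_timer, node);
}

/* pop every node whose expiry has passed; lazily discard deleted nodes */
void zv_handle_expire_timers(void) {
    zv_time_update();
    while (!zv_pq_is_empty(&zv_timer)) {
        zv_timer_node *node = (zv_timer_node *)zv_pq_min(&zv_timer);
        if (node->deleted) {                        /* client already gone: just drop it */
            zv_pq_delmin(&zv_timer);
            free(node);
        } else if (node->key <= zv_current_msec) {  /* expired: fire the handler */
            zv_pq_delmin(&zv_timer);
            if (node->handler) node->handler(node->rq);
            free(node);
        } else {
            break;    /* the earliest timer is still in the future */
        }
    }
}
```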
Part 2: Low-level Service APIs

This part covers plain byte I/O, the epoll interface, and the HTTP protocol interface.
Byte I/O

The byte I/O design is also conventional: an rio_t struct reads from its fd into an internal buffer, and the user's reads are served by copying out of that buffer on demand. The design comes from CSAPP:
```c
//rio.h
/*
 * reference the implementation in CSAPP
 */
typedef struct {
    int rio_fd;                 /* file descriptor backing the buffer */
    ssize_t rio_cnt;            /* unread bytes left in the internal buffer */
    char *rio_bufptr;           /* next unread byte in the internal buffer */
    char rio_buf[RIO_BUFSIZE];  /* the internal buffer */
} rio_t;

ssize_t rio_readn(int fd, void *usrbuf, size_t n);
ssize_t rio_writen(int fd, void *usrbuf, size_t n);
void rio_readinitb(rio_t *rp, int fd);
ssize_t rio_readnb(rio_t *rp, void *usrbuf, size_t n);
ssize_t rio_readlineb(rio_t *rp, void *usrbuf, size_t maxlen);
```
The corresponding rio.c follows CSAPP closely; the full listing is in the repository.
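The heart of that file is the refill-and-copy routine that rio_readnb and rio_readlineb share. The sketch below follows CSAPP's rio_read, which the header comment cites, and is assumed (not verified) to match zaver's copy; it needs <unistd.h>, <string.h>, and <errno.h>:

```c
/* Paraphrased from CSAPP's rio_read: the buffered core under rio_readnb
 * and rio_readlineb. */
static ssize_t rio_read(rio_t *rp, char *usrbuf, size_t n) {
    while (rp->rio_cnt <= 0) {    /* internal buffer empty: refill it */
        rp->rio_cnt = read(rp->rio_fd, rp->rio_buf, sizeof(rp->rio_buf));
        if (rp->rio_cnt < 0) {
            if (errno != EINTR)   /* a real error, not an interrupted read */
                return -1;
        } else if (rp->rio_cnt == 0) {
            return 0;             /* EOF */
        } else {
            rp->rio_bufptr = rp->rio_buf;    /* reset the read pointer */
        }
    }
    /* copy min(n, rio_cnt) bytes from the internal buffer to the user buffer */
    size_t cnt = n;
    if ((size_t)rp->rio_cnt < n)
        cnt = rp->rio_cnt;
    memcpy(usrbuf, rp->rio_bufptr, cnt);
    rp->rio_bufptr += cnt;
    rp->rio_cnt -= cnt;
    return cnt;
}
```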
epoll
The low-level epoll interface is straightforward; the code speaks for itself:

```c
//create an epoll instance: on success returns a new epoll fd, on failure returns -1 and sets errno
int epoll_create1(int __flags);   //__flags = 0 or EPOLL_CLOEXEC; the latter closes the epoll fd automatically across exec

//register or unregister interest in an fd: on success returns 0, on failure returns -1 and sets errno
int epoll_ctl(int __epfd,          //the epoll instance fd
              int __op,            //__op = EPOLL_CTL_ADD: register a new fd
                                   //__op = EPOLL_CTL_MOD: modify the events of an fd
                                   //__op = EPOLL_CTL_DEL: remove an fd
              int __fd,            //the fd to watch or stop watching
              struct epoll_event * //pointer to the event struct; may be NULL when __op = EPOLL_CTL_DEL
);

//wait for watched events: on success returns the number of ready events, on failure returns -1 and sets errno
int epoll_wait(int __epfd,                    //the epoll fd
               struct epoll_event *__events,  //output array, filled with the ready events
               int __maxevents,               //must be > 0 and no larger than the capacity of __events
               int __timeout                  //timeout in ms; -1 blocks indefinitely, 0 returns immediately
);
```
```c
typedef union epoll_data
{
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

struct epoll_event
{
    uint32_t events;   /* Epoll events */
    epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;      /* packed layout, no alignment padding */
```
| Macro | Meaning | Macro | Meaning |
|---|---|---|---|
| EPOLLIN | data available to read | EPOLLOUT | writable (send buffer not full) |
| EPOLLET | edge-triggered | EPOLLPRI | urgent data available |
| EPOLLERR | an error occurred (always reported) | EPOLLHUP | peer hang-up |
| EPOLLONESHOT | fire once, then stop watching | EPOLLRDHUP | peer closed, or TCP half-closed |
The flags combine with bitwise OR.

data is the object attached to the watch. It is a union: it can hold a plain file descriptor (a socket, some device file, and so on), or a pointer to a custom wrapper struct. The HTTP layer uses exactly that trick, as the later examples show:
```c
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;

//option 1: a plain descriptor
ev.data.fd = socketfd;

//option 2: a custom wrapper object
struct connection {
    int fd;
    char buffer[1024];
};
struct connection *conn = malloc(sizeof(struct connection));
conn->fd = client_fd;
ev.data.ptr = conn;
```
zaver's layer over this is just a thin wrapper:

```c
struct epoll_event *events;

int zv_epoll_create(int flags) {
    int fd = epoll_create1(flags);
    check(fd > 0, "zv_epoll_create: epoll_create1");
    events = (struct epoll_event *)malloc(sizeof(struct epoll_event) * MAXEVENTS);
    check(events != NULL, "zv_epoll_create: malloc");
    return fd;
}

void zv_epoll_add(int epfd, int fd, struct epoll_event *event) {
    int rc = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event);
    check(rc == 0, "zv_epoll_add: epoll_ctl");
    return;
}

void zv_epoll_mod(int epfd, int fd, struct epoll_event *event) {
    int rc = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, event);
    check(rc == 0, "zv_epoll_mod: epoll_ctl");
    return;
}

void zv_epoll_del(int epfd, int fd, struct epoll_event *event) {
    int rc = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, event);
    check(rc == 0, "zv_epoll_del: epoll_ctl");
    return;
}

int zv_epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
    int n = epoll_wait(epfd, events, maxevents, timeout);
    check(n >= 0, "zv_epoll_wait: epoll_wait");
    return n;
}
```
http

The HTTP layer spans four files: http.h, http.c, http_request.h, and http_request.c. See the repository for the full listings.

