Zaver是一个来自十年前的http项目,全篇使用C语言编写,代码比较简单,是一个非工业级的轮子。最近回顾两年前的文章,发现有一些坑没有填完,比如计算机网络的应用层(http等),见计算机网络基础理论:自底向上方法,Linux底层API关于epoll使用也没有相关demo,见Linux操作系统:网络编程,zaver基本上线程池、日志、epoll都是类Nginx风格的最简封装,学习成本较低,但也比一般demo的架构更齐全,作为一个epoll小轮子补充是恰到好处的。

Forked项目地址:https://github.com/EdenMoxe/zaver

日志输出

若干宏即可实现标准错误输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#ifndef DBG_H
#define DBG_H

#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>

/*
 * debug(): trace message on stderr tagged with file and line.
 * Compiled out completely when NDEBUG is defined.
 */
#ifdef NDEBUG
#define debug(M, ...)
#else
#define debug(M, ...) fprintf(stderr, "DEBUG %s:%d: " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#endif

/* Text for the current errno, or "None" when errno is 0. */
#define clean_errno() (errno == 0 ? "None" : strerror(errno))

/*
 * log_err / log_warn / log_info: printf-style logging to stderr.
 * M must be a string literal (it is pasted into the format string);
 * `##__VA_ARGS__` (GNU extension) drops the preceding comma when no
 * variadic arguments are supplied.
 */
#define log_err(M, ...) fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, clean_errno(), ##__VA_ARGS__)

#define log_warn(M, ...) fprintf(stderr, "[WARN] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, clean_errno(), ##__VA_ARGS__)

#define log_info(M, ...) fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)

/*
 * check family: evaluate condition A exactly once and report when it is false.
 *   check       - logs an error and lets execution continue
 *   check_exit  - logs an error and terminates the process with exit(1)
 *   check_debug - emits a debug() trace and lets execution continue
 *
 * Each macro is wrapped in do { } while (0) so that it expands to a single
 * statement: `if (x) check(...); else ...` now parses as written instead of
 * failing to compile or binding the `else` to the macro's hidden `if`.
 */
#define check(A, M, ...) do { if (!(A)) { log_err(M "\n", ##__VA_ARGS__); } } while (0)

#define check_exit(A, M, ...) do { if (!(A)) { log_err(M "\n", ##__VA_ARGS__); exit(1); } } while (0)

#define check_debug(A, M, ...) do { if (!(A)) { debug(M "\n", ##__VA_ARGS__); } } while (0)

#endif
值得学习的是,其中`##__VA_ARGS__`用于接收可变参数:如果M中含有格式化占位符,后面的参数会被转发给fprintf进行格式化打印;而当调用时没有传入任何可变参数时,`##`(GNU扩展)会把前面多余的逗号一并去掉,保证宏展开后仍然合法:
1
2
3
4
log_info("%d has ten %d", 10, 1);

//输出(前缀中的文件名和行号取决于调用位置):
[INFO] (文件名:行号) 10 has ten 1

第一部分:底层数据结构设计类

包含线程池、优先队列、定时器、链表设计等;

线程池

线程池架构可从.h看出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#ifndef THREADPOOL_H
#define THREADPOOL_H

#ifdef __cplusplus
extern "C" {
#endif

#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdint.h>
#include "dbg.h"

#define THREAD_NUM 8

/* One queued unit of work; tasks are chained into a singly linked list. */
typedef struct zv_task_s {
void (*func)(void *); /* callback a worker thread invokes */
void *arg; /* opaque argument handed to func */
struct zv_task_s *next; /* next task in the list, or NULL */
} zv_task_t;

/* Thread pool state shared by the submitting thread and all workers. */
typedef struct {
pthread_mutex_t lock; /* mutex taken around queue and flag accesses */
pthread_cond_t cond; /* condition variable workers wait on for new tasks / shutdown */
pthread_t *threads; /* array of worker thread handles */
zv_task_t *head; /* dummy head node of the task list; real tasks start at head->next */
int thread_count; /* number of worker threads successfully created */
int queue_size; /* number of tasks currently queued */
int shutdown; /* 0 while running; per the original note 1 = immediate shutdown, 2 = graceful (drain queue first) */
int started; /* number of live workers: incremented per created thread, decremented when a worker exits */
} zv_threadpool_t;

//错误码定义
/* Negative error codes returned by the thread pool API. */
typedef enum {
zv_tp_invalid = -1, /* invalid argument (e.g. NULL pool) */
zv_tp_lock_fail = -2, /* mutex lock/unlock failed */
zv_tp_already_shutdown = -3, /* pool is already shutting down */
zv_tp_cond_broadcast = -4, /* pthread_cond_broadcast failed */
zv_tp_thread_fail = -5, /* pthread_join failed for a worker */
} zv_threadpool_error_t;


/* Create a pool with thread_num workers; returns NULL on failure. */
zv_threadpool_t *threadpool_init(int thread_num);
/* Queue func(arg) for execution; returns 0 on success, a negative value on error. */
int threadpool_add(zv_threadpool_t *pool, void (*func)(void *), void *arg);
/* Stop the pool and join its workers; non-zero graceful drains the queue first. */
int threadpool_destroy(zv_threadpool_t *pool, int graceful);

#ifdef __cplusplus
}
#endif

#endif
链表作为任务队列也许属于特别的设计,剩下的任务和一般的线程池估计没什么区别,也能猜出来,分别是定义任务函数,threadpool_init初始化线程池放入线程对象,造一个dummy头任务结点;threadpool_add即构造任务函数,消费一个worker并唤醒一个线程执行,threadpool_destroy则唤醒所有线程,逐个销毁。

具体实现如下。

threadpool_init

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/*
 * Allocate a thread pool and start thread_num worker threads.
 *
 * Returns the new pool, or NULL when thread_num <= 0, an allocation fails,
 * the mutex/condition variable cannot be initialised, or a worker thread
 * cannot be created.
 *
 * Failure handling before any thread exists releases the partial allocations
 * directly: at that point the task list is empty, and either pool->threads or
 * pool->head may be NULL, so the generic teardown helper is not used here.
 */
zv_threadpool_t *threadpool_init(int thread_num) {
    if (thread_num <= 0) {
        log_err("the arg of threadpool_init must greater than 0");
        return NULL;
    }

    zv_threadpool_t *pool = (zv_threadpool_t *)malloc(sizeof(zv_threadpool_t));
    if (pool == NULL) {
        return NULL;
    }

    pool->thread_count = 0;
    pool->queue_size = 0;
    pool->shutdown = 0;
    pool->started = 0;
    pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_num);
    pool->head = (zv_task_t *)malloc(sizeof(zv_task_t)); /* dummy head */

    if ((pool->threads == NULL) || (pool->head == NULL)) {
        goto err;
    }

    /* The dummy head carries no work; real tasks hang off head->next. */
    pool->head->func = NULL;
    pool->head->arg = NULL;
    pool->head->next = NULL;

    /* Synchronisation primitives: a mutex plus a condition variable. */
    if (pthread_mutex_init(&(pool->lock), NULL) != 0) {
        goto err;
    }

    if (pthread_cond_init(&(pool->cond), NULL) != 0) {
        pthread_mutex_destroy(&(pool->lock));
        goto err;
    }

    /* Start the workers; each one runs threadpool_worker on this pool. */
    int i;
    for (i = 0; i < thread_num; ++i) {
        if (pthread_create(&(pool->threads[i]), NULL, threadpool_worker, (void *)pool) != 0) {
            /* Stops and joins the workers created so far. */
            threadpool_destroy(pool, 0);
            return NULL;
        }
        log_info("thread: %08x started", (uint32_t) pool->threads[i]);

        pool->thread_count++;
        pool->started++;
    }

    return pool;

err:
    /* free(NULL) is a no-op, so whichever allocation failed is harmless here. */
    free(pool->head);
    free(pool->threads);
    free(pool);
    return NULL;
}

定义worker消费对象

worker对象运行在一条线程上,负责从任务链表中取出任务并执行,本质是一个循环:队列为空时阻塞在条件变量上等待被唤醒,并在shutdown标志被置位后退出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
 * Worker thread entry point.
 *
 * Loops forever: waits (under pool->lock) until a task is queued or the
 * shutdown flag is set, pops the first task after the dummy head, releases
 * the lock, runs the task and frees it. Leaves the loop on immediate
 * shutdown, or on graceful shutdown once the queue is empty.
 * immediate_shutdown / graceful_shutdown are defined outside this listing.
 */
static void *threadpool_worker(void *arg) {    /* arg is the zv_threadpool_t this worker serves */
if (arg == NULL) {
log_err("arg should be type zv_threadpool_t*");
return NULL;
}

zv_threadpool_t *pool = (zv_threadpool_t *)arg;
zv_task_t *task;

while (1) {
pthread_mutex_lock(&(pool->lock));

/* Wait on condition variable, check for spurious wakeups. */
while ((pool->queue_size == 0) && !(pool->shutdown)){ /* nothing queued and not shutting down */
pthread_cond_wait(&(pool->cond), &(pool->lock));
}

if (pool->shutdown == immediate_shutdown) {
break;
} else if ((pool->shutdown == graceful_shutdown) && pool->queue_size == 0) {
break;
}

task = pool->head->next; /* first real task after the dummy head */
if (task == NULL) {
pthread_mutex_unlock(&(pool->lock));
continue;
}

pool->head->next = task->next; /* unlink the task from the list */
pool->queue_size--;

pthread_mutex_unlock(&(pool->lock));

(*(task->func))(task->arg); /* run the task outside the lock */
/* TODO: memory pool */
free(task); /* the worker owns the task node once it is unlinked */
}

pool->started--;
pthread_mutex_unlock(&(pool->lock)); /* both break paths above leave the loop with the lock held */
pthread_exit(NULL);

return NULL;
}

threadpool_add

函数指针放入队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/*
 * Queue func(arg) for execution by a worker thread.
 *
 * The new task is inserted directly after the dummy head, so the most
 * recently added task is the next one a worker picks up.
 *
 * Returns 0 on success; -1 when pool or func is NULL, when the mutex cannot
 * be locked/unlocked, or when the task node cannot be allocated;
 * zv_tp_already_shutdown when the pool is shutting down.
 */
int threadpool_add(zv_threadpool_t *pool, void (*func)(void *), void *arg) {
    int rc, err = 0;
    if (pool == NULL || func == NULL) {
        log_err("pool == NULL or func == NULL");
        return -1;
    }

    /* A busy mutex blocks here; a non-zero result means the lock itself is unusable. */
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        log_err("pthread_mutex_lock");
        return -1;
    }

    if (pool->shutdown) {
        err = zv_tp_already_shutdown;
        goto out;
    }

    // TODO: use a memory pool
    zv_task_t *task = (zv_task_t *)malloc(sizeof(zv_task_t));
    if (task == NULL) {
        log_err("malloc task fail");
        err = -1; /* report the failure instead of pretending the task was queued */
        goto out;
    }

    task->func = func;
    task->arg = arg;
    task->next = pool->head->next; /* insert right after the dummy head */
    pool->head->next = task;

    pool->queue_size++;

    rc = pthread_cond_signal(&(pool->cond)); /* wake one waiting worker */
    check(rc == 0, "pthread_cond_signal");

out:
    if (pthread_mutex_unlock(&pool->lock) != 0) {
        log_err("pthread_mutex_unlock");
        return -1;
    }
    return err;
}

threadpool_destroy

唯一值得关注的是,销毁线程一样要去竞争锁,当拿到这个锁,广播唤醒信号,此时其他线程的条件变量已经被唤醒了(尽管都在等待锁),只要拿到锁都会跳出循环或者等待任务执行(取决于是否需要gracefully退出):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/*
 * Release every allocation owned by the pool: the thread handle array, any
 * tasks still queued, the dummy head node and the pool object itself.
 *
 * Refuses to run (returns -1) when pool is NULL or workers are still alive
 * (pool->started > 0). Returns 0 on success; the pool pointer must not be
 * used afterwards. The mutex and condition variable are not destroyed here.
 */
int threadpool_free(zv_threadpool_t *pool) {
    if (pool == NULL || pool->started > 0) {
        return -1;
    }

    free(pool->threads);

    /* head may be NULL when initialisation failed half-way. */
    if (pool->head != NULL) {
        zv_task_t *task = pool->head->next;
        while (task != NULL) {
            zv_task_t *next = task->next;
            free(task);
            task = next;
        }
        free(pool->head);
    }

    free(pool);
    return 0;
}

/*
 * Shut the pool down and join all worker threads.
 *
 * graceful != 0 selects graceful_shutdown (workers drain the queue first),
 * otherwise immediate_shutdown; both constants are defined outside this
 * listing.
 *
 * Returns 0 on success, or zv_tp_invalid / zv_tp_lock_fail /
 * zv_tp_already_shutdown / zv_tp_cond_broadcast / zv_tp_thread_fail.
 * Resources are released only when no error occurred. Every failure path
 * that is reached while the mutex is held releases it before returning.
 */
int threadpool_destroy(zv_threadpool_t *pool, int graceful) {
    int err = 0;

    if (pool == NULL) {
        log_err("pool == NULL");
        return zv_tp_invalid;
    }

    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        return zv_tp_lock_fail;
    }

    if (pool->shutdown) {
        pthread_mutex_unlock(&(pool->lock));
        return zv_tp_already_shutdown;
    }

    /* Set the shutdown flag and wake every worker so it can observe it. */
    pool->shutdown = (graceful)? graceful_shutdown: immediate_shutdown;

    if (pthread_cond_broadcast(&(pool->cond)) != 0) {
        pthread_mutex_unlock(&(pool->lock));
        return zv_tp_cond_broadcast;
    }

    if (pthread_mutex_unlock(&(pool->lock)) != 0) {
        return zv_tp_lock_fail;
    }

    int i;
    for (i = 0; i < pool->thread_count; i++) {
        if (pthread_join(pool->threads[i], NULL) != 0) {
            err = zv_tp_thread_fail;
        }
        log_info("thread %08x exit", (uint32_t) pool->threads[i]);
    }

    if (!err) {
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->cond));
        threadpool_free(pool);
    }
    return err;
}
以上就是zaver的完整线程池实现,可见都是比较普遍的C语言操作;

线程池benchmark

zaver的线程池测试使用了一个累加测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <threadpool.h>

#define THREAD_NUM 8

pthread_mutex_t lock;
size_t sum = 0;

static void sum_n(void *arg) {
size_t n = (size_t) arg;
int rc;

rc = pthread_mutex_lock(&lock);
check_exit(rc == 0, "pthread_mutex_lock error");

sum += n;

rc = pthread_mutex_unlock(&lock);
check_exit(rc == 0, "pthread_mutex_unlock error");
}

int main() {
int rc;
check_exit(pthread_mutex_init(&lock, NULL) == 0, "lock init error");

log_info("%d has ten %d",10 , 1);

zv_threadpool_t *tp = threadpool_init(THREAD_NUM);
check_exit(tp != NULL, "threadpool_init error");

size_t i;
for (i=1; i< 1000; i++){
rc = threadpool_add(tp, sum_n, (void *)i);
check_exit(rc == 0, "threadpool_add error");
}

check_exit(threadpool_destroy(tp, 1) == 0, "threadpool_destroy error");

check_exit(sum == 499500, "sum error");
printf("pass thread_pool_test\n");
return 0;
}

优先队列

注解

不同于Nginx使用红黑树管理定时器,zaver的定时器是由优先队列(二叉堆)实现的。两种数据结构的共同点是都能够快速找到最近的超时时间(堆顶查询为O(1));二叉堆的插入和删除堆顶都需要上滤/下滤调整,复杂度为O(log n),与红黑树同阶,但堆不支持高效地删除任意结点,因此zaver的定时器采用deleted标记做惰性删除。本节介绍其C语言的完整实现,没有优先队列基础的读者直接看堆的调整代码可能有点吃力,建议先参考我的一篇专题:数据结构算法题目(六):优先队列专题,也可以直接参考下面注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
//priority_queue.h定义:
/* Default number of element slots requested by callers of zv_pq_init. */
#define ZV_PQ_DEFAULT_SIZE 10

/* Ordering predicate: non-zero when *pi must sit above *pj in the heap. */
typedef int (*zv_pq_comparator_pt)(void *pi, void *pj);

/* Binary heap of opaque pointers, stored 1-based (slot 0 is never used). */
typedef struct {
void **pq; /* slot array; elements live in pq[1..nalloc] */
size_t nalloc; /* number of elements currently stored */
size_t size; /* number of slots allocated in pq, including the unused slot 0 */
zv_pq_comparator_pt comp; /* ordering predicate supplied at init time */
} zv_pq_t;


//priority_queue.c实现:
#include "priority_queue.h"

//初始化:预先分配size+1个void*空间
/*
 * Prepare an empty heap with room for `size` elements (size + 1 slots are
 * allocated because slot 0 is unused). Returns ZV_OK, or -1 when the slot
 * array cannot be allocated.
 */
int zv_pq_init(zv_pq_t *zv_pq, zv_pq_comparator_pt comp, size_t size) {
    const size_t slots = size + 1;

    zv_pq->pq = (void **)malloc(sizeof(void *) * slots);
    if (zv_pq->pq == NULL) {
        log_err("zv_pq_init: malloc failed");
        return -1;
    }

    zv_pq->comp = comp;
    zv_pq->size = slots;
    zv_pq->nalloc = 0; /* no elements stored yet */

    return ZV_OK;
}

//已分配空间为0,空堆
/* Returns 1 when the heap holds no elements, 0 otherwise. */
int zv_pq_is_empty(zv_pq_t *zv_pq) {
    if (zv_pq->nalloc == 0) {
        return 1;
    }
    return 0;
}

//堆大小取决于已分配的结点数
/* Number of elements currently stored in the heap. */
size_t zv_pq_size(zv_pq_t *zv_pq) {
    const size_t stored = zv_pq->nalloc;
    return stored;
}

//最小堆的堆顶就是最小元素
/* Peek at the top element (pq[1]) without removing it; NULL when empty. */
void *zv_pq_min(zv_pq_t *zv_pq) {
    return zv_pq_is_empty(zv_pq) ? NULL : zv_pq->pq[1];
}

//初始化空间不足时,resize到更大的空间并且析构原来的空间
/*
 * Move the heap into a freshly allocated slot array of new_size slots and
 * free the old array. new_size must exceed the number of stored elements.
 * Returns ZV_OK, or -1 when new_size is too small or allocation fails
 * (the heap is left untouched in both failure cases).
 */
static int resize(zv_pq_t *zv_pq, size_t new_size) {
    const size_t live = zv_pq->nalloc;

    if (live >= new_size) {
        log_err("resize: new_size to small");
        return -1;
    }

    void **bigger = (void **)malloc(sizeof(void *) * new_size);
    if (bigger == NULL) {
        log_err("resize: malloc failed");
        return -1;
    }

    /* slots 0..live are carried over (slot 0 is the unused one) */
    memcpy(bigger, zv_pq->pq, sizeof(void *) * (live + 1));
    free(zv_pq->pq);

    zv_pq->size = new_size;
    zv_pq->pq = bigger;
    return ZV_OK;
}

//交换两个结点指针,实际上就是交换了两个结点
/* Swap the element pointers held in slots i and j. */
static void exch(zv_pq_t *zv_pq, size_t i, size_t j) {
    void **slots = zv_pq->pq;
    void *held = slots[j];

    slots[j] = slots[i];
    slots[i] = held;
}


//插入新元素,可能要上滤,作swim检查:
/*
 * Sift the element in slot k upwards: while it is not the root and the
 * comparator ranks it ahead of its parent (slot k/2), swap the two.
 */
static void swim(zv_pq_t *zv_pq, size_t k) {
    for (; k > 1; k /= 2) {
        const size_t parent = k / 2;
        if (!zv_pq->comp(zv_pq->pq[k], zv_pq->pq[parent])) {
            break;
        }
        exch(zv_pq, k, parent);
    }
}

//删除堆顶最小元素,新元素要下滤,所以要作sink检查:结果就是保持堆顶仍为最小元素
/*
 * Sift the element in slot k downwards: pick whichever child the comparator
 * ranks first and swap with it while that child ranks ahead of the element.
 * Returns the slot the element finally rests in.
 */
static size_t sink(zv_pq_t *zv_pq, size_t k) {
    const size_t last = zv_pq->nalloc;
    size_t child;

    for (child = 2 * k; child <= last; child = 2 * k) {
        /* prefer the right child when it exists and ranks ahead of the left */
        if (child < last && zv_pq->comp(zv_pq->pq[child + 1], zv_pq->pq[child])) {
            child++;
        }
        /* heap order already holds between the chosen child and slot k */
        if (!zv_pq->comp(zv_pq->pq[child], zv_pq->pq[k])) {
            break;
        }
        exch(zv_pq, child, k);
        k = child;
    }

    return k;
}

//取走堆顶最小元素
/*
 * Remove the top element. The element pointer itself is not freed.
 * When the heap has become at most a quarter full the slot array is halved.
 * Returns ZV_OK (also for an empty heap) or -1 when shrinking fails.
 */
int zv_pq_delmin(zv_pq_t *zv_pq) {
    if (zv_pq_is_empty(zv_pq)) {
        return ZV_OK;
    }

    /* park the top in the last slot, drop that slot, restore heap order */
    exch(zv_pq, 1, zv_pq->nalloc);
    zv_pq->nalloc -= 1;
    sink(zv_pq, 1);

    const size_t live = zv_pq->nalloc;
    const int sparse = (live > 0) && (live <= (zv_pq->size - 1) / 4);
    if (sparse && resize(zv_pq, zv_pq->size / 2) < 0) {
        return -1;
    }

    return ZV_OK;
}

/*
 * Add item to the heap, doubling the slot array first when the next element
 * would not fit. Returns ZV_OK, or -1 when growing fails (item not added).
 */
int zv_pq_insert(zv_pq_t *zv_pq, void *item) {
    const int full = (zv_pq->nalloc + 1 == zv_pq->size);

    if (full && resize(zv_pq, zv_pq->size * 2) < 0) {
        return -1;
    }

    zv_pq->nalloc += 1;
    zv_pq->pq[zv_pq->nalloc] = item; /* append at the bottom ... */
    swim(zv_pq, zv_pq->nalloc);      /* ... then sift up into place */

    return ZV_OK;
}

//同sink
/* Public wrapper around sink(): sifts slot i down and returns its final slot. */
int zv_pq_sink(zv_pq_t *zv_pq, size_t i) {
    const size_t settled = sink(zv_pq, i);
    return settled;
}

优先队列benchmark

优先队列的benchmark显然是一个最小堆排序:不断从堆顶取出并删除元素,与标准的增序resultdata逐一比较,如果一一对应,则排序成立,优先队列的构建也是pass的。此处对比的testdata数据和resultdata可以参考源码仓库,不一一列出占据篇幅了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <priority_queue.h>
#include <dbg.h>

/*
 * Test comparator: the heap stores integers disguised as pointers.
 * Returns 1 when i is strictly smaller than j, 0 otherwise.
 */
static int comp(void *i, void *j) {
    const size_t lhs = (size_t)i;
    const size_t rhs = (size_t)j;

    if (lhs < rhs) {
        return 1;
    }
    return 0;
}

size_t testdata[] = {...}; //数据量过多,请参考上述源码仓库

size_t resultdata[] = {...}; //数据量过多,请参考上述源码仓库

int main() {
zv_pq_t pq;
int rc;

rc = zv_pq_init(&pq, comp, ZV_PQ_DEFAULT_SIZE);
check_exit(rc == 0, "zv_pq_init error");

rc = zv_pq_is_empty(&pq);
check_exit(rc == 1, "zv_pq_is_empty error");

size_t sz;
sz = zv_pq_size(&pq);
check_exit(sz == 0, "zv_pq_size");

void *min;
min = zv_pq_min(&pq);
check_exit(min == NULL, "zv_pq_min");

rc = zv_pq_delmin(&pq);
check_exit(rc == 0, "zv_pq_delmin error");

int n = sizeof(testdata)/sizeof(size_t);
int i;
for (i = 0; i < n; i++) {
rc = zv_pq_insert(&pq, (void *)testdata[i]);
check_exit(rc == 0, "zv_pq_insert error");

check_exit(zv_pq_size(&pq) == (size_t)i+1, "zv_pq_size error");
}

i = 0;
while (!zv_pq_is_empty(&pq)) {
min = zv_pq_min(&pq);
check_exit(min != NULL, "zv_pq_min error");
check_exit((size_t)min == (size_t)resultdata[i], "zv_pq_min error, min=%zu, rd[i]=%zu", (size_t)min, (size_t)resultdata[i]);
i++;

rc = zv_pq_delmin(&pq);
check_exit(rc == 0, "zv_pq_delmin error");
}

check_exit(i == n, "size not match");
printf("pass priority_queue_test\n");
return 0;
}

定时器

zv_timer_node结构体定义,定义了事件时间key、事件函数等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#ifndef ZV_TIMER_H
#define ZV_TIMER_H

#include "priority_queue.h"
#include "http_request.h"

/* Returned by zv_find_timer when no timer is pending. Parenthesised so the
 * negative constant stays intact inside any surrounding expression. */
#define ZV_TIMER_INFINITE (-1)
/* Default timer timeout. */
#define TIMEOUT_DEFAULT 500 /* ms */

/* Callback invoked with the owning request when its timer expires. */
typedef int (*timer_handler_pt)(zv_http_request_t *rq);

/* One entry of the timer heap. */
typedef struct zv_timer_node_s{
size_t key; /* absolute expiry time in milliseconds (zv_current_msec + timeout) */
int deleted; /* set to 1 by zv_del_timer; such nodes are discarded lazily when they reach the heap top */
timer_handler_pt handler; /* expiry callback, may be NULL */
zv_http_request_t *rq; /* request the timer belongs to */
} zv_timer_node;

#endif

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <sys/time.h>
#include "timer.h"

//定时器时间比较函数,定义了定时器不同void*间的比较原则
/* Heap ordering for timers: 1 when ti expires strictly before tj, else 0. */
static int timer_comp(void *ti, void *tj) {
    const size_t lhs_key = ((zv_timer_node *)ti)->key;
    const size_t rhs_key = ((zv_timer_node *)tj)->key;

    if (lhs_key < rhs_key) {
        return 1;
    }
    return 0;
}

zv_pq_t zv_timer; //定时器,小顶堆数据结构
size_t zv_current_msec; //当前系统时间

//更新当前系统时间
/*
 * Refresh the cached wall-clock time zv_current_msec (milliseconds).
 *
 * When gettimeofday fails the error is logged and the previous value is
 * kept, instead of computing a time from an uninitialised timeval.
 */
static void zv_time_update() {
    // there is only one thread calling zv_time_update, no need to lock?
    struct timeval tv;
    int rc;

    rc = gettimeofday(&tv, NULL);
    check(rc == 0, "zv_time_update: gettimeofday error");
    if (rc != 0) {
        return;
    }

    /* convert before multiplying so the arithmetic is done in size_t */
    zv_current_msec = (size_t)tv.tv_sec * 1000 + (size_t)tv.tv_usec / 1000;
    debug("in zv_time_update, time = %zu", zv_current_msec);
}


/*
 * Initialise the global timer heap and the cached current time.
 * Returns ZV_OK on success, or the error code from zv_pq_init when the heap
 * cannot be created (the failure is also logged).
 */
int zv_timer_init() {
    int rc;
    rc = zv_pq_init(&zv_timer, timer_comp, ZV_PQ_DEFAULT_SIZE);
    check(rc == ZV_OK, "zv_pq_init error");
    if (rc != ZV_OK) {
        return rc; /* do not report success with an unusable heap */
    }

    zv_time_update();
    return ZV_OK;
}

int zv_find_timer() {
zv_timer_node *timer_node;
int time = ZV_TIMER_INFINITE;
int rc;

while (!zv_pq_is_empty(&zv_timer)) {
debug("zv_find_timer");
zv_time_update(); //更新当前时间
timer_node = (zv_timer_node *)zv_pq_min(&zv_timer); //取出时间最小的定时器
check(timer_node != NULL, "zv_pq_min error");

if (timer_node->deleted) { //http远程客户端关闭,定时事件直接删除
rc = zv_pq_delmin(&zv_timer);
check(rc == 0, "zv_pq_delmin");
free(timer_node);
continue;
}

time = (int) (timer_node->key - zv_current_msec);
debug("in zv_find_timer, key = %zu, cur = %zu",
timer_node->key,
zv_current_msec);
time = (time > 0? time: 0); //过期了时间也置为0
break;
}

return time;
}

void zv_handle_expire_timers() {
debug("in zv_handle_expire_timers");
zv_timer_node *timer_node;
int rc;

while (!zv_pq_is_empty(&zv_timer)) {
debug("zv_handle_expire_timers, size = %zu", zv_pq_size(&zv_timer));
zv_time_update();
timer_node = (zv_timer_node *)zv_pq_min(&zv_timer);
check(timer_node != NULL, "zv_pq_min error");

if (timer_node->deleted) {
rc = zv_pq_delmin(&zv_timer);
check(rc == 0, "zv_handle_expire_timers: zv_pq_delmin error");
free(timer_node);
continue;
}

if (timer_node->key > zv_current_msec) { //未到时间
return;
}

if (timer_node->handler) { //到时间,执行请求处理函数
timer_node->handler(timer_node->rq);
}
rc = zv_pq_delmin(&zv_timer);
check(rc == 0, "zv_handle_expire_timers: zv_pq_delmin error");
free(timer_node);
}
}

//创建定时器放入最小堆
/*
 * Arm a timer for rq that expires `timeout` milliseconds from now and calls
 * handler(rq) on expiry. The node is stored in rq->timer and pushed onto the
 * global timer heap.
 *
 * When the node cannot be allocated the failure is logged, rq->timer is set
 * to NULL and no timer is armed (previously the NULL node was dereferenced).
 */
void zv_add_timer(zv_http_request_t *rq, size_t timeout, timer_handler_pt handler) {
    int rc;
    zv_timer_node *timer_node = (zv_timer_node *)malloc(sizeof(zv_timer_node));
    check(timer_node != NULL, "zv_add_timer: malloc error");
    if (timer_node == NULL) {
        rq->timer = NULL; /* do not leave a pointer to an older node behind */
        return;
    }

    zv_time_update();
    rq->timer = timer_node;
    timer_node->key = zv_current_msec + timeout;
    debug("in zv_add_timer, key = %zu", timer_node->key);
    timer_node->deleted = 0;
    timer_node->handler = handler;
    timer_node->rq = rq;

    rc = zv_pq_insert(&zv_timer, timer_node);
    check(rc == 0, "zv_add_timer: zv_pq_insert error");
}


/*
 * Cancel the timer attached to rq. The node is only flagged as deleted; it
 * stays in the heap and is freed later when it reaches the heap top.
 * A request without a timer is logged and otherwise ignored (previously the
 * NULL node was dereferenced).
 */
void zv_del_timer(zv_http_request_t *rq) {
    debug("in zv_del_timer");
    zv_time_update();
    zv_timer_node *timer_node = rq->timer;
    check(timer_node != NULL, "zv_del_timer: rq->timer is NULL");
    if (timer_node == NULL) {
        return;
    }

    timer_node->deleted = 1;
}

第二部分:底层业务API类

包含普通字节读写、epoll接口、http协议接口等。

字节读写

字节读写的设计也比较普通,其定义了一个rio_t结构体,将其中的fd内容读取到缓冲区,在用户需要时可以从内部的缓冲区直接复制到用户复制缓冲区,这部分的设计来自CSAPP:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//rio.h
#ifndef RIO_H
#define RIO_H

#include <sys/types.h>

#define RIO_BUFSIZE 8192

/*
* reference the implementation in CSAPP
*/

/* Buffered reader state: a descriptor plus a user-space read buffer. */
typedef struct {
int rio_fd; /* descriptor the buffer is filled from */
ssize_t rio_cnt; /* unread bytes remaining in rio_buf */
char *rio_bufptr; /* next unread byte inside rio_buf */
char rio_buf[RIO_BUFSIZE]; /* internal buffer */
} rio_t;

/* Unbuffered: read up to n bytes from fd; returns bytes read (short only at EOF) or -1. */
ssize_t rio_readn(int fd, void *usrbuf, size_t n);
/* Unbuffered: write n bytes to fd; returns n or -1. */
ssize_t rio_writen(int fd, void *usrbuf, size_t n);
/* Bind an rio_t to fd and mark its buffer empty. */
void rio_readinitb(rio_t *rp, int fd);
/* Buffered: read up to n bytes through rp. */
ssize_t rio_readnb(rio_t *rp, void *usrbuf, size_t n);
/* Buffered: read one text line of at most maxlen - 1 bytes through rp. */
ssize_t rio_readlineb(rio_t *rp, void *usrbuf, size_t maxlen);

#endif

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//rio.c
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include "dbg.h"
#include "rio.h"

//从fd读取n字节至usrbuf,返回读取字节数,n为正确
/*
 * Read up to n bytes from fd into usrbuf, restarting after EINTR.
 * Returns the number of bytes read, which is less than n only when EOF was
 * reached, or -1 on any other read error.
 */
ssize_t rio_readn(int fd, void *usrbuf, size_t n)
{
    char *cursor = (char *)usrbuf;
    size_t remaining = n;

    while (remaining > 0) {
        ssize_t got = read(fd, cursor, remaining);

        if (got == 0) {
            break;              /* EOF */
        }
        if (got < 0) {
            if (errno != EINTR) {
                return -1;      /* real error */
            }
            continue;           /* interrupted by a signal: try again */
        }

        remaining -= got;
        cursor += got;
    }
    return (n - remaining);
}

//从usrbuf写n字节至fd,返回读取字节数,n为正确
/*
 * Write exactly n bytes from usrbuf to fd, restarting after EINTR.
 * Returns n on success; on any other write failure logs errno and
 * returns -1.
 */
ssize_t rio_writen(int fd, void *usrbuf, size_t n)
{
    char *cursor = (char *)usrbuf;
    size_t remaining;

    for (remaining = n; remaining > 0; ) {
        ssize_t sent = write(fd, cursor, remaining);

        if (sent <= 0) {
            if (errno != EINTR) {
                log_err("errno == %d\n", errno);
                return -1;      /* errno set by write() */
            }
            sent = 0;           /* interrupted: nothing written, retry */
        }

        remaining -= sent;
        cursor += sent;
    }

    return n;
}


//从rp->fd读取满缓冲字节数到rp->rio_buf,rio_buf读取n字节到usrbuf
/*
 * Buffered read primitive: hand out up to n bytes from rp's internal buffer,
 * refilling the buffer from rp->rio_fd first when it is empty.
 *
 * Returns the number of bytes copied (at most n), 0 on EOF, -EAGAIN when the
 * refill read fails with EAGAIN, or -1 on any other read error. A refill
 * interrupted by a signal (EINTR) is retried.
 */
static ssize_t rio_read(rio_t *rp, char *usrbuf, size_t n)
{
size_t cnt;

/* Refill from the descriptor while the internal buffer holds no unread bytes. */
while (rp->rio_cnt <= 0) {
rp->rio_cnt = read(rp->rio_fd, rp->rio_buf, sizeof(rp->rio_buf));
if (rp->rio_cnt < 0) {
if (errno == EAGAIN) { /* read reported EAGAIN: surfaced to the caller as -EAGAIN */
return -EAGAIN;
}
if (errno != EINTR) { /* anything except an interrupted call is a hard error */
return -1;
}
}
else if (rp->rio_cnt == 0) /* EOF */
return 0;
else
rp->rio_bufptr = rp->rio_buf; /* fresh data: rewind the read pointer to the buffer start */
}

/* Copy min(n, unread bytes) from the internal buffer into usrbuf. */
cnt = n;
if (rp->rio_cnt < (ssize_t)n)
cnt = rp->rio_cnt;
memcpy(usrbuf, rp->rio_bufptr, cnt);
rp->rio_bufptr += cnt;
rp->rio_cnt -= cnt;
return cnt;
}


//初始化一个rio系统缓冲区
/* Attach rp to descriptor fd and reset its internal buffer to "empty". */
void rio_readinitb(rio_t *rp, int fd)
{
    rp->rio_bufptr = rp->rio_buf;   /* next read starts at the buffer head */
    rp->rio_cnt = 0;                /* nothing buffered yet */
    rp->rio_fd = fd;
}

//从rp缓冲区读取n字节到usrbuf
/*
 * Buffered counterpart of rio_readn: read up to n bytes through rp.
 * Returns the number of bytes read (short only at EOF), or -1 when the
 * underlying rio_read reports an error and errno is not EINTR.
 */
ssize_t rio_readnb(rio_t *rp, void *usrbuf, size_t n)
{
    char *dest = (char *)usrbuf;
    size_t remaining = n;

    while (remaining > 0) {
        ssize_t got = rio_read(rp, dest, remaining);

        if (got == 0) {
            break;              /* EOF */
        }
        if (got < 0) {
            if (errno != EINTR) {
                return -1;      /* errno set by read() */
            }
            got = 0;            /* interrupted: loop and read again */
        }

        remaining -= got;
        dest += got;
    }
    return (n - remaining);     /* return >= 0 */
}


//逐字节从rp缓冲区读取maxlen字节到usrbuf
/*
 * Read one text line through rp into usrbuf, one byte at a time.
 *
 * Stops after a '\n' (which is stored), at EOF, or once maxlen - 1 bytes
 * have been stored, then NUL-terminates the buffer.
 *
 * Returns the number of bytes stored, not counting the terminating NUL.
 * Previously the count was one too high when the line ended at EOF or filled
 * the buffer, while lines ending in '\n' were counted correctly.
 * Returns 0 on EOF before any byte was read, -EAGAIN when rio_read reports
 * -EAGAIN (bytes stored so far are left in usrbuf without a terminator),
 * and -1 on any other error or when maxlen is 0 (no room for the NUL).
 */
ssize_t rio_readlineb(rio_t *rp, void *usrbuf, size_t maxlen)
{
    char *bufp = (char *)usrbuf;
    size_t stored = 0;
    ssize_t rc;
    char c;

    if (maxlen == 0) {
        return -1;
    }

    while (stored + 1 < maxlen) {
        rc = rio_read(rp, &c, 1);
        if (rc == 1) {
            bufp[stored++] = c;
            if (c == '\n') {
                break;                      /* end of line */
            }
        } else if (rc == 0) {
            if (stored == 0) {
                return 0;                   /* EOF, no data read */
            }
            break;                          /* EOF, some data was read */
        } else if (rc == -EAGAIN) {
            return rc;                      /* no data right now; caller retries later */
        } else {
            return -1;                      /* error */
        }
    }

    bufp[stored] = 0;
    return (ssize_t)stored;
}

epoll

epoll的底层接口很直观,直接看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//Create an epoll instance: returns the new epoll file descriptor on success, -1 with errno set on failure
int epoll_create1(int __flags); //__flags is 0 or EPOLL_CLOEXEC; the latter closes the epoll fd automatically across exec

//Register, modify or remove a watched fd: returns 0 on success, -1 with errno set on failure
int epoll_ctl(int __epfd, //epoll instance descriptor
int __op, //__op=EPOLL_CTL_ADD: register a new fd
//__op=EPOLL_CTL_MOD: change the events of a registered fd
//__op=EPOLL_CTL_DEL: remove a registered fd
int __fd, //fd to register, modify or remove
struct epoll_event * //event description; may be NULL for EPOLL_CTL_DEL
);

//Wait for events: returns the number of ready events (0 on timeout), -1 with errno set on failure
int epoll_wait(int __epfd, //epoll fd
struct epoll_event *__events, //output array filled with the ready events
int __maxevents, //capacity of the output array; must be greater than zero
int __timeout //timeout in milliseconds; -1 blocks indefinitely, 0 returns immediately
);
看一下epoll_event的系统实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
/* User data attached to a registration; only one member is meaningful at a time. */
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

/* One registration / one reported event. */
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED; /* packed layout: no padding between the members */
此处要填充的是epoll_event.events和epoll_event.data两个成员,其中events是一个位掩码,用来标识要监听(或已触发)的事件类型,常用取值如下:

事件 含义 事件 含义
EPOLLIN 收到数据触发 EPOLLOUT 数据可写时触发(缓冲区不满)
EPOLLET 边沿触发 EPOLLPRI 有紧急数据可读
EPOLLERR 发生错误(可能总触发) EPOLLHUP 对端断开
EPOLLONESHOT 触发一次不再监听 EPOLLRDHUP 对端关闭或者TCP半关闭

不同标记可以通过或运算组合。

data是要填充的监听对象,本质是一个联合体,这个对象既可以是通用的文件描述符,例如socket中返回的套接字,或者某个设备文件等,也可以是自定义封装的结构体对象,http中就使用了这样的方法,从后文例子可看出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* Illustrative fragment: two ways of filling epoll_event.data. */
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;

//Option 1: store the plain descriptor
ev.data.fd = socketfd;

//Option 2: store a pointer to a caller-defined per-connection object
struct connection {
int fd;
char buffer[1024];
};
struct connection *conn = malloc(sizeof(struct connection)); /* NOTE(review): result is not checked for NULL */
conn->fd = client_fd;
ev.data.ptr = conn; /* data is a union: this overwrites the fd stored by option 1 */

zaver的相关实现只是一个封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include "epoll.h"
#include "dbg.h"

#define MAXEVENTS 1024

struct epoll_event *events;

/*
 * Create an epoll instance and allocate the global `events` array
 * (MAXEVENTS entries). Failures are only logged; the value returned is
 * whatever epoll_create1 produced.
 */
int zv_epoll_create(int flags) {
    const int epfd = epoll_create1(flags);
    check(epfd > 0, "zv_epoll_create: epoll_create1");

    const size_t bytes = sizeof(struct epoll_event) * MAXEVENTS;
    events = (struct epoll_event *)malloc(bytes);
    check(events != NULL, "zv_epoll_create: malloc");

    return epfd;
}

/* Register fd with the epoll instance epfd; a failure is only logged. */
void zv_epoll_add(int epfd, int fd, struct epoll_event *event) {
    const int status = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event);

    check(status == 0, "zv_epoll_add: epoll_ctl");
}

/* Change the event settings of an already registered fd; a failure is only logged. */
void zv_epoll_mod(int epfd, int fd, struct epoll_event *event) {
    const int status = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, event);

    check(status == 0, "zv_epoll_mod: epoll_ctl");
}

/* Remove fd from the epoll instance epfd; a failure is only logged. */
void zv_epoll_del(int epfd, int fd, struct epoll_event *event) {
    const int status = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, event);

    check(status == 0, "zv_epoll_del: epoll_ctl");
}

/*
 * Thin wrapper around epoll_wait: logs when the call fails and returns its
 * result unchanged. The `events` parameter shadows the file-level array of
 * the same name.
 */
int zv_epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
    const int ready = epoll_wait(epfd, events, maxevents, timeout);

    check(ready >= 0, "zv_epoll_wait: epoll_wait");
    return ready;
}

http

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//http.h
#ifndef HTTP_H
#define HTTP_H

#include <strings.h>
#include <stdint.h>
#include "rio.h"
#include "list.h"
#include "dbg.h"
#include "util.h"
#include "http_request.h"

#define MAXLINE 8192
#define SHORTLINE 512

/*
 * Compare the four bytes at m against the characters c0..c3 with a single
 * 32-bit load. The packing (c0 in the lowest byte) matches memory order on
 * little-endian machines only, and m must be suitably aligned for a
 * uint32_t access.
 *
 * The arguments and the whole expansion are parenthesised so the macros can
 * be combined with other operators (e.g. `!zv_str4cmp(...)`), and each
 * character is widened through unsigned char before shifting so no signed
 * shift takes place.
 */
#define zv_pack4(c0, c1, c2, c3) \
    (((uint32_t)(unsigned char)(c3) << 24) | ((uint32_t)(unsigned char)(c2) << 16) | \
     ((uint32_t)(unsigned char)(c1) << 8) | (uint32_t)(unsigned char)(c0))

#define zv_str3_cmp(m, c0, c1, c2, c3) \
    (*(uint32_t *)(m) == zv_pack4(c0, c1, c2, c3))
#define zv_str3Ocmp(m, c0, c1, c2, c3) \
    (*(uint32_t *)(m) == zv_pack4(c0, c1, c2, c3))

#define zv_str4cmp(m, c0, c1, c2, c3) \
    (*(uint32_t *)(m) == zv_pack4(c0, c1, c2, c3))


typedef struct mime_type_s {
const char *type;
const char *value;
} mime_type_t;

void do_request(void *infd);

#endif

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
//http.c
#include <strings.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include "http.h"
#include "http_parse.h"
#include "http_request.h"
#include "epoll.h"
#include "error.h"
#include "timer.h"

static const char* get_file_type(const char *type);
static void parse_uri(char *uri, int length, char *filename, char *querystring);
static void do_error(int fd, char *cause, char *errnum, char *shortmsg, char *longmsg);
static void serve_static(int fd, char *filename, size_t filesize, zv_http_out_t *out);
static char *ROOT = NULL;

/*
 * Extension -> MIME type table consulted by get_file_type().
 * The final row has a NULL extension: it terminates the scan and supplies
 * the fallback type for unknown extensions.
 */
mime_type_t zaver_mime[] =
{
{".html", "text/html"},
{".xml", "text/xml"},
{".xhtml", "application/xhtml+xml"},
{".txt", "text/plain"},
{".rtf", "application/rtf"},
{".pdf", "application/pdf"},
{".word", "application/msword"},
{".png", "image/png"},
{".gif", "image/gif"},
{".jpg", "image/jpeg"},
{".jpeg", "image/jpeg"},
{".au", "audio/basic"},
{".mpeg", "video/mpeg"},
{".mpg", "video/mpeg"},
{".avi", "video/x-msvideo"},
{".gz", "application/x-gzip"},
{".tar", "application/x-tar"},
{".css", "text/css"},
{NULL ,"text/plain"} /* sentinel / default */
};

void do_request(void *ptr) {
zv_http_request_t *r = (zv_http_request_t *)ptr;
int fd = r->fd;
int rc, n;
char filename[SHORTLINE];
struct stat sbuf;
ROOT = r->root;
char *plast = NULL;
size_t remain_size;

zv_del_timer(r);
for(;;) {
plast = &r->buf[r->last % MAX_BUF];
remain_size = MIN(MAX_BUF - (r->last - r->pos) - 1, MAX_BUF - r->last % MAX_BUF);

n = read(fd, plast, remain_size);
check(r->last - r->pos < MAX_BUF, "request buffer overflow!");

if (n == 0) {
// EOF
log_info("read return 0, ready to close fd %d, remain_size = %zu", fd, remain_size);
goto err;
}

if (n < 0) {
if (errno != EAGAIN) {
log_err("read err, and errno = %d", errno);
goto err;
}
break;
}

r->last += n;
check(r->last - r->pos < MAX_BUF, "request buffer overflow!");

log_info("ready to parse request line");
rc = zv_http_parse_request_line(r);
if (rc == ZV_AGAIN) {
continue;
} else if (rc != ZV_OK) {
log_err("rc != ZV_OK");
goto err;
}

log_info("method == %.*s", (int)(r->method_end - r->request_start), (char *)r->request_start);
log_info("uri == %.*s", (int)(r->uri_end - r->uri_start), (char *)r->uri_start);

debug("ready to parse request body");
rc = zv_http_parse_request_body(r);
if (rc == ZV_AGAIN) {
continue;
} else if (rc != ZV_OK) {
log_err("rc != ZV_OK");
goto err;
}

/*
* handle http header
*/
zv_http_out_t *out = (zv_http_out_t *)malloc(sizeof(zv_http_out_t));
if (out == NULL) {
log_err("no enough space for zv_http_out_t");
exit(1);
}

rc = zv_init_out_t(out, fd);
check(rc == ZV_OK, "zv_init_out_t");

parse_uri(r->uri_start, r->uri_end - r->uri_start, filename, NULL);

if(stat(filename, &sbuf) < 0) {
do_error(fd, filename, "404", "Not Found", "zaver can't find the file");
continue;
}

if (!(S_ISREG(sbuf.st_mode)) || !(S_IRUSR & sbuf.st_mode))
{
do_error(fd, filename, "403", "Forbidden",
"zaver can't read the file");
continue;
}

out->mtime = sbuf.st_mtime;

zv_http_handle_header(r, out);
check(list_empty(&(r->list)) == 1, "header list should be empty");

if (out->status == 0) {
out->status = ZV_HTTP_OK;
}

serve_static(fd, filename, sbuf.st_size, out);

if (!out->keep_alive) {
log_info("no keep_alive! ready to close");
free(out);
goto close;
}
free(out);

}

struct epoll_event event;
event.data.ptr = ptr;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;

zv_epoll_mod(r->epfd, r->fd, &event);
zv_add_timer(r, TIMEOUT_DEFAULT, zv_http_close_conn);
return;

err:
close:
rc = zv_http_close_conn(r);
check(rc == 0, "do_request: zv_http_close_conn");
}

/*
 * Map a request URI to a file name below ROOT.
 *
 * uri / uri_length : URI inside the request buffer; a NUL is written at
 *                    uri[uri_length], i.e. the buffer is modified.
 * filename         : output buffer, assumed to hold SHORTLINE bytes (that is
 *                    what the caller in this file passes).
 * querystring      : currently unused (TODO in the original).
 *
 * The part before '?' is appended to ROOT; a last path component without a
 * '.' is treated as a directory, and directories get "index.html" appended.
 *
 * Degenerate inputs: a NULL uri, a negative length or a result that would not
 * fit into SHORTLINE bytes produce an empty filename; an over-long URI leaves
 * filename equal to ROOT (as before). Both cases are logged.
 *
 * NOTE(review): the URI comes from the client and ".." components are not
 * rejected here, so paths outside ROOT can be named.
 */
static void parse_uri(char *uri, int uri_length, char *filename, char *querystring) {
    check(uri != NULL, "parse_uri: uri is NULL");
    if (uri == NULL || uri_length < 0) {
        filename[0] = '\0';
        return;
    }
    uri[uri_length] = '\0';

    char *question_mark = strchr(uri, '?');
    int file_length;
    if (question_mark) {
        file_length = (int)(question_mark - uri);
        debug("file_length = (question_mark - uri) = %d", file_length);
    } else {
        file_length = uri_length;
        debug("file_length = uri_length = %d", file_length);
    }

    if (querystring) {
        //TODO
    }

    size_t root_len = strlen(ROOT);
    if (root_len >= SHORTLINE) {
        log_err("document root too long: %zu bytes", root_len);
        filename[0] = '\0';
        return;
    }
    strcpy(filename, ROOT);

    // uri_length can not be too long
    if (uri_length > (SHORTLINE >> 1)) {
        log_err("uri too long: %.*s", uri_length, uri);
        return;
    }

    /* worst case appended below: path + "/" + "index.html" + NUL */
    if (root_len + (size_t)file_length + sizeof("/index.html") > SHORTLINE) {
        log_err("file name too long: root = %zu bytes, path = %d bytes", root_len, file_length);
        filename[0] = '\0';
        return;
    }

    debug("before strncat, filename = %s, uri = %.*s, file_len = %d", filename, file_length, uri, file_length);
    strncat(filename, uri, file_length);

    /* look for an extension in the last path component only */
    char *last_comp = strrchr(filename, '/');
    if (last_comp == NULL) {
        last_comp = filename; /* no '/' at all: the whole name is the last component */
    }
    char *last_dot = strrchr(last_comp, '.');
    size_t name_len = strlen(filename);
    if (last_dot == NULL && (name_len == 0 || filename[name_len - 1] != '/')) {
        strcat(filename, "/");
    }

    if(filename[strlen(filename)-1] == '/') {
        strcat(filename, "index.html");
    }

    log_info("filename = %s", filename);
    return;
}

/*
 * Send a small HTML error response on fd.
 *
 * cause    : what the error refers to (the caller passes the file name)
 * errnum   : status code as text, e.g. "404"
 * shortmsg : reason phrase, e.g. "Not Found"
 * longmsg  : explanatory sentence shown in the page body
 *
 * Body and header are each formatted with one bounded snprintf call; the
 * previous code passed the destination buffer as a "%s" argument to sprintf,
 * which is undefined behaviour. The bytes sent are unchanged (including the
 * unquoted `bgcolor=ffffff`, which is what the original adjacent string
 * literals produced).
 */
static void do_error(int fd, char *cause, char *errnum, char *shortmsg, char *longmsg)
{
    char header[MAXLINE], body[MAXLINE];
    int body_len, header_len;

    body_len = snprintf(body, sizeof(body),
            "<html><title>Zaver Error</title>"
            "<body bgcolor=ffffff>\n"
            "%s: %s\n"
            "<p>%s: %s\n</p>"
            "<hr><em>Zaver web server</em>\n</body></html>",
            errnum, shortmsg, longmsg, cause);
    if (body_len < 0) {
        body_len = 0;
    } else if ((size_t)body_len >= sizeof(body)) {
        body_len = (int)sizeof(body) - 1; /* output was truncated to the buffer */
    }

    header_len = snprintf(header, sizeof(header),
            "HTTP/1.1 %s %s\r\n"
            "Server: Zaver\r\n"
            "Content-type: text/html\r\n"
            "Connection: close\r\n"
            "Content-length: %d\r\n\r\n",
            errnum, shortmsg, body_len);
    if (header_len < 0 || (size_t)header_len >= sizeof(header)) {
        log_err("do_error: response header does not fit");
        return;
    }

    if (rio_writen(fd, header, (size_t)header_len) < 0) {
        return; /* rio_writen already logged the failure */
    }
    rio_writen(fd, body, (size_t)body_len);
    return;
}

/*
 * Send the response header for a static file and, when out->modified is set,
 * the file content itself (mapped with mmap).
 *
 * fd       : client socket
 * filename : file to send
 * filesize : its size in bytes (used for Content-length and the mapping)
 * out      : response state (status, keep_alive, modified, mtime)
 *
 * The header is assembled with bounded snprintf calls; the previous code
 * passed the destination buffer as a "%s" argument to sprintf, which is
 * undefined behaviour. Failures of open() or mmap() now end the function
 * instead of continuing with an invalid descriptor or mapping.
 *
 * NOTE(review): Last-Modified is formatted from localtime_r but labelled
 * "GMT"; left unchanged because the header-handling code (not visible here)
 * may rely on the same convention when comparing If-Modified-Since.
 * NOTE(review): when the file cannot be opened or mapped the header,
 * including Content-length, has already been sent.
 */
static void serve_static(int fd, char *filename, size_t filesize, zv_http_out_t *out) {
    char header[MAXLINE];
    char buf[SHORTLINE];
    size_t n;
    struct tm tm;
    int len = 0;

    const char *file_type;
    const char *dot_pos = strrchr(filename, '.');
    file_type = get_file_type(dot_pos);

/* Append formatted text to header; len turns negative on error and stops growing once the buffer is full. */
#define HDR_APPEND(...) \
    do { \
        if (len >= 0 && (size_t)len < sizeof(header)) { \
            int written_ = snprintf(header + len, sizeof(header) - (size_t)len, __VA_ARGS__); \
            len = (written_ < 0) ? -1 : len + written_; \
        } \
    } while (0)

    HDR_APPEND("HTTP/1.1 %d %s\r\n", out->status, get_shortmsg_from_status_code(out->status));

    if (out->keep_alive) {
        HDR_APPEND("Connection: keep-alive\r\n");
        HDR_APPEND("Keep-Alive: timeout=%d\r\n", TIMEOUT_DEFAULT);
    }

    if (out->modified) {
        HDR_APPEND("Content-type: %s\r\n", file_type);
        HDR_APPEND("Content-length: %zu\r\n", filesize);
        localtime_r(&(out->mtime), &tm);
        strftime(buf, SHORTLINE, "%a, %d %b %Y %H:%M:%S GMT", &tm);
        HDR_APPEND("Last-Modified: %s\r\n", buf);
    }

    HDR_APPEND("Server: Zaver\r\n");
    HDR_APPEND("\r\n");

#undef HDR_APPEND

    if (len < 0 || (size_t)len >= sizeof(header)) {
        log_err("serve_static: response header does not fit");
        goto out;
    }

    n = (size_t)rio_writen(fd, header, (size_t)len);
    check(n == (size_t)len, "rio_writen error, errno = %d", errno);
    if (n != (size_t)len) {
        log_err("n != strlen(header)");
        goto out;
    }

    if (!out->modified) {
        goto out; /* not modified: header only */
    }

    if (filesize == 0) {
        goto out; /* empty file: nothing to map or send */
    }

    int srcfd = open(filename, O_RDONLY, 0);
    check(srcfd > 2, "open error");
    if (srcfd < 0) {
        goto out;
    }

    // can use sendfile
    char *srcaddr = mmap(NULL, filesize, PROT_READ, MAP_PRIVATE, srcfd, 0);
    check(srcaddr != (void *) -1, "mmap error");
    close(srcfd); /* the mapping stays valid after the descriptor is closed */
    if (srcaddr == (void *) -1) {
        goto out;
    }

    (void)rio_writen(fd, srcaddr, filesize); /* failures are logged inside rio_writen */

    munmap(srcaddr, filesize);

out:
    return;
}

/*
 * Look up the MIME value for a file extension.
 *
 * type - extension text to match against zaver_mime[].type, or NULL.
 *
 * Returns "text/plain" for NULL. Otherwise returns the value of the first
 * table entry whose type equals `type`, or the value stored in the
 * terminating entry (the one whose type is NULL) when nothing matches.
 */
static const char* get_file_type(const char *type)
{
    int idx = 0;

    if (!type) {
        return "text/plain";
    }

    /* Stop on the first match or on the NULL-typed terminator. */
    while (zaver_mime[idx].type != NULL && strcmp(type, zaver_mime[idx].type) != 0) {
        ++idx;
    }

    return zaver_mime[idx].value;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//http_request.h
#ifndef HTTP_REQUEST_H
#define HTTP_REQUEST_H

#include <time.h>
#include "http.h"

/* Alias for the errno value EAGAIN. */
#define ZV_AGAIN EAGAIN

/* Codes named for request-parsing failures.
 * NOTE(review): presumably returned by the request parser, which is not
 * part of this header - confirm. */
#define ZV_HTTP_PARSE_INVALID_METHOD 10
#define ZV_HTTP_PARSE_INVALID_REQUEST 11
#define ZV_HTTP_PARSE_INVALID_HEADER 12

/* Request method identifiers; each value is a distinct single bit. */
#define ZV_HTTP_UNKNOWN 0x0001
#define ZV_HTTP_GET 0x0002
#define ZV_HTTP_HEAD 0x0004
#define ZV_HTTP_POST 0x0008

/* HTTP status codes that get_shortmsg_from_status_code() maps to a phrase. */
#define ZV_HTTP_OK 200

#define ZV_HTTP_NOT_MODIFIED 304

#define ZV_HTTP_NOT_FOUND 404

/* Size in bytes of zv_http_request_t.buf.
 * NOTE(review): 8124 may be a typo for 8192 - confirm before changing. */
#define MAX_BUF 8124

/*
 * State kept for one HTTP request/connection.
 *
 * zv_init_request_t() sets fd, epfd, pos, last, state, root and list; every
 * other field is left untouched by it.
 * NOTE(review): the *_start / *_end pointers presumably point into buf and
 * are filled in by the request parser, which is not visible here - confirm.
 */
typedef struct zv_http_request_s {
void *root; /* copied from cf->root by zv_init_request_t() */
int fd; /* connection descriptor; closed by zv_http_close_conn() */
int epfd; /* stored by zv_init_request_t(); presumably the epoll fd - confirm */
char buf[MAX_BUF]; /* ring buffer */
size_t pos, last; /* both start at 0; presumably cursors into buf - confirm */
int state; /* starts at 0; presumably the parser state - confirm */
void *request_start;
void *method_end; /* exclusive: one past the last byte of the method */
int method; /* presumably one of the ZV_HTTP_* method values - confirm */
void *uri_start;
void *uri_end; /* exclusive: one past the last byte of the URI */
void *path_start;
void *path_end;
void *query_start;
void *query_end;
int http_major;
int http_minor;
void *request_end;

struct list_head list; /* list of zv_http_header_t, drained by zv_http_handle_header() */
void *cur_header_key_start;
void *cur_header_key_end;
void *cur_header_value_start;
void *cur_header_value_end;

void *timer; /* opaque here; not set by zv_init_request_t() */
} zv_http_request_t;

/*
 * Response-side state built up while a request is processed.
 */
typedef struct {
int fd; /* set by zv_init_out_t() */
int keep_alive; /* 0 after init; set to 1 by the Connection header handler */
time_t mtime; /* the modified time of the file; not set by zv_init_out_t() */
int modified; /* 1 after init; cleared by the If-Modified-Since handler when the client's time equals mtime */

int status; /* 0 after init; HTTP status code written in the status line */
} zv_http_out_t;

/*
 * One parsed request header, described by pointer ranges rather than
 * NUL-terminated strings. Entries are linked into zv_http_request_t.list and
 * are released with free() by zv_http_handle_header().
 */
typedef struct zv_http_header_s {
void *key_start, *key_end; /* key_end is exclusive */
void *value_start, *value_end; /* length is computed as value_end - value_start */
list_head list; /* node in zv_http_request_t.list */
} zv_http_header_t;

/*
 * Callback invoked for a recognised request header.
 * data points at the header value and len is its length in bytes; the value
 * is a pointer range, so do not assume it is NUL-terminated at len.
 * zv_http_handle_header() ignores the return value.
 */
typedef int (*zv_http_header_handler_pt)(zv_http_request_t *r, zv_http_out_t *o, char *data, int len);

/*
 * One row of the header dispatch table zv_http_headers_in: a header name and
 * the handler to run for it. The table ends with an entry whose name is "".
 */
typedef struct {
char *name;
zv_http_header_handler_pt handler;
} zv_http_header_handle_t;

/* Run the matching handler from zv_http_headers_in for every header queued
 * in r->list, then unlink and free each entry. */
void zv_http_handle_header(zv_http_request_t *r, zv_http_out_t *o);
/* Close r->fd and free r; r must not be used afterwards. Returns ZV_OK. */
int zv_http_close_conn(zv_http_request_t *r);

/* Initialise r for connection fd: stores fd, epfd and cf->root, zeroes
 * pos/last/state and initialises the header list. Returns ZV_OK. */
int zv_init_request_t(zv_http_request_t *r, int fd, int epfd, zv_conf_t *cf);
/* Placeholder: currently does nothing and returns ZV_OK. */
int zv_free_request_t(zv_http_request_t *r);

/* Initialise o: stores fd, sets keep_alive = 0, modified = 1 and
 * status = 0. Returns ZV_OK. */
int zv_init_out_t(zv_http_out_t *o, int fd);
/* Placeholder: currently does nothing and returns ZV_OK. */
int zv_free_out_t(zv_http_out_t *o);

/* Reason phrase for a status code: "OK", "Not Modified", "Not Found", or
 * "Unknown" for anything else. The returned string is a literal. */
const char *get_shortmsg_from_status_code(int status_code);

/* Header dispatch table, terminated by an entry with an empty name. */
extern zv_http_header_handle_t zv_http_headers_in[];

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
//http_request.c

#ifndef _GNU_SOURCE
/* why define _GNU_SOURCE? http://stackoverflow.com/questions/15334558/compiler-gets-warnings-when-using-strptime-function-ci */
#define _GNU_SOURCE
#endif

#include <math.h>
#include <time.h>
#include <unistd.h>
#include "http.h"
#include "http_request.h"
#include "error.h"

static int zv_http_process_ignore(zv_http_request_t *r, zv_http_out_t *out, char *data, int len);
static int zv_http_process_connection(zv_http_request_t *r, zv_http_out_t *out, char *data, int len);
static int zv_http_process_if_modified_since(zv_http_request_t *r, zv_http_out_t *out, char *data, int len);

zv_http_header_handle_t zv_http_headers_in[] = {
{"Host", zv_http_process_ignore},
{"Connection", zv_http_process_connection},
{"If-Modified-Since", zv_http_process_if_modified_since},
{"", zv_http_process_ignore}
};

/*
 * Prepare a request object for a new connection.
 *
 * r    - request to initialise.
 * fd   - connection descriptor to record.
 * epfd - descriptor recorded in r->epfd.
 * cf   - configuration; cf->root is copied into r->root.
 *
 * Resets the buffer cursors and state to 0 and initialises the header list.
 * Always returns ZV_OK.
 */
int zv_init_request_t(zv_http_request_t *r, int fd, int epfd, zv_conf_t *cf) {
    INIT_LIST_HEAD(&r->list);

    r->root = cf->root;
    r->state = 0;
    r->last = 0;
    r->pos = 0;
    r->epfd = epfd;
    r->fd = fd;

    return ZV_OK;
}

/*
 * Release hook for a request object. No cleanup is implemented yet; the
 * argument is ignored and ZV_OK is returned.
 */
int zv_free_request_t(zv_http_request_t *r) {
    /* TODO: implement cleanup */
    (void)r;

    return ZV_OK;
}

/*
 * Initialise the response state for descriptor fd.
 *
 * Defaults: keep_alive off, content treated as modified, status 0 and
 * mtime 0. mtime was previously left indeterminate, although the
 * If-Modified-Since handler reads it; callers are still expected to store
 * the real file time before headers are processed.
 *
 * Always returns ZV_OK.
 */
int zv_init_out_t(zv_http_out_t *o, int fd) {
    o->fd = fd;
    o->keep_alive = 0;
    o->mtime = 0;
    o->modified = 1;
    o->status = 0;

    return ZV_OK;
}

/*
 * Release hook for response state. No cleanup is implemented yet; the
 * argument is ignored and ZV_OK is returned.
 */
int zv_free_out_t(zv_http_out_t *o) {
    /* TODO: implement cleanup */
    (void)o;

    return ZV_OK;
}

/*
 * Dispatch every parsed header queued in r->list to its handler, then unlink
 * and free the entry.
 *
 * r - request whose header list is drained.
 * o - response state passed through to the handlers.
 *
 * A header is matched against zv_http_headers_in by its full name, ignoring
 * case. The previous strncmp() over the key length accepted any prefix of a
 * table name (for example "Con" for "Connection") and rejected differently
 * cased names. Headers with no table entry are dropped. Handler return
 * values are ignored.
 */
void zv_http_handle_header(zv_http_request_t *r, zv_http_out_t *o) {
    list_head *pos;
    list_head *next;

    /*
     * The node lives inside hd, which is freed at the end of each iteration,
     * so the successor must be read before the entry is released.
     */
    for (pos = r->list.next; pos != &(r->list); pos = next) {
        next = pos->next;

        zv_http_header_t *hd = list_entry(pos, zv_http_header_t, list);
        /* Cast to char *: arithmetic on void * is not standard C. */
        size_t key_len = (size_t)((char *)hd->key_end - (char *)hd->key_start);
        zv_http_header_handle_t *header_in;

        for (header_in = zv_http_headers_in;
             header_in->name[0] != '\0';
             header_in++) {
            if (strlen(header_in->name) == key_len &&
                strncasecmp(hd->key_start, header_in->name, key_len) == 0) {
                int len = (int)((char *)hd->value_end - (char *)hd->value_start);
                (*(header_in->handler))(r, o, hd->value_start, len);
                break;
            }
        }

        /* delete it from the original list */
        list_del(pos);
        free(hd);
    }
}

/*
 * Tear down a connection: close its descriptor and free the request object.
 * r is invalid after this call. Always returns ZV_OK.
 */
int zv_http_close_conn(zv_http_request_t *r) {
    /*
     * No explicit epoll deregistration is done here: closing a file
     * descriptor removes it from all epoll sets automatically, see
     * http://stackoverflow.com/questions/8707601/is-it-necessary-to-deregister-a-socket-from-epoll-before-closing-it
     */
    const int conn_fd = r->fd;

    close(conn_fd);
    free(r);

    return ZV_OK;
}

/*
 * Handler for headers that need no action: ignores all arguments and
 * returns ZV_OK.
 */
static int zv_http_process_ignore(zv_http_request_t *r, zv_http_out_t *out, char *data, int len) {
    (void)len;
    (void)data;
    (void)out;
    (void)r;

    return ZV_OK;
}

/*
 * Handler for the Connection header: enables keep-alive on the response when
 * the value is exactly "keep-alive" (case-insensitive).
 *
 * data/len describe the header value; it is not assumed to be
 * NUL-terminated. Requiring the length to match closes a hole in the
 * previous strncasecmp(..., len) call, which accepted any prefix of
 * "keep-alive", including an empty value.
 *
 * Always returns ZV_OK.
 */
static int zv_http_process_connection(zv_http_request_t *r, zv_http_out_t *out, char *data, int len) {
    static const char keep_alive_token[] = "keep-alive";
    const int token_len = (int)(sizeof keep_alive_token - 1);

    (void) r;

    if (data != NULL && len == token_len &&
        strncasecmp(keep_alive_token, data, (size_t)token_len) == 0) {
        out->keep_alive = 1;
    }

    return ZV_OK;
}

/*
 * Handler for If-Modified-Since: when the client's timestamp equals
 * out->mtime, mark the response as not modified (status 304).
 *
 * data/len describe the header value. It is copied into a local,
 * NUL-terminated buffer so that strptime() cannot read past the value into
 * the bytes that follow it in the request buffer. Values longer than the
 * local buffer are truncated, which still leaves room for a full HTTP date.
 *
 * struct tm is zeroed before parsing and tm_isdst is set to -1 afterwards.
 * strptime() only fills the fields it parses, so mktime() previously read an
 * indeterminate tm_isdst.
 *
 * NOTE(review): the "GMT" string is converted with mktime(), i.e. as local
 * time. That mirrors serve_static(), which formats Last-Modified from
 * localtime_r(), so the round trip is consistent; change both together.
 *
 * Always returns ZV_OK; an unparsable value is ignored.
 */
static int zv_http_process_if_modified_since(zv_http_request_t *r, zv_http_out_t *out, char *data, int len) {
    (void) r;

    char stamp[64];
    if (data == NULL || len <= 0) {
        return ZV_OK;
    }
    size_t stamp_len = (size_t)len;
    if (stamp_len >= sizeof stamp) {
        stamp_len = sizeof stamp - 1;
    }
    memcpy(stamp, data, stamp_len);
    stamp[stamp_len] = '\0';

    struct tm tm;
    memset(&tm, 0, sizeof tm);
    if (strptime(stamp, "%a, %d %b %Y %H:%M:%S GMT", &tm) == (char *)NULL) {
        return ZV_OK;
    }
    /* Let mktime() work out whether DST applies to this local time. */
    tm.tm_isdst = -1;
    time_t client_time = mktime(&tm);

    double time_diff = difftime(out->mtime, client_time);
    if (fabs(time_diff) < 1e-6) {
        /* Not modified */
        out->modified = 0;
        out->status = ZV_HTTP_NOT_MODIFIED;
    }

    return ZV_OK;
}

/*
 * Map an HTTP status code to its reason phrase.
 *
 * Recognises ZV_HTTP_OK, ZV_HTTP_NOT_MODIFIED and ZV_HTTP_NOT_FOUND; any
 * other code yields "Unknown". The returned pointer refers to a string
 * literal.
 *
 * For the code to message mapping, see:
 * http://users.polytech.unice.fr/~buffa/cours/internet/POLYS/servlets/Servlet-Tutorial-Response-Status-Line.html
 */
const char *get_shortmsg_from_status_code(int status_code) {
    const char *phrase;

    switch (status_code) {
    case ZV_HTTP_OK:
        phrase = "OK";
        break;
    case ZV_HTTP_NOT_MODIFIED:
        phrase = "Not Modified";
        break;
    case ZV_HTTP_NOT_FOUND:
        phrase = "Not Found";
        break;
    default:
        phrase = "Unknown";
        break;
    }

    return phrase;
}