C语言网络编程:实现自己的高性能网络框架(上)

这篇文章开始,我们结合前面几篇文章的内容,实现一个接近生产环境的高性能网络框架。在实现代码的过程中,发现极客时间有一个专栏《网络编程实战》,之前的实现和这个专栏中的实例非常接近。所以,这两篇文章中大量的代码都来自于这个专栏,如果有兴趣可以去看一下这个专栏。

在epoll那篇文章中,我们遗留了一个问题没有解决,一般生产环境中最耗时的其实是业务逻辑处理。所以,我们就想,是不是可以将处理业务逻辑的代码给拆出来丢到线程池中去执行。

比如像下面这样:

我们事先创建好一堆worker线程,主线程accepter拿到一个连接上来的套接字,就从线程池中取出一个线程将这个套接字交给它。这样,我们就将accept和对套接字的操作解耦了,不会因为业务逻辑处理得慢而导致客户端不能及时和服务端建立连接。

当然,整个流程还可以被再被拆分。比如,可以使用专门的线程来注册读写事件,专门的线程来处理业务逻辑

本文中的实现和上面基本差不多,主线程不断accept出来新的套接字,然后交给专门负责事件注册的线程,业务逻辑的处理交给另外专门负责处理业务逻辑的线程,处理流程如下:

为了实现上面构想的流程,设计了几个核心的对象,上图中event_loop就是其中最重要的一个对象。

event_loop

event_loop实际是一个无限循环,还记得在讲epoll的时候,我们将各种套接字使用epoll监听起来,然后在一个循环里阻塞在epoll_wait函数,当它返回的时候说明有事件发生。event_loop本质上也是阻塞在epoll,但event_loop和我们之前的实现不一样的地方在于,它连接了我们设计的各种对象,比如buffer、channel、tcp_connection等,这些我们后面会一一展开。

event_loop对象定义如下


struct event_loop {
    int quit;     // 用来标识是否退出
    const struct event_dispatcher *ev_dispatcher;  // 可以理解为poll、epoll实现

    void *event_dispatcher_data;  // 声明为void*,可以灵活的保存不同的实现
    struct channel_map *chan_map; // 套接字和channel的映射关系

    int is_handle_pending;         
    struct channel_element *pending_head;
    struct channel_element *pending_tail;

    pthread_t owner_thread_id;   // 当前event_loop线程ID
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int socket_pair[2];         
    char *thread_name;
};

可以看到,event_loop中除了定义了当前线程相关的资源还定义了一个socket_pair,这个我们在讲poll的时候有详细讲过,如果没印象了可以回过头去看一下。

event_loop对象定义了以下几个核心方法:


// 初始化一个event_loop对象
struct event_loop *event_loop_init();

// 真正初始化event_loop对象的方法
struct event_loop *event_loop_init_with_name(char *thread_name);

// 使event_loop运行起来,也就是阻塞在epoll/poll函数
int event_loop_run(struct event_loop *ev_loop);

// 唤醒epoll,使得新加入的套接字可以初poll/epoll监听起来
void event_loop_wakeup(struct event_loop *ev_loop);

// 添加一个套接字到poll/epoll中
int event_loop_add_channel_event(struct event_loop *ev_loop, int fd, struct channel *chan);

// 从poll/epoll中删除套接字
int event_loop_remove_channel_event(struct event_loop *ev_loop, int fd, struct channel *chan);

// 修改套接字事件
int event_loop_update_channel_event(struct event_loop *ev_loop, int fd, struct channel *chan);

// 调用套接字的读/写回调函数
int channel_event_activate(struct event_loop *ev_loop, int fd, int res);

// 扫描所有的套接字进行新增/删除/修改操作
int event_loop_handle_pending_channel(struct event_loop *ev_loop);

这里我们挑其中几个比较核心的方法来分析一下,首先是event_loop的初始化,event_loop_init实际上调用的是event_loop_init_with_name方法,所以我们主要看event_loop_init_with_name


struct event_loop *event_loop_init_with_name(char *thread_name) {
    struct event_loop *ev_loop = malloc(sizeof(struct event_loop));
    pthread_mutex_init(&ev_loop->mutex, NULL);
    pthread_cond_init(&ev_loop->cond, NULL);

    if (thread_name != NULL) {
        ev_loop->thread_name = thread_name;
    } else {
        ev_loop->thread_name = "main thread";
    }

    ev_loop->quit = 0;
    ev_loop->chan_map = malloc(sizeof(struct channel_map));
    map_init(ev_loop->chan_map);

#ifdef EPOLL_ENABLE
    ev_loop->ev_dispatcher = &epoll_dispatcher;
#else
    ev_loop->event_dispatcher = &poll_dispatcher;
#endif

    ev_loop->event_dispatcher_data = ev_loop->ev_dispatcher->init(ev_loop);

    ev_loop->owner_thread_id = pthread_self();
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, ev_loop->socket_pair) < 0) {
        perror("socketpair set failed.");
    }
    ev_loop->is_handle_pending = 0;
    ev_loop->pending_head = NULL;
    ev_loop->pending_tail = NULL;

    struct channel *chan = channel_new(ev_loop->socket_pair[1], EVENT_READ, handle_wakeup, NULL, ev_loop);
    event_loop_add_channel_event(ev_loop, ev_loop->socket_pair[1], chan);

    return ev_loop;
}

初始化主要做了以下几件事

  • 初始化当前event_loop线程的相关信息,包含条件变量cond和锁mutex
  • 给channel_map分配内存
  • 初始化dispatcher,如果检测到支持epoll就使用epoll,否则使用poll
  • 初始化socket_pair,并立即监听

添加事件event_loop_do_channel_event


int event_loop_do_channel_event(struct event_loop *ev_loop, int fd, struct channel *chan, int type) {
    pthread_mutex_lock(&ev_loop->mutex);
    assert(ev_loop->is_handle_pending == 0);
    event_loop_channel_buffer_nolock(ev_loop, fd, chan, type);

    pthread_mutex_unlock(&ev_loop->mutex);
    if (!is_in_same_thread(ev_loop)) {
        event_loop_wakeup(ev_loop);
    } else {
        event_loop_handle_pending_channel(ev_loop);
    }
}

这里要注意可能出现多个线程同时处理一个套接字的情况,所以加锁。接着将对应的套接字添加到当前event_loop的channel_element链表中,channel_element是为了方便快速找到注册到当前event_loop中的所有套接字,方便在后续操作。

接着,会判断传进来的event_loop对应的线程是否是当前线程,如果是说明是accept添加套接字到事件监听列表。因为需要让监听生效,这里需要再次调用epoll/poll函数,所以需要唤醒一次epoll/poll,唤醒的方式是向socket_pair的一端发送一个字符,这个操作是在event_loop_wakeup函数中实现的,我们在讲poll那篇文件有详细解释,如果没印象了可以回头再去看一下。

如果不是同一个线程,会遍历一次所有注册在当前event_loop上的套接字,根据type进行相应的操作,调用event_loop_handle_pending_channel方法,这个方法如下:


int event_loop_handle_pending_channel(struct event_loop *ev_loop) {
    pthread_mutex_lock(&ev_loop->mutex);
    ev_loop->is_handle_pending = 1;

    struct channel_element *chan_elem = ev_loop->pending_head;
    while(chan_elem != NULL) {
        struct channel *chan = chan_elem->channel;
        int fd = chan->fd;
        if (chan_elem->type == 1) {
            event_loop_handle_pending_add(ev_loop, fd, chan);
        } else if (chan_elem->type == 2) {
            event_loop_handle_pending_remove(ev_loop, fd, chan);
        } else if (chan_elem->type == 3) {
            event_loop_handle_pending_update(ev_loop, fd, chan);
        }
        chan_elem = chan_elem->next;
    }

    ev_loop->pending_head = ev_loop->pending_tail = NULL;
    ev_loop->is_handle_pending = 0;

    pthread_mutex_unlock(&ev_loop->mutex);

    return 0;
}

这个方法进来就将当前event_loop状态置为pending,接着遍历channel_element链表,其中type有3种类型分别是1-添加 2-删除 3-修改,这个类型是在添加event_loop事件的时候传进去的。下面我们依次来看一下这三种类型的处理。

添加event_loop_handle_pending_add


int event_loop_handle_pending_add(struct event_loop *ev_loop, int fd, struct channel *chan) {
    struct channel_map *map = ev_loop->chan_map;

    if (fd < 0) {
        return 0;
    }

    if (fd >= map->nentries) {
        if (map_make_space(map, fd, sizeof(struct channel *)) == -1) {
            return -1;
        }
    }

    if (map->entries[fd] == NULL) {
        map->entries[fd] = chan;

        struct event_dispatcher *ev_dispatcher = ev_loop->ev_dispatcher;
        ev_dispatcher->add(ev_loop, chan);
        return 1;
    }

    return 0;
}

添加主要做了两件事

  • 将套接字对应的channel添加到channel_map中,这样当有事件发生的时候就可以通过套接字找到对应的channel
  • 将套接字添加到dispatcher中监听起来(实际上就是epoll/poll)

删除event_loop_handle_pending_remove



int event_loop_handle_pending_remove(struct event_loop *ev_loop, int fd, struct channel *chan) {
    struct channel_map *map = ev_loop->chan_map;
    assert(fd == chan->fd);

    if (fd < 0) {
        return 0;
    }

    if (fd >= map->nentries) {
        return -1;
    }

    struct channel *ch = map->entries[fd];

    struct event_dispatcher *ev_dispatcher = ev_loop->ev_dispatcher;

    int retval = 0;
    if (ev_dispatcher->del(ev_loop, ch) == -1) {
        retval = -1;
    } else {
        retval = 1;
    }

    map->entries[fd] = NULL;
    return retval;
}

删除也比较简单,首先将套接字从channel_map中删除,然后从dispatcher中删除(epoll/poll)

修改event_loop_handle_pending_update



int event_loop_handle_pending_update(struct event_loop *ev_loop, int fd, struct channel *chan) {
    struct channel_map *map = ev_loop->chan_map;

    if (fd < 0) {
        return 0;
    }

    if (map->entries[fd] == NULL) {
        return -1;
    }

    struct event_dispatcher *ev_dispatcher = ev_loop->ev_dispatcher;
    ev_dispatcher->update(ev_loop, chan);
}

修改其实就是修改dispatcher的监听事件(epoll/poll)

当事件添加好了之后,就可以将event_loop跑起来了,如下:



int event_loop_run(struct event_loop *ev_loop) {
    assert(ev_loop != NULL);

    struct event_dispatcher *dispatcher = ev_loop->ev_dispatcher;

    if (ev_loop->owner_thread_id != pthread_self()) {
        exit(1);
    }

    struct timeval tv;
    tv.tv_sec = 1;

    while(!ev_loop->quit) {
        dispatcher->dispatch(ev_loop, &tv);

        event_loop_handle_pending_channel(ev_loop);
    }

    return 0;

}

在event_loop_run方法中,核心就是一个死循环,其中dispatcher->dispatch(ev_loop, &tv)可以理解为调用epoll/poll方法。

channel

channel的本质是套接字的上下文,它的结构如下:



struct channel {
    int fd;
    int events;

    event_read_callback ev_read_callback;
    event_write_callback ev_write_callback;
    void *data;
};

从结构定义上来看,每一个套接字都对应一个channel,这个channel里除了包含套接字本身(也就是fd)还有它关注的事件events和读、写回调函数。

channel的操作也比较简单,就3个方法,如下:



int channel_write_event_is_enabled(struct channel *chan) {
    return chan->events & EVENT_WRITE;
}

int channel_write_event_enable(struct channel *chan) {
    struct event_loop *ev_loop = (struct event_loop *) chan->data;
    chan->events = chan->events | EVENT_WRITE;
    event_loop_update_channel_event(ev_loop, chan->fd, chan);
}

int channel_write_event_disable(struct channel *chan) {
    struct event_loop *ev_loop = (struct event_loop *) chan->data;
    chan->events = chan->events & ~EVENT_WRITE;
    event_loop_update_channel_event(ev_loop, chan->fd, chan);
}

它们分别是判断套接字是否可写,开启写事件以及关闭写事件。

channel_map

当有事件发生,我们需要通过套接字找到对应的channel,就需要使用到channel_map,它的定义如下:



struct channel_map {
    void **entries;

    int nentries;
};

定义也非常简单,就是一个二维数组entries,可以用来表示套接字到channel的映射关系,nentries表示二维数组的长度。

event_dispatcher

dispatcher是对I/O复用模型的抽象,可以理解为是一个接口,select、poll、epoll都可以实现它,定义如下:



struct event_dispatcher {
    const char *name;

    void *(*init)(struct event_loop *ev_loop);

    int  (*add) (struct event_loop *ev_loop, struct channel *chan);

    int (*del) (struct event_loop *ev_loop, struct channel *chan);

    int (*update) (struct event_loop *ev_loop, struct channel *chan);

    int (*dispatch)(struct event_loop * eventLoop, struct timeval * tv);

    void (*clean) (struct event_loop *ev_loop);
};

可以看到,event_dispatcher中包含了一组函数指针,这组函数覆盖了事件的增、删、改、查。

上面讲event_loop跑起来最终会阻塞在dispatcher->dispatch(ev_loop, &tv)这一行,这里的dispatch实际上就是event_dispatcher中的dispatch方法,我们以epoll为例,其实现如下:


int epoll_dispatch(struct event_loop *ev_loop, struct timeval *tv) {
    epoll_dispatcher_data *ep_dis_data = (epoll_dispatcher_data *) ev_loop->event_dispatcher_data;

    int i, n;

    n = epoll_wait(ep_dis_data->efd, ep_dis_data->events, MAXEVENTS, -1);

    for (i = 0; i < n; i++) {
        if ((ep_dis_data->events[i].events & EPOLLERR) || (ep_dis_data->events[i].events & EPOLLHUP)) {
            perror("epoll error");
            close(ep_dis_data->events[i].data.fd);
            continue;
        }

        if (ep_dis_data->events[i].events & EPOLLIN) {
            channel_event_activate(ev_loop, ep_dis_data->events[i].data.fd, EVENT_READ);
        }

        if (ep_dis_data->events[i].events & EPOLLOUT) {
            channel_event_activate(ev_loop, ep_dis_data->events[i].data.fd, EVENT_WRITE);
        }
    }

    return 0;
}

可以看到,dispatch就是调用了epoll_wait方法,如果有事件发生,n就是发生事件的套接字的数量。接下来的for循环会依次处理所有的套接字。最后又会调用channel_event_activate方法,这个方法是在event_loop对像里,其实现如下:


int channel_event_activate(struct event_loop *ev_loop, int fd, int res) {
    struct channel_map *map = ev_loop->chan_map;

    if (fd < 0) {
        return 0;
    }

    if (fd >= map->nentries) {
        return -1;
    }

    struct channel *chan = map->entries[fd];
    assert(fd == chan->fd);

    if (res & EVENT_READ) {
        if (chan->ev_read_callback != NULL) {
            chan->ev_read_callback(chan->data);
        }
    }
    if (res & EVENT_WRITE) {
        if (chan->ev_write_callback) {
            chan->ev_write_callback(chan->data);
        }
    }

    return 0;
}

在这个方法里面,我们可以清楚的看到channel_map的作用,通过套接字描述符找到对应的channel对象,找到channel,从而调用channel对象中事先创建好的用于读和写的回调函数。

thread_pool

thread_pool维护了一个线程池的基础信息,比如线程池的大小,其定义如下:



struct thread_pool {
    struct event_loop *main_loop;
    int started;
    int thread_number;
    struct event_loop_thread *ev_loop_threads;
    int position;
};

thread_pool中也有一个event_loop,这个事件循环是用来监听是否有accept发生的,它还有3个方法,分别是:

创建线程池thread_pool_new



struct thread_pool *thread_pool_new(struct event_loop *main_loop, int thread_number) {
    struct thread_pool *th_pool = malloc(sizeof(struct thread_pool));
    th_pool->main_loop = main_loop;
    th_pool->position = 0;
    th_pool->thread_number = thread_number;
    th_pool->started = 0;
    th_pool->ev_loop_threads = NULL;
    return th_pool;
}

这个方法用来分配线程池需要的资源,以及设置线程池的大小

让线程跑起来thread_pool_start



void thread_pool_start(struct thread_pool *th_pool) {
    assert(!th_pool->started);
    assert_in_same_thread(th_pool->main_loop);

    th_pool->started = 1;
    void *tmp;

    if (th_pool->thread_number <= 0) {
        return;
    }

    th_pool->ev_loop_threads = malloc(th_pool->thread_number * sizeof(struct event_loop_thread));
    for (int i = 0; i < th_pool->thread_number; ++i) {
        event_loop_thread_init(&th_pool->ev_loop_threads[i], i);
        event_loop_thread_start(&th_pool->ev_loop_threads[i]);
    }
}

这里,如果线程池大小小于等于0,就会退化成主线程处理套接字事件的注册。如果大于0,就会分配对应大小的线程资源,并将线程跑起来。

event_loop_thread

event_loop_thread是线程池真正初始化和创建的地方



struct event_loop_thread {
    struct event_loop *ev_loop;
    pthread_t thread_id;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    char *thread_name;
    long thread_count;
};

在thread_pool中,将线程运行起来调用了两个方法,event_loop_thread_init和event_loop_thread_start。这两个方法就是在event_loop_thread对象中,分别用于初始化线程和创建线程,下面我们分别来看一下它们的实现

event_loop_thread_init



int event_loop_thread_init(struct event_loop_thread *ev_loop_thread, int i) {
    pthread_mutex_init(&ev_loop_thread->mutex, NULL);
    pthread_cond_init(&ev_loop_thread->cond, NULL);

    ev_loop_thread->ev_loop = NULL;
    ev_loop_thread->thread_count = 0;
    ev_loop_thread->thread_id = 0;

    char *buf = malloc(16);
    ev_loop_thread->thread_name = buf;

    return 0;
}

这个方法主要是初始化线程相关的资源,比如条件变量cond和互斥锁mutex

event_loop_thread_start



struct event_loop *event_loop_thread_start(struct event_loop_thread *ev_loop_thread) {
    pthread_create(&ev_loop_thread->thread_id, NULL, &event_loop_thread_run, ev_loop_thread);

    assert(pthread_mutex_lock(&ev_loop_thread->mutex) == 0);

    while(ev_loop_thread->ev_loop == NULL) {
        assert(pthread_cond_wait(&ev_loop_thread->cond, &ev_loop_thread->mutex) == 0);
    }

    assert(pthread_mutex_unlock(&ev_loop_thread->mutex) == 0);

    return ev_loop_thread->ev_loop;
}

这个方法主要的功能就是调用pthread_create创建出一个线程。这里的代码似曾相识,在while循环里调用pthread_cond_wait,这个我们在C语言最最最核心语法那篇文章中介绍过。

当线程初始化和创建都完成了之后,我们就可以把对应的event_loop运行起来了,如下:


void *event_loop_thread_run(void *arg) {
    struct event_loop_thread *ev_loop_thread = (struct event_loop_thread *) arg;

    pthread_mutex_lock(&ev_loop_thread->mutex);

    ev_loop_thread->ev_loop = event_loop_init_with_name(ev_loop_thread->thread_name);
    pthread_cond_signal(&ev_loop_thread->cond);

    pthread_mutex_unlock(&ev_loop_thread->mutex);

    event_loop_run(ev_loop_thread->ev_loop);
}

event_loop_thread_run这个方法是线程的入口函数,传入的参数是一个是event_loop_thread对象,首先初始化event_loop_thread对象中的event_loop对象。初始化完成就可以让event_loop跑起来了,调用event_loop_run方法,这个方法是在event_loop对象中,在上面已经讲过了,你可以回过头再去看一下。

tcp_connection

tcp_connection是已连接的套接字,它包含了接收和发送缓冲区,channel对象等。它的作用是避免应用程序直接操作channel对象。应用程序对套接字的操作只能通过tcp_connection对象,它的定义如下:



struct tcp_connection {
    struct event_loop *ev_loop;
    struct channel *channel;
    char *name;
    struct buffer *input_buffer;
    struct buffer *output_buffer;

    connection_completed_callback conn_completed_callback;
    message_callback msg_callback;
    write_completed_callback w_completed_callback;
    connection_closed_callback conn_closed_callback;

    void *data;
    void *request;
    void *response;
};

创建tcp_connection_new


struct tcp_connection *tcp_connection_new(int fd, struct event_loop *ev_loop, connection_completed_callback conn_completed_callback,
                                         message_callback msg_callback,
                                         write_completed_callback w_completed_callback,
                                         connection_closed_callback conn_closed_callback) {
    struct tcp_connection *tcp_conn = malloc(sizeof(struct tcp_connection));
    tcp_conn->w_completed_callback = w_completed_callback;
    tcp_conn->msg_callback = msg_callback;
    tcp_conn->conn_completed_callback = conn_completed_callback;
    tcp_conn->conn_closed_callback = conn_closed_callback;
    tcp_conn->ev_loop = ev_loop;
    tcp_conn->input_buffer = buffer_new();
    tcp_conn->output_buffer = buffer_new();

    char *buf = malloc(16);
    sprintf(buf, "connection-%d\\0", fd);
    tcp_conn->name = buf;

    struct channel *chan = channel_new(fd, EVENT_READ, handle_read, handle_write, tcp_conn);
    tcp_conn->channel = chan;

    if (tcp_conn->conn_completed_callback != NULL) {
        tcp_conn->conn_completed_callback(tcp_conn);
    }

    event_loop_add_channel_event(tcp_conn->ev_loop, fd, tcp_conn->channel);

    return tcp_conn;

}

创建一个tcp_connection对象,包含了各个回调函数,读和写buffer,channnel,最后将当前套接字添加到epoll/poll中监听起来。

tcp_connection对象主要还是负责套接字中数据的读写,提供了相应的方法



// tcp_connection_send_buffer方法最终也是调用这个方法
int tcp_connection_send_data(struct tcp_connection *tcp_conn, void *data, int size);

// 通过buffer发送数据
int tcp_connection_send_buffer(struct tcp_connection *tcp_conn, struct buffer *buffer);

// 关闭套接字
int tcp_connection_shutdown(struct tcp_connection *tcp_conn);

// 读数据
int handle_read(void *data);

// 写数据
int handle_write(void *data);

上面各个方法的实现都比较简单,你可以把代码下载下来自己去看一下。总结起来它们的执行流程大致如下图:

实例

最后,我们来看一下怎么将上面说的各个对象给串起来,看一个例子



int main(int argc, char *argv[]) {

    struct event_loop *ev_loop = event_loop_init();

    struct acceptor *acceptor = acceptor_init(SERV_PORT);

    struct TCPServer *tcp_server = tcp_server_init(ev_loop, acceptor, onConnectionCompleted, onMessage,
                                                   onWriteCompleted, onConnectionClosed, 4);

    tcp_server_start(tcp_server);

    event_loop_run(ev_loop);
}

在这个例子中,我们先初始化event_loop、accepter、TCPServer对象,event_loop我们已经很熟悉了,这里我们来看一下,acceptor和TCPServer

acceptor实际上就是创建一个服务端的套接字


struct acceptor *acceptor_init(int port) {
    struct acceptor *acc = malloc(sizeof(struct acceptor));
    acc->listen_port = port;
    acc->listen_fd = socket(AF_INET, SOCK_STREAM, 0);

    make_nonblocking(acc->listen_fd);

    struct sockaddr_in serv_addr;
    bzero(&serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(acc->listen_port);
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int on = 1;
    setsockopt(acc->listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

    int rt1 = bind(acc->listen_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    if (rt1 < 0) {
        perror("bind error");
    }

    int ret2 = listen(acc->listen_fd, 1024);
    if (ret2 < 0) {
        perror("listen error");
    }

    x_msgx("server start at: %d", acc->listen_port);

    return acc;
}

TCPServer对象是整个网络框架的入口,它的声明如下:


struct TCPServer {
    int port;
    struct event_loop *ev_loop;
    struct acceptor *acceptor;
    connection_completed_callback conn_completed_callback;
    message_callback msg_callback;
    write_completed_callback w_completed_callback;
    connection_closed_callback conn_close_callback;
    int thread_num;
    struct thread_pool *th_pool;
    void *data;
};

这个对象中包含了acceptor对象,也就是创建好的服务端套接字、各个回调函数以及线程池

这三个对象初始化完了之后,会调用tcp_server_start将服务运行起来,这个方法实现如下:


void tcp_server_start(struct TCPServer *tcp_server) {
    struct acceptor *acceptor = tcp_server->acceptor;
    struct event_loop *ev_loop = tcp_server->ev_loop;

    thread_pool_start(tcp_server->th_pool);

    struct channel *chan = channel_new(acceptor->listen_fd, EVENT_READ, handle_connection_established, NULL, tcp_server);

    event_loop_add_channel_event(ev_loop, chan->fd, chan);

    return;
}

这个方法里首先就调用了thead_pool对象中的thread_pool_start方法,用来创建处理注册事件的线程池。然后会将当前服务端的套接字放到一个channel里并调用event_loop_add_channel_event方法将自己监听起来。所以,我们可以发现,服务端套接字实际上也是使用了event_loop来管理的,通过一个event_loop不断的去监听accept事件。

上面的示例中最后还调用了event_loop_run方法,这个方法是event_loop对象中的方法,前面我们已经提到过了,这里调用event_loop_run方法是为了监听当前服务端套接字是否有客户端连接上来,也是整个框架的核心部分,可以理解为poll和epoll中的epoll_wait和poll函数。

在初始化TCPServer对象的时候,还传入了四个参数onConnectionCompleted、onMessage、onWriteCompleted、onConnectionClosed,它们分别对应连接建立完成时,读数据,写数据和连接关闭时的回调函数。

onConnectionCompoleted


int onConnectionCompleted(struct tcp_connection *tcpConnection) {
    printf("Connection completed\\n");
    return 0;
}

连接建立完成回调函数我们只打印了一条消息,没有任务其它操作

onMessage


int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {
    printf("get message from tcp connection %s\\n", tcpConnection->name);
    printf("%s", input->data);

    struct buffer *output = buffer_new();
    int size = buffer_readable_size(input);
    for (int i = 0; i < size; i++) {
        buffer_append_char(output, rot13_char(buffer_read_char(input)));
    }
    tcp_connection_send_buffer(tcpConnection, output);
    return 0;
}

onMessage中,我们创建了一个output的buffer,并往里面写入发接收到的数据,最后调用tcp_connection_send_buffer将数据发送出去,

onWriteCompleted


int onWriteCompleted(struct tcp_connection *tcpConnection) {
    printf("write completed\\n");
    return 0;
}

写完之后的回调我们也什么都没干

onConnectionClosed


int onConnectionClosed(struct tcp_connection *tcpConnection) {
    printf("connection closed\\n");
    return 0;
}

整个示例的大致流程最终如下图所示

总结

好了,这篇文章我们实现了一个通用的网络框架,其中还有一些对象没有介绍到,这是因为我们在之前的文章中已经详细介绍过了,比如buffer对象,channel_map对象。如果对此有疑问可以回过头再去看一下。

这篇文章的代码有点多,可能需要花一些时间才能搞明白。请一定要耐住性子,如果能把整个代码搞明白,最好是根据自己的理解再写一遍,相信你会非常明显的感觉到自己的功力在进步。

今天的代码地址:

https://github.com/benggee/x-net