如何使用ePump框架编写TCP服务器

这篇文章主要介绍“如何使用ePump框架编写TCP服务器”,在日常操作中,相信很多人在如何使用ePump框架编写TCP服务器问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何使用ePump框架编写TCP服务器”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

成都创新互联公司不只是一家网站建设的网络公司;我们对营销、技术、服务都有自己独特见解,公司采取“创意+综合+营销”一体化的方式为您提供更专业的服务!我们经历的每一步也许不一定是最完美的,但每一步都有值得深思的意义。我们珍视每一份信任,关注我们的成都网站建设、成都做网站质量和服务品质,在得到用户满意的同时,也能得到同行业的专业认可,能够为行业创新发展助力。未来将继续专注于技术创新,服务升级,满足企业一站式全网营销推广需求,让再小的品牌网站设计也能产生价值!

基于非阻塞、多线程、事件驱动模型的 ePump 框架可以很好地解决复杂的线程调度、高效通信等问题,使用 ePump 框架可快速开发各类通信服务器系统,像常见的HTTP Web服务器、RTMP流媒体服务器、及时通信消息系统等等。

一. 使用ePump框架开发前需下载安装的库

在使用ePump框架编程前,首先需要从GitHub下载并安装C语言基础库adif 数据结构和基础算法库和ePump框架,ePump框架依赖于adif基础库,adif 基础库 和 ePump 框架都是标准C语言开发,并以库的形式集成到应用程序中。下载这两个开源系统的代码到本地,make && make install 后,编译成功的动态库和静态库缺省地被安装到 /usr/local/lib 目录下,头文件则安装到 /usr/local/include/adif 和 /usr/local/include 中。

下面讲解如何使用ePump框架来开发一个echo功能的TCP服务器程序。

使用这两个库编程时,需要包含adif基础库和ePump框架的头文件:

#include "adifall.ext"
#include "epump.h"
#include 

其中adifall.ext文件包含了adif库中所有基础数据结构和算法模块的头文件,具体功能可以参考开源项目adif 数据结构和基础算法库。epump.h是调用ePump框架功能模块的头文件。由于需要处理信号,包含了signal.h文件。

二. 开发高性能通信服务器基本需求

大家都知道,创建TCP监听服务器时,最基本的通信开发启蒙知识三部曲,首先要创建socket_t文件描述符,绑定本地IP地址和端口,在指定端口上启动监听,等待客户端发起TCP连接请求。但是对于高级程序员或商业级服务器需求的系统来说,高性能是必须的终极需求,开发人员还需要严肃地考虑如下问题:

  1. TCP服务器系统既要支持IPv4地址,也要支持IPv6地址;

  2. 如何采用多线程或多进程来处理每一个连接请求;

  3. 短时间内产生大量的TCP连接请求时,如何在多个线程或多个进程间负载均衡

  4. 由于线程或进程总数有限(小于1024),单台机器处理几十万并发的TCP连接时,如何采用多路复用技术解决大并发I/O;

这些问题直接考验了商业级通信服务器系统其性能的高低、吞吐能力的大小、CPU处理能力的高效运用等等,能解决好这些问题,无疑是一个能经受实战的好系统,ePump框架天生就是解决这些问题的好手。

三. 创建框架实例

使用ePump框架,先调用API接口,创建ePump框架实例,

    epcore_t  * pcore = NULL;
    void      * mlisten = NULL;
    int         listenport = 8080;

    gpcore = pcore = epcore_new(65536, 1);

其中第一个参数是服务器系统能同时允许打开的文件描述符的最大数量,也就是这款TCP服务器能支撑的最大并发数量,第二个参数一般设置为1,指的是创建iodev_t等基础设备对象时,尽量将其派送到当前工作线程。

四. 启动TCP监听服务

使用ePump框架,启动TCP监听服务很简单,调用eptcp_mlisten接口即可,

    mlisten = eptcp_mlisten(pcore, NULL, listenport, NULL, echo_pump, pcore);
    if (!mlisten) goto exit;
 
    printf("EchoSrv TCP Port: %d being listened\n", listenport);

按照头文件中的描述,该接口函数定义如下:

/* Note: automatically detect if Linux kernel supported REUSEPORT. 
   if supported, create listen socket for every current running epump threads
   and future-started epump threads.
   if not, create only one listen socket for all epump threads to bind. */
 
void * eptcp_mlisten (void * vpcore, char * localip, int port, void * para,
                      IOHandler * cb, void * cbpara);

最新的Linux操作系统对于通信编程做了很多优化,其中对于内核对象Socket使用REUSEPORT选项,来解决端口复用问题,这样使得每一个线程或进程都可以监听同一个端口,并接受新的连接请求。那么大量的客户端同时对监听端口发起TCP三路握手,想建立到达服务器的TCP连接时,这些连接到底交给哪一个启动了监听服务的线程或进程?这个问题大家自己做功课,这里不赘述。

ePump框架中采用 epoll / select等多路复用接口来监听连接到来和读写事件,监听设备和定时器并产生事件的线程是epump线程,处理事件的线程是worker线程。eptcp_mlisten自动地为每一个epump线程创建listen socket(支持REUSEPORT时),或创建一个listen socket但绑定到每一个epump线程中。这样大量的连接请求到来时,将会由epump线程处理其负载均衡。

eptcp_mlisten函数的第一个参数是ePump框架实例;第二个参数为绑定的本机IP地址,如果绑定所有本机IP地址,该值为NULL;第三个参数为监听的端口号;第四个参数是设置当前正在创建的监听设备的绑定参数,一般跟当前监听设备iodev_t有关的实例对象;第五个参数是设置当前正在创建的监听设备iodev_t对象当有连接请求过来时的回调函数;第六个参数是传递给回调函数的参数变量。

作者在程序中习惯为所有的事件回调处理设置为一个统一的回调函数echo_pump,当然大家可以根据自己的爱好和习惯,为每个iodev_t对象的读写事件设置不同的回调函数。

这样,启动这个TCP服务器监听服务时,前面提到的各种商业TCP服务器需面对的问题,这里都解决了。

五. 定时器的运用

这个sample程序中,给大家示范了定时器的用法。使用定时器可以定期做一些检查或校验工作,譬如一个TCP连接长时间没有数据往来,通过定时器机制来关闭着这些不活跃的TCP连接。

    iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore);

这是启动一个90秒后发送超时TIMEOUT事件的定时器,超时事件将由echo_pump函数来处理。本例的回调函数中没有处理不活跃TCP连接的代码,大家感兴趣可自行添加。

六. 启动ePump线程组

创建了TCP监听服务后,需要启动ePump框架的两类线程组:epump线程组 和 worker线程组,代码如下:

    cpunum = get_cpu_num();
    epumpnum = cpunum * 0.2;
    if (epumpnum < 3) epumpnum = 3;
    workernum = cpunum - epumpnum;
    if (workernum < 3) workernum = 3;
 
    /* start 3 worker threads */
    epcore_start_worker(pcore, workernum);
 
    /* start 3 epump threads */
    epcore_start_epump(pcore, epumpnum - 1);

    /* main thread executing the epump_main_proc as an epump thread */
    epump_main_start(pcore, 0);
 
    epcore_clean(pcore);
 
    printf("main thread exited\n");
    return 0;
 
exit:
    epcore_clean(pcore);
    printf("main thread exception exited\n");
    return -1;

七. ePump框架的回调函数

以上介绍的是主程序,下面需要介绍的回调函数echo_pump的实现.

回调函数的原型定义如下:

typedef int IOHandler (void * vmgmt, void * pobj, int event, int fdtype);

第一个参数是设置回调函数时给定的参数,第二个是当前产生事件的对象,或者是iodev_t对象,或者是iotimer_t对象,第三个参数是event事件类型,第四个参数是文件描述符类型。

其中的event事件类型如下:

/* define the event type, including getting connected, connection accepted, readable,
 * writable, timeout. the working threads will be driven by these events */
#define IOE_CONNECTED        1
#define IOE_CONNFAIL         2
#define IOE_ACCEPT           3
#define IOE_READ             4
#define IOE_WRITE            5
#define IOE_INVALID_DEV      6
#define IOE_TIMEOUT          100
#define IOE_DNS_RECV         200
#define IOE_USER_DEFINED     10000

其中的文件描述符类型fdtype预定义值共有:

/* the definition of FD type in the EventPump device */
#define FDT_LISTEN            0x01
#define FDT_CONNECTED         0x02
#define FDT_ACCEPTED          0x04
#define FDT_UDPSRV            0x08
#define FDT_UDPCLI            0x10
#define FDT_USOCK_LISTEN      0x20
#define FDT_USOCK_CONNECTED   0x40
#define FDT_USOCK_ACCEPTED    0x80
#define FDT_RAWSOCK           0x100
#define FDT_FILEDEV           0x200
#define FDT_TIMER             0x10000
#define FDT_USERCMD           0x20000
#define FDT_LINGER_CLOSE      0x40000
#define FDT_STDIN             0x100000
#define FDT_STDOUT            0x200000

八. 如何接受客户端连接请求

当被监听端口8080上收到一个TCP连接请求时,echo_pump函数会被回调,回调参数中event为IOE_ACCEPT,fdtype为FDT_LISTEN,其中pobj就是监听设备对象。

    switch (event) {
    case IOE_ACCEPT:
        if (fdtype != FDT_LISTEN)
            return -1;
 
        while (1) {
            pdev = eptcp_accept(iodev_epcore(vobj), vobj, NULL, &ret, echo_pump, pcore, BIND_ONE_EPUMP);
            if (!pdev) break;
 
            printf("\nThreadID=%lu, ListenFD=%d EPumpID=%lu WorkerID=%lu "
               " ==> Accept NewFD=%d EPumpID=%lu\n",
               get_threadid(), iodev_fd(vobj), epumpid(iodev_epump(vobj)),
               workerid(worker_thread_self(pcore)),
               iodev_fd(pdev), epumpid(iodev_epump(pdev)));
        }
        break;

这里使用了一个while循环来调用eptcp_accept函数,目的是解决多个TCP连接同时到来,ePump框架使用一个事件通知驱动回调函数去处理和执行的情况,不使用循环处理,就会漏掉某些客户的TCP连接请求。

函数 eptcp_accept 接受TCP连接请求,并创建新连接对应的iodev_t设备对象pdev,设置该对象在数据可读时的回调函数,这个函数会自动处理多线程之间的连接设备对象的负载均衡。函数成功执行完后的结果是一个新的客户端TCP连接建立起来了,针对该新连接进行数据读取操作的回调函数也都设置了。

eptcp_accept函数的原型如下:

void * eptcp_accept (void * vpcore, void * vld, void * para, int * retval,
                     IOHandler * cb, void * cbpara, int bindtype);

第一个参数为ePump框架实例,第二个参数是监听设备iodev_t对象,由回调函数携带进来,第三个参数是新创建的客户端TCP连接设备iodev_t对象的内置参数,第四个参数为返回值,大于等于0表示连接建立成功,小于0失败,第五个和第六个参数为新创建的连接对象的回调函数,第七个参数是设置绑定epump线程的指令类型,共有如下几种:

/* bind type specifying how iodev_t devices are bound to the underlying epump thread */
#define BIND_NONE                0
#define BIND_ONE_EPUMP           1
#define BIND_GIVEN_EPUMP         2
#define BIND_ALL_EPUMP           3
#define BIND_NEW_FOR_EPUMP       4
#define BIND_CURRENT_EPUMP       5

绑定epump线程的指令类型共有6个,其含义如下:

  • BIND_NONE是初始值,不绑定任何epump线程;

  • BIND_ONE_EPUMP是从当前epump线程中找一个负载最低的线程来绑定;

  • BIND_GIVEN_EPUMP是指定一个epump线程来建立绑定;

  • BIND_ALL_EPUMP是绑定所有的epump线程。这中情况一般是在监听设备对象创建后,一般在Linux内核版本低于3.9版本情况下,即不支持REUSEPORT功能时,使用这个类型。

  • BIND_NEW_FOR_EPUMP一般用于ePump框架内部,应用程序不建议使用。

  • BIND_CURRENT_EPUMP是绑定产生当前连接事件的epump线程。系统内部和操作系统内核对负载会实现均衡分配,一般建议应用开发时使用这个类型。

一旦绑定了epump线程,就可能立即产生可读事件,并驱动回调函数来处理。如果新的的pdev对象是由另外一个工作线程来处理时,上述这个例子中就可能出现打印语句还没结束,该新连接设备对象可读事件的回调函数就已经执行了。在商业级系统开发过程中,调用本函数接受客户端新连接并创建新的设备对象pdev后,需要做很多跟连接设备对象相关联的数据结构的初始化工作,在这些初始化操作完成之后,再调用 iodev_bind_epump 函数来绑定epump线程,所以,这种情况下接受新连接时,一般不设置绑定关系,而是将第七个参数设置为 BIND_NONE。

    pdev = eptcp_accept(iodev_epcore(vobj), vobj, NULL, &ret, echo_pump, pcore, BIND_NONE);
    
    /* do some initialization of related objects, examples as following */
    /*  pcon = http_con_fetch(mgmt);
        pcon->pdev = pdev;
        iodev_para_set(pdev, (void *)pcon->conid);
 
        pcon->hl = hl;
 
        pcon->casetype = HTTP_SERVER;
        pcon->reqdiag = hl->reqdiag;
        pcon->reqdiagobj = hl->reqdiagobj;
 
        pcon->ssl_link = hl->ssl_link;
 
        str_cpy(pcon->srcip, iodev_rip(pcon->pdev));
        str_cpy(pcon->dstip, iodev_lip(pcon->pdev));
        pcon->srcport = iodev_rport(pcon->pdev);
        pcon->dstport = iodev_lport(pcon->pdev);
        pcon->createtime = time(&pcon->stamp);
        pcon->transbgn = pcon->stamp;*/

    iodev_bind_epump(pdev, BIND_CURRENT_EPUMP, NULL);

九. 读取客户端的TCP请求数据

建立好TCP连接之后,客户端会发送数据到服务器,ePump框架中对所有socket文件描述符设置成了非阻塞模式,数据到达本机时,内核会产生可读事件,由ePump框架驱动回调函数来处理数据读操作。

    case IOE_READ:
        ret = tcp_nb_recv(iodev_fd(vobj), rcvbuf, sizeof(rcvbuf), &num);
        if (ret < 0) {
            printf("Client %s:%d close the connection while receiving, epump: %lu\n",
                   iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)) );
            iodev_close(vobj);
            return -100;
        }
 
        ret = tcp_nb_send(iodev_fd(vobj), rcvbuf, num, &sndnum);
        if (ret < 0) {
            printf("Client %s:%d close the connection while sending, epump: %lu\n",
                   iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)));
            iodev_close(vobj);
            return -100;
        }
        break;

采用非阻塞模式的读数据函数,读取客户端请求内容。这个读函数 tcp_nb_recv 是在adif基础库中实现的,调用系统调用read并一直读到出现 EAGAIN 错误为止,表示此次可读事件的所有数据都被读完。开发者需要注意的是,在回调函数中处理ePump框架的可读事件时,一定要将所有的位于内核缓冲区中的数据读取完,不建议读一部分数据、留一部分数据。

由于本sample程序实现的是echo回弹功能,读取了客户端多少数据,就返回客户端多少数据。所以立即使用 tcp_nb_send 函数发送这些数据到客户端。

十. 定时器超时事件回调处理

本例中示范了定时器的启动和超时处理,当定时器给定的时间逝去后,会产生TIMEOUT事件,并驱动回调函数来处理。ePump框架的定时器实例对象存活周期仅仅是在创建定时器到超时处理完成这段时间,即ePump框架的定时器是一次性的,超时处理完后,系统会自动销毁该定时器对象。对于循环定时器,需要在处理超时事件时,重新启动新的定时器实例。

    case IOE_TIMEOUT:
        cmdid = iotimer_cmdid(vobj);
        if (cmdid == 1001) { 
            printf("\nThreadID=%lu IOTimerID=%lu EPumpID=%lu timeout, curtick=%lu\n",
                   get_threadid(), iotimer_id(vobj), 
                   epumpid(iotimer_epump(vobj)), time(0));
            epcore_print(pcore);
            iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore);
        }
        break;

定时器的用例非常广泛,开发人员可以根据实际需求来使用该功能。

十一. 完整的具有echo功能的TCP服务器代码

以上详细介绍了如何运用ePump框架实现一个完整的具有echo回弹功能的TCP服务器,代码详细如下:

/*
 * Copyright (c) 2003-2021 Ke Hengzhong 
 * All rights reserved.
 */
 
#include "adifall.ext"
#include 
#include "epump.h"
 
epcore_t  * gpcore = NULL;
 
int echo_pump (void * vpcore, void * vobj, int event, int fdtype);
 
 
static void signal_handler(int sig)
{
    switch(sig) {
    case SIGHUP:
        printf("hangup signal catched\n");
        break;
    case SIGTERM:
    case SIGKILL:
    case SIGINT:
        printf("terminate signal catched, now exiting...\n");
        epcore_stop_epump(gpcore);
        epcore_stop_worker(gpcore);
        usleep(1000);
        break;
    }
}
 
 
int main (int argc, char ** argv)
{
    epcore_t  * pcore = NULL;
    void      * mlisten = NULL;
    int         listenport = 8080;
 
    signal(SIGCHLD, SIG_IGN); /* ignore child */
    signal(SIGTSTP, SIG_IGN); /* ignore tty signals */
    signal(SIGTTOU, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGTTIN, SIG_IGN);
    signal(SIGHUP,  signal_handler); /* catch hangup signal */
    signal(SIGTERM, signal_handler); /* catch kill signal */
    signal(SIGINT, signal_handler); /* catch SIGINT signal */
 
    gpcore = pcore = epcore_new(65536, 1);
 
    /* do some initialization */
    mlisten = eptcp_mlisten(pcore, NULL, listenport, NULL, echo_pump, pcore);
    if (!mlisten) goto exit;
 
    printf("EchoSrv TCP Port: %d being listened\n\n", listenport);
 
    iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore);
 
    /* start 2 worker threads */
    epcore_start_worker(pcore, 2);
 
    /* start 1 epump threads */
    epcore_start_epump(pcore, 1);
 
    /* main thread executing the epump_main_proc as an epump thread */
    epump_main_start(pcore, 0);
 
    epcore_clean(pcore);
 
    printf("main thread exited\n");
    return 0;
 
exit:
    epcore_clean(pcore);
    printf("main thread exception exited\n");
    return -1;
}
 
 
int echo_pump (void * vpcore, void * vobj, int event, int fdtype)
{
    epcore_t  * pcore = (epcore_t *)vpcore;
    iodev_t   * pdev = NULL;
    int         cmdid;
    int         ret = 0, sndnum = 0;
    char        rcvbuf[2048];
    int         num = 0;
 
    switch (event) {
    case IOE_ACCEPT:
        if (fdtype != FDT_LISTEN)
            return -1;
 
        while (1) {
            pdev = eptcp_accept(iodev_epcore(vobj), vobj, NULL, &ret,
                                 echo_pump, pcore, BIND_ONE_EPUMP);
            if (!pdev) break;
 
            printf("\nThreadID=%lu, ListenFD=%d EPumpID=%lu WorkerID=%lu "
               " ==> Accept NewFD=%d EPumpID=%lu\n",
               get_threadid(), iodev_fd(vobj), epumpid(iodev_epump(vobj)),
               workerid(worker_thread_self(pcore)),
               iodev_fd(pdev), epumpid(iodev_epump(pdev)));
        }
        break;
 
    case IOE_READ:
        ret = tcp_nb_recv(iodev_fd(vobj), rcvbuf, sizeof(rcvbuf), &num);
        if (ret < 0) {
            printf("Client %s:%d close the connection while receiving, epump: %lu\n",
                   iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)) );
            iodev_close(vobj);
            return -100;
        }
 
        printf("\nThreadID=%lu FD=%d EPumpID=%lu WorkerID=%lu Recv %d bytes from %s:%d\n",
               get_threadid(), iodev_fd(vobj), epumpid(iodev_epump(vobj)),
               workerid(worker_thread_self(pcore)),
               num, iodev_rip(vobj), iodev_rport(vobj));
        printOctet(stderr, rcvbuf, 0, num, 2);
 
        ret = tcp_nb_send(iodev_fd(vobj), rcvbuf, num, &sndnum);
        if (ret < 0) {
            printf("Client %s:%d close the connection while sending, epump: %lu\n",
                   iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)));
            iodev_close(vobj);
            return -100;
        }
        break;
 
    case IOE_WRITE:
    case IOE_CONNECTED:
        break;
 
    case IOE_TIMEOUT:
        cmdid = iotimer_cmdid(vobj);
        if (cmdid == 1001) { 
            printf("\nThreadID=%lu IOTimerID=%lu EPumpID=%lu timeout, curtick=%lu\n",
                   get_threadid(), iotimer_id(vobj), 
                   epumpid(iotimer_epump(vobj)), time(0));
            epcore_print(pcore);
            iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore);
        }
        break;
 
    case IOE_INVALID_DEV:
        break;
 
    default:
        break;
    }
 
    printf("ThreadID=%lu event: %d  fdtype: %d  WorkerID=%lu\n\n",
            get_threadid(), event, fdtype,
            workerid(worker_thread_self(pcore)));
 
    return 0;
}

这个示例中使用大量的多余的打印代码,看起没那么美观,有洁癖的程序员可以去掉。

使用gcc编译以上代码的命令如下:

gcc -g -O3 -Wall -DUNIX -I/usr/local/include -I/usr/local/include/adif -L/usr/local/lib -lm -lpthread -ladif -lepump echosrv.c -o echosrv

编译完成后大家执行并调试,享受编程乐趣。

十二. 使用ePump框架开发高性能程序总结

以上用一个TCP服务器程序来展示如何使用ePump框架进行编程的实例,管中窥豹,以一概全,感兴趣的程序员可以下载和体验。

使用ePump框架最成功的案例是eJet Web服务器开源项目,这是一个轻量级、高性能、嵌入式Web服务器,各项功能不逊于Nginx。研究这个项目可以有助于理解ePump框架的工作原理。

简单总结ePump框架的功能特点:

  • ePump框架封装了很多琐碎的容易出错误的细节,让开发人员将更多时间花在业务处理上;

  • 将复杂的各个操作系统都互不兼容的多路复用技术封装后,提供了标准的接口给程序员,大大节省了应用开发周期;

  • 高效利用事件驱动、多线程调度机制来实现多核CPU的并行运算能力;

  • 使用ePump开发高性能程序,代码简单干练,可靠性高;

  • 对IPv6、DNS等头提供了支持;

到此,关于“如何使用ePump框架编写TCP服务器”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


本文名称:如何使用ePump框架编写TCP服务器
当前链接:http://scyanting.com/article/iieooc.html