欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

处理大并发之五 使用libevent利器bufferevent

发布时间:2025/3/15 编程问答 59 豆豆
生活随笔 收集整理的这篇文章主要介绍了 处理大并发之五 使用libevent利器bufferevent 小编觉得挺不错的,现在分享给大家,帮大家做个参考.
理大并发之五 使用libevent利器bufferevent

         首先来翻译一段文章

         你可能注意到随着我们代码变得越来越高效,程序也变得更加复杂。当我们产生一个进程的时候,我们没有必要为每一个链接管理一个buffer,我们只需要每个处理独立栈分配缓冲区就可以了。在读和写的时候,我们不必明确的跟踪每一个socket,这在我们的代码里是一个暗示,我们没有必要定义一个结构体去跟踪每一个操作什么时候完成,我们只需要使用循环栈变量就可以了。

         此外,如果你在windows网络编程方面有着丰富的经验,当你在使用上一篇博客中的例子时,你可能认识到libevent可能达不到最理想的性能。在windows上,你做的快速异步IO不是用的select,它使用的IOCP API。和其他的快速网络API不同,当你的程序执行完成,sock准备完成,IOCP不会通知你的程序,取而代之的是,程序告诉windows网络栈开启一个网络操作,并且当操作执行完成时,IOCP会告诉程序。

         幸运的是,libevent2 的bufferevents接口解决了上面的这些冲突,它使得程序更加容易写,并且为windows和unix提供了有效的接口。

分析:

libevent的bufferevent在event的基础上自己维护了一个buffer,这样的话,就不需要再自己管理一个buffer了,上一篇博客是自己维护一个buffer,维护过程复杂,且过程难以理解,既然libevent自己提供了bufferevent这个神器,且有API,何必自己维护呢?

先看看struct bufferevent这个结构体

[cpp] view plaincopy print?
  • struct bufferevent {  
  •                struct event_base *ev_base;  
  •         const struct bufferevent_ops *be_ops;  
  •         struct event ev_read;  
  •         struct event ev_write;  
  •         struct evbuffer *input;  
  •         struct evbuffer *output;  
  •         ……  
  •         bufferevent_data_cb readcb;  
  •         bufferevent_data_cb writecb;  
  •         bufferevent_event_cb errorcb;  
  •         ……  
  • }  
  • struct bufferevent {struct event_base *ev_base;const struct bufferevent_ops *be_ops;struct event ev_read;struct event ev_write;struct evbuffer *input;struct evbuffer *output;……bufferevent_data_cb readcb;bufferevent_data_cb writecb;bufferevent_event_cb errorcb;…… }

    可以看出struct bufferevent内置了两个event(读/写)和对应的缓冲区。当有数据被读入(input)的时候,readcb被调用,当output被输出完成的时候,writecb被调用,当网络I/O出现错误,如链接中断,超时或其他错误时,errorcb被调用。

    使用bufferevent的过程:

    1. 设置sock为非阻塞的

    [cpp] view plaincopy print?
  • eg:  evutil_make_socket_nonblocking(fd);  
  • eg: evutil_make_socket_nonblocking(fd);

    2. 使用bufferevent_socket_new创建一个structbufferevent *bev,关联该sockfd,托管给event_base

    函数原型为:

    [cpp] view plaincopy print?
  • struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,  int options)  
  • eg:  struct bufferevent *bev;  
  • bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);  
  • struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options) eg: struct bufferevent *bev; bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);

    3. 设置读写对应的回调函数

    函数原型为:

    [cpp] view plaincopy print?
  • void bufferevent_setcb(struct bufferevent *bufev,  
  •     bufferevent_data_cb readcb, bufferevent_data_cb writecb,  
  •     bufferevent_event_cb eventcb, void *cbarg)  
  • eg.  bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);  
  • void bufferevent_setcb(struct bufferevent *bufev,bufferevent_data_cb readcb, bufferevent_data_cb writecb,bufferevent_event_cb eventcb, void *cbarg) eg. bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);

    4. 启用读写事件,其实是调用了event_add将相应读写事件加入事件监听队列poll。正如文档所说,如果相应事件不置为true,bufferevent是不会读写数据的

    函数原型:

    [cpp] view plaincopy print?
  • int bufferevent_enable(struct bufferevent *bufev, short event)  
  • eg.  bufferevent_enable(bev, EV_READ|EV_WRITE);  
  • int bufferevent_enable(struct bufferevent *bufev, short event) eg. bufferevent_enable(bev, EV_READ|EV_WRITE);

    5. 进入bufferevent_setcb回调函数:

    在readcb里面从input中读取数据,处理完毕后填充到output中;

    writecb对于服务端程序,只需要readcb就可以了,可以置为NULL;

    errorcb用于处理一些错误信息。


    针对这些使用过程进入源码进行分析:

    1. bufferevent_socket_new

    (1)在bufferevent_init_common中调用evbuffer_new()初始化input和output

    (2)在event_assign中初始化bufferevent中的ev_read和ev_write事件。

    (3)在evbuffer_add_cb中给output添加了一个callback bufferevent_socket_outbuf_cb

    2. bufferevent_setcb

    该函数的作用主要是赋值,把该函数后面的参数,赋值给第一个参数struct bufferevent *bufev定义的变量

    3. bufferevent_enable

    调用event_add将读写事件加入到事件监听队列中。

     

    对bufferevent常用的几个函数进行分析:

    [cpp] view plaincopy print?
  • char *evbuffer_readln(struct evbuffer*buffer, size_t *n_read_out,enum evbuffer_eol_style eol_style);  
  • char *evbuffer_readln(struct evbuffer*buffer, size_t *n_read_out,enum evbuffer_eol_style eol_style);

    含义:Read a single line from an evbuffer.

    返回值:读到的一行内容

    [cpp] view plaincopy print?
  • int evbuffer_add(struct evbuffer *buf,const void *data, size_t datlen);  
  • int evbuffer_add(struct evbuffer *buf,const void *data, size_t datlen);

    含义:将数据添加到evbuffer的结尾

    返回值:成功返回0,失败返回-1

    [cpp] view plaincopy print?
  • int evbuffer_remove(struct evbuffer*buf, void *data, size_t datlen);  
  • int evbuffer_remove(struct evbuffer*buf, void *data, size_t datlen);

    含义:从evbuffer读取数据到data

    返回值:成功返回0,失败返回-1

    [cpp] view plaincopy print?
  • size_t evbuffer_get_length(const structevbuffer *buf);  
  • size_t evbuffer_get_length(const structevbuffer *buf);

    含义:返回evbuffer中存储的字节长度

    暂时先分析到这里,下面是代码,客户端发送消息:HTTP/1.0, Client 0 send Message:

    Request: Hello Server! over,服务端一条消息收完成后,会回复:Response ok! Hello Client!

    服务端从bufferevent中取出消息是按行取的。代码可能有不完善的地方,由于才疏学浅,研究时间短(3天),希望高手提出宝贵意见。

    libevent_eventbuffer_server.c

    [cpp] view plaincopy print?
  • #include <netinet/in.h>  
  • #include <sys/socket.h>  
  • #include <fcntl.h>  
  •   
  • #include <event2/event.h>  
  • #include <event2/buffer.h>  
  • #include <event2/bufferevent.h>  
  •   
  • #include <assert.h>  
  • #include <unistd.h>  
  • #include <string.h>  
  • #include <stdlib.h>  
  • #include <stdio.h>  
  • #include <errno.h>  
  •   
  • void do_read(evutil_socket_t fd, short events, void *arg);  
  •   
  • //struct bufferevent内建了两个event(read/write)和对应的缓冲区(struct evbuffer *input, *output),并提供相应的函数用来操作>  
  • 缓冲区(或者直接操作bufferevent)  
  • //接收到数据后,判断是不一样一条消息的结束,结束标志为"over"字符串  
  • void readcb(struct bufferevent *bev, void *ctx)  
  • {  
  •     printf("called readcb!\n");  
  •     struct evbuffer *input, *output;  
  •     char *request_line;  
  •     size_t len;  
  •     input = bufferevent_get_input(bev);//其实就是取出bufferevent中的input  
  •     output = bufferevent_get_output(bev);//其实就是取出bufferevent中的output  
  •   
  •     size_t input_len = evbuffer_get_length(input);  
  •     printf("input_len: %d\n", input_len);  
  •     size_t output_len = evbuffer_get_length(output);  
  •     printf("output_len: %d\n", output_len);  
  •   
  •     while(1)  
  •     {  
  •         request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);//从evbuffer前面取出一行,用一个新分配的空字符结束  
  • 的字符串返回这一行,EVBUFFER_EOL_CRLF表示行尾是一个可选的回车,后随一个换行符  
  •         if(NULL == request_line)  
  •         {  
  •             printf("The first line has not arrived yet.\n");  
  •             free(request_line);//之所以要进行free是因为 line = mm_malloc(n_to_copy+1)),在这里进行了malloc  
  •             break;  
  •         }  
  •         else  
  • <span style="white-space: pre;">    </span>{  
  •             printf("Get one line date: %s\n", request_line);  
  •             if(strstr(request_line, "over") != NULL)//用于判断是不是一条消息的结束  
  •             {  
  •                 char *response = "Response ok! Hello Client!\r\n";  
  •                 evbuffer_add(output, response, strlen(response));//Adds data to an event buffer  
  •                 printf("服务端接收一条数据完成,回复客户端一条消息: %s\n", response);  
  •                 free(request_line);  
  •                 break;  
  •             }  
  •         }  
  •         free(request_line);  
  •     }  
  •   
  •     size_t input_len1 = evbuffer_get_length(input);  
  •     printf("input_len1: %d\n", input_len1);  
  •     size_t output_len1 = evbuffer_get_length(output);  
  •     printf("output_len1: %d\n\n", output_len1);  
  • }  
  •   
  • void errorcb(struct bufferevent *bev, short error, void *ctx)  
  • {  
  •     if (error & BEV_EVENT_EOF)  
  •     {  
  •         /* connection has been closed, do any clean up here */  
  •         printf("connection closed\n");  
  •     }  
  •     else if (error & BEV_EVENT_ERROR)  
  •     {  
  •         /* check errno to see what error occurred */  
  •         printf("some other error\n");  
  •     }  
  •     else if (error & BEV_EVENT_TIMEOUT)  
  •     {  
  •         /* must be a timeout event handle, handle it */  
  •         printf("Timed out\n");  
  •     }  
  •     bufferevent_free(bev);  
  • }  
  •   
  • void do_accept(evutil_socket_t listener, short event, void *arg)  
  • {  
  •     struct event_base *base = arg;  
  •     struct sockaddr_storage ss;  
  •     socklen_t slen = sizeof(ss);  
  •     int fd = accept(listener, (struct sockaddr*)&ss, &slen);  
  •     if (fd < 0)  
  •     {  
  •         perror("accept");  
  •     }  
  •     else if (fd > FD_SETSIZE)  
  •     {  
  •         close(fd);  
  •     }  
  •     else  
  •     {  
  •         struct bufferevent *bev;  
  •         evutil_make_socket_nonblocking(fd);  
  •   
  •         //使用bufferevent_socket_new创建一个struct bufferevent *bev,关联该sockfd,托管给event_base  
  •         BEV_OPT_CLOSE_ON_FREE表示释放bufferevent时关闭底层传输端口。这将关闭底层套接字,释放底层bufferevent等。  
  •         bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);  
  •   
  •         //设置读写对应的回调函数  
  •         bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);  
  • //      bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);  
  •   
  •         //启用读写事件,其实是调用了event_add将相应读写事件加入事件监听队列poll。正如文档所说,如果相应事件不置为true,buf  
  • ferevent是不会读写数据的  
  •         bufferevent_enable(bev, EV_READ|EV_WRITE);  
  •     }  
  • }  
  •   
  • void run(void)  
  • {  
  •     evutil_socket_t listener;  
  •     struct sockaddr_in sin;  
  •     struct event_base *base;  
  •     struct event *listener_event;  
  •   
  •     base = event_base_new();  
  •     if (!base)  
  •         return/*XXXerr*/  
  •   
  •     sin.sin_family = AF_INET;  
  •     sin.sin_addr.s_addr = 0;  
  •     sin.sin_port = htons(8000);  
  •   
  •     listener = socket(AF_INET, SOCK_STREAM, 0);  
  •     evutil_make_socket_nonblocking(listener);  
  •   
  • #ifndef WIN32  
  •     {  
  •         int one = 1;  
  •         setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));  
  •     }  
  • #endif  
  •   
  •     if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0)  
  •     {  
  •         perror("bind");  
  •         return;  
  •     }  
  • if (listen(listener, 16)<0)  
  •     {  
  •         perror("listen");  
  •         return;  
  •     }  
  •   
  •     listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);  
  •     /*XXX check it */  
  •     event_add(listener_event, NULL);  
  •   
  •     event_base_dispatch(base);  
  • }  
  •   
  • int main(int argc, char **argv)  
  • {  
  •     setvbuf(stdout, NULL, _IONBF, 0);  
  •   
  •     run();  
  •     return 0;  
  • }  
  • #include <netinet/in.h> #include <sys/socket.h> #include <fcntl.h>#include <event2/event.h> #include <event2/buffer.h> #include <event2/bufferevent.h>#include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h>void do_read(evutil_socket_t fd, short events, void *arg);//struct bufferevent内建了两个event(read/write)和对应的缓冲区(struct evbuffer *input, *output),并提供相应的函数用来操作> 缓冲区(或者直接操作bufferevent) //接收到数据后,判断是不一样一条消息的结束,结束标志为"over"字符串 void readcb(struct bufferevent *bev, void *ctx) {printf("called readcb!\n");struct evbuffer *input, *output;char *request_line;size_t len;input = bufferevent_get_input(bev);//其实就是取出bufferevent中的inputoutput = bufferevent_get_output(bev);//其实就是取出bufferevent中的outputsize_t input_len = evbuffer_get_length(input);printf("input_len: %d\n", input_len);size_t output_len = evbuffer_get_length(output);printf("output_len: %d\n", output_len);while(1){request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);//从evbuffer前面取出一行,用一个新分配的空字符结束 的字符串返回这一行,EVBUFFER_EOL_CRLF表示行尾是一个可选的回车,后随一个换行符if(NULL == request_line){printf("The first line has not arrived yet.\n");free(request_line);//之所以要进行free是因为 line = mm_malloc(n_to_copy+1)),在这里进行了mallocbreak;}else {printf("Get one line date: %s\n", request_line);if(strstr(request_line, "over") != NULL)//用于判断是不是一条消息的结束{char *response = "Response ok! Hello Client!\r\n";evbuffer_add(output, response, strlen(response));//Adds data to an event bufferprintf("服务端接收一条数据完成,回复客户端一条消息: %s\n", response);free(request_line);break;}}free(request_line);}size_t input_len1 = evbuffer_get_length(input);printf("input_len1: %d\n", input_len1);size_t output_len1 = evbuffer_get_length(output);printf("output_len1: %d\n\n", output_len1); }void errorcb(struct bufferevent *bev, short error, void *ctx) {if (error & BEV_EVENT_EOF){/* connection has been closed, do any clean up here */printf("connection closed\n");}else if (error & BEV_EVENT_ERROR){/* check errno to see what error occurred */printf("some other error\n");}else if (error & BEV_EVENT_TIMEOUT){/* must be a timeout event handle, handle it */printf("Timed out\n");}bufferevent_free(bev); }void do_accept(evutil_socket_t listener, short event, void *arg) {struct event_base *base = arg;struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener, (struct sockaddr*)&ss, &slen);if (fd < 0){perror("accept");}else if (fd > FD_SETSIZE){close(fd);}else{struct bufferevent *bev;evutil_make_socket_nonblocking(fd);//使用bufferevent_socket_new创建一个struct bufferevent *bev,关联该sockfd,托管给event_baseBEV_OPT_CLOSE_ON_FREE表示释放bufferevent时关闭底层传输端口。这将关闭底层套接字,释放底层bufferevent等。bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);//设置读写对应的回调函数bufferevent_setcb(bev, readcb, NULL, errorcb, NULL); // bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);//启用读写事件,其实是调用了event_add将相应读写事件加入事件监听队列poll。正如文档所说,如果相应事件不置为true,buf ferevent是不会读写数据的bufferevent_enable(bev, EV_READ|EV_WRITE);} }void run(void) {evutil_socket_t listener;struct sockaddr_in sin;struct event_base *base;struct event *listener_event;base = event_base_new();if (!base)return; /*XXXerr*/sin.sin_family = AF_INET;sin.sin_addr.s_addr = 0;sin.sin_port = htons(8000);listener = socket(AF_INET, SOCK_STREAM, 0);evutil_make_socket_nonblocking(listener);#ifndef WIN32{int one = 1;setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));} #endifif (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0){perror("bind");return;} if (listen(listener, 16)<0){perror("listen");return;}listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);/*XXX check it */event_add(listener_event, NULL);event_base_dispatch(base); }int main(int argc, char **argv) {setvbuf(stdout, NULL, _IONBF, 0);run();return 0; }

    编译:gcc -I/usr/include-o test libevent_eventbuffer_server.c -L/usr/local/lib –levent

    运行:

    服务端:


    客户端:



    今晚博客暂时写完了,时间比较仓促,错误估计不会少,关于bufferevent很多API都还不是很熟悉,还有libevent 添加事件event_add是非线程安全的,如果使用多线程,需要保证event_add不能出现在多个线程中,以后有时间慢慢研究。

    体会:关于源码,还需要好好研究,其实今晚挺郁闷的,弄了半天,没有什么进展,现在自己还有很多疑问,主要是自己太急了,不过3天时间做到基本了解,自己还算满意,下一步有时间多研究下吧。晚安,北京

    如是转载,请指明原出处: http://blog.csdn.net/feitianxuxue ,谢谢合作! 新人创作打卡挑战赛发博客就能抽奖!定制产品红包拿不停!

    总结

    以上是生活随笔为你收集整理的处理大并发之五 使用libevent利器bufferevent的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。