欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

ACE_Select_Reactor 一 ——入门

发布时间:2025/3/21 编程问答 48 豆豆
生活随笔 收集整理的这篇文章主要介绍了 ACE_Select_Reactor 一 ——入门 小编觉得挺不错的,现在分享给大家,帮大家做个参考.
ACE_Select_Reactor 1 ——入门

    ACE Reactor 框架实现了Reactor模式,允许事件驱动的应用对源自许多不同事件源的事件做出反映,比如IO句柄,定时器,以及信号,应用重新定义框架所定义的挂钩方法。框架随机对其进行分派来处理事件,Reactor负责:(1)检查多路分离器来自各种事件源的、不同类型的连接和数据事件,(2)将这些事件分派给应用所定义的处理器,由它进行处理。

    反应式服务器响应来自一个或多个事件源的时间,在理想情况下,对时间的响应会足够快,以使所有请求看起来像是被同时处理的,尽管事件处理通常是由单个线程处理的。同步事件多路分离器位于各个反应式服务器的心脏处。这种机制检测源自许多事件源的事件并对其作出响应,从而同步地使事件作为服务器正常执行路径的一部分提供给服务器。

    select()函数是最为常见的同步事件多路分离器。这个系统函数在同一组IO句柄上等待指定的事件发生,当一个或者是多个IO句柄开始活动时,或是在指定的时间过去后,select函数就会返回。

    ACE_Select_Reactor是ACE_Reactor接口的一种实现,它使用select同步时间多路分离器函数来检测IO和定时器事件,除了支持ACE_Reactor接口的所有特性外,ACE_Select_Reactor类还提供了以下能力:

    1、它支持重入的反应器调用,应用可以从正在由统一反应器分派的事件处理器中调用handle_event方法;

    2、它可以被配置为同步化的或异步化的,在线程安全性和降低开销之间进行折中;

    3、它在再次调用select函数之前,分派其句柄集中的所有活动句柄,从而保证了公正性。

 

 

[cpp] view plain copy
  • #include "ace/streams.h"  
  • #include "ace/Reactor.h"  
  • #include "ace/Select_Reactor.h"  
  • #include "ace/Thread_Manager.h"  
  •   
  • #include <string>  
  •   
  • #include "Reactor_Logging_Server_T.h"  
  • #include "Logging_Acceptor_Ex.h"  
  •   
  • typedef Reactor_Logging_Server<Logging_Acceptor_Ex>  
  •         Server_Logging_Daemon;  
  •   
  •   
  • static ACE_THR_FUNC_RETURN event_loop (void *arg)   
  • {  
  •   ACE_Reactor *reactor = static_cast<ACE_Reactor *> (arg);  
  •   
  •   reactor->owner (ACE_OS::thr_self ());  
  •   reactor->run_reactor_event_loop ();  
  •   return 0;  
  • }  
  •   
  •   
  • static ACE_THR_FUNC_RETURN controller (void *arg)   
  • {  
  •   for (;;)   
  •   {  
  •     std::string user_input;  
  •     std::getline (cin, user_input, '\n');  
  •     if (user_input == "quit")   
  •     {  
  •       break;  
  •     }  
  •   }  
  •   return 0;  
  • }  
  •   
  •   
  • int main(int argc,char **argv)  
  • {  
  •   ACE_Select_Reactor select_reactor;  
  •   ACE_Reactor reactor (&select_reactor);  
  •   
  •   Server_Logging_Daemon *server;  
  •   // Ignore argv[0]...  
  •   --argc; ++argv;  
  •   ACE_NEW_RETURN (server,  
  •                   Server_Logging_Daemon (argc, argv, &reactor),  
  •                   1);  
  •   ACE_Thread_Manager::instance ()->spawn (event_loop, &reactor);  
  •   ACE_Thread_Manager::instance ()->spawn (controller, &reactor);  
  •   return ACE_Thread_Manager::instance ()->wait ();  
  • }  
  •     在上面的代码中,在51行,由ACE_Thread_Manager单体派生一个线程,并让其运行event_loop()函数,在52行,由ACE_Thread_Manager单体派生一个线程,让其运行controller()函数。

        在从main函数返回之前,等待其他两个线程推出,ACE_Thread_Manager:wait()还会收取两个线程的退出状态,以免内存泄漏。


    ACE_Select_Reactor 2 —— 服务器网关

        ACE中的流包装提供面向连接的通信。流数据传输包装类包括ACE_SOCK_Stream和ACE_LSOCK_Stream,他们分别包装TCP/IP和UNIX域Socket协议数据传输功能。连接建立类包括针对TCP/IP的ACE_SOCK_Connector和ACE_SOCK_Acceptor,以及针对UNIX域Socket的ACE_LSOCK_Connector和ACE_LSOCK_Acceptor。

     

     

    [cpp] view plain copy
  • class Server  
  • {  
  • public:  
  •     Server (int port): server_addr_(port),peer_acceptor_(server_addr_)  
  •     {  
  •         data_buf_= new char[SIZE_BUF];  
  •     }  
  •     int handle_connection()  
  •     {  
  •         // Read data from client  
  •             int byte_count=0;  
  •             if( (byte_count=new_stream_.recv (data_buf_, SIZE_DATA, 0))==-1)  
  •                 ACE_ERROR ((LM_ERROR, "%p\n""Error in recv"));  
  •             else  
  •             {  
  •                 data_buf_[byte_count]=0;  
  •                 ACE_DEBUG((LM_DEBUG,"Server received %s \n",data_buf_));  
  •             }  
  •         // Close new endpoint  
  •         if (new_stream_.close () == -1)  
  •             ACE_ERROR ((LM_ERROR, "%p\n""close"));  
  •         return 0;  
  •     }  
  •     int accept_connections ()  
  •     {  
  •         if (peer_acceptor_.get_local_addr (server_addr_) == -1)  
  •             ACE_ERROR_RETURN ((LM_ERROR,"%p\n","Error in get_local_addr"),1);  
  •         ACE_DEBUG ((LM_DEBUG,"Starting server at port %d\n",  
  •             server_addr_.get_port_number ()));  
  •         while(1)  
  •         {  
  •             ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT);  
  •             if (peer_acceptor_.accept (new_stream_, &client_addr_, &timeout)== -1)  
  •             {  
  •                 ACE_ERROR ((LM_ERROR, "%p\n""accept"));  
  •                 continue;  
  •             }  
  •             else  
  •             {  
  •                 ACE_DEBUG((LM_DEBUG,  
  •                     "Connection established with remote %s:%d\n",  
  •                     client_addr_.get_host_name(),client_addr_.get_port_number()));  
  •                 //Handle the connection  
  •                 handle_connection();  
  •             }  
  •         }  
  •     }  
  • private:  
  •     char *data_buf_;  
  •     ACE_INET_Addr server_addr_;  
  •     ACE_INET_Addr client_addr_;  
  •     ACE_SOCK_Acceptor peer_acceptor_;  
  •     ACE_SOCK_Stream new_stream_;  
  • };  
  •      在上面的Server类中,创建了一个被动服务器,侦听到来的客户端连接,在连接建立之后,服务器接收来自客户端的数据,然后关闭链接。

        Server类包含的accept_connection()方法使用接收器来将连接接受“进”ACE_SOCK_Stream new_stream_。该操作完成的基本流程是:调用接收器上的accept(),并将流作为参数传入其中。一旦连接已经建立进流中,流的包装方法send()和recv()就可以用来在新建立的链路上发送和接收数据,还有一个空的ACE_INET_Addr也被传入接收器的accept()方法,并在其中被设定为发起连接的远程机器地址。

        在连接建立后,服务器调用handle_connection()方法,它开始从客户端那里收取一个预先知道的单词,然后将流关闭。连接关闭通过调用流上的close()方法来完成,该方法会释放所有的Socket资源并终止连接。

     

        http://acme-ltt.iteye.com/blog/1455556中提到的ACE_Select_Reactor,在static ACE_THR_FUNC_RETURN controller (void *arg)函数中,调用上述的Server类,搭建基于ACE_Select_Reactor的Socket服务器网关。

     

     

        客户端程序:

     

    [cpp] view plain copy
  • #include "ace/SOCK_Connector.h"  
  • #include "ace/INET_Addr.h"  
  • #include "ace/OS.h"  
  • #include "ace/Log_Msg.h"  
  • #include <stdlib.h>  
  • #include "text.h"  
  • #include "ace/Thread_Manager.h"  
  • #include <iostream>  
  • #define SIZE_BUF 128  
  • static const ACE_Time_Value TIME_INTERVAL(0, 1000000);  
  • class Client  
  • {  
  • public:  
  •     Client(char *hostname, int port):remote_addr_(port,hostname)  
  •     {     
  •           
  •           
  •     }  
  •     int connect_to_server()  
  •     {  
  •           
  •         ACE_DEBUG ((LM_DEBUG, "(%P|%t) Starting connect to %s:%d\n",  
  •             remote_addr_.get_host_name(),remote_addr_.get_port_number()));  
  •         if (connector_.connect (client_stream_, remote_addr_) == -1)  
  •             ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p\n","connection failed"),-1);  
  •         else  
  •             ACE_DEBUG ((LM_DEBUG,"(%P|%t) connected to %s\n",  
  •             remote_addr_.get_host_name ()));  
  •         return 0;  
  •     }  
  •       
  •     int send_to_server()  
  •     {  
  •         iovec iov[3];  
  •         iov[0].iov_base = (void *)"get";  
  •         iov[0].iov_len = 3;  
  •         iov[1].iov_base = getdata()/* some data */;  
  •         iov[1].iov_len = strlen(getdata());  
  •         iov[2].iov_base = (void *)"end.";  
  •         iov[2].iov_len = 4;  
  •           
  •         if (client_stream_.sendv_n (iov,3) == -1)  
  •         {  
  •             ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p\n","send_n"),0);  
  •             //break;  
  •             exit(0);  
  •         }  
  •         close();  
  •     }  
  •     int close()  
  •     {  
  •         if (client_stream_.close () == -1)  
  •             ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p\n","close"),-1);  
  •         else  
  •             return 0;  
  •     }  
  • private:  
  •     ACE_SOCK_Stream client_stream_;  
  •     ACE_INET_Addr remote_addr_;  
  •     ACE_SOCK_Connector connector_;  
  •     char *data_buf_;  
  • };  
  • int main (int argc, char *argv[])  
  • {  
  •     if(argc<3)  
  •     {  
  •         ACE_DEBUG((LM_DEBUG,”Usage %s <hostname> <port_number>\n”, argv[0]));  
  •         ACE_OS::exit(1);  
  •     }  
  •     Client client(argv[1],ACE_OS::atoi(argv[2]));  
  •     client.connect_to_server();  
  •     client.send_to_server();  
  • }  
  •      客户端由单个Client类表示。Client含有connect_to_server()和send_to_server()方法。

        connect_to_server()方法使用类型为ACE_SOCK_Connector的连接器来主动地建立连接。连接的设置通过调用连接器上的connect()方法来完成:传入的参数为想要连接的机器的远程地址,以及用于在其中建立连接的空ACE_SOCK_Stream 。远程机器在运行时参数中指定。一旦connect()方法成功返回,通过使用ACE_SOCK_Stream封装类中的send()和recv()方法族,流就可以用于在新建立的链路上发送和接收数据。

     

        在该代码中,一旦连接建立好,send_to_server()方法就会被调用,将一个iovec类型的数组,用sendv_n()方法,发送到服务器上。



    采用 ACE Reactor 实现服务程序例子

    此文版权属于作者所有,任何人、媒体或者网站转载、借用都必须征得作者本人同意!

    ACE 使用方法及例子,网上有不少,下面贴一段我写的采用 ACE Reactor 模式写的 echo 服务的例子代码,通过例子可以看出,采用 ACE 开发多客户端的服务程序那是相当简单的!

    代码中,handle_input()和 handle_output()都会对 _bufs 进行操作,因为这两个函数都是运行在 reactor 的线程里,不会冲突,所以没有必要对 _bufs 的操作进行锁操作。

    /* $Id: cpp.tpl 3412 2009-11-14 14:23:44Z luozhiyong $ */ /** * \file ACEReactorSvrSample.cpp * * \brief 采用ACE Reactor 实现服务程序例子 * * \version $Rev: 3412 $ * \author   * \date     2009年09月08日08:17:10 * * \note 修改历史:<br> * <table> *     <tr><th>日期</th><th>修改人</th><th>内容</th></tr> *     <tr><td>2009-9-8</td><td></td><td>创建初稿</td> *     </tr> * </table> */ #include <ace/Message_Block.h> #include <ace/Svc_Handler.h> #include <ace/SOCK_Acceptor.h> #include <ace/Acceptor.h> #include <ace/Select_Reactor.h> #include <list> #include <string>   #ifdef _DEBUG #    define ACE_RT_OPT "d" #else #    define ACE_RT_OPT #endif   #if defined_DLL #    define ACE_LIB_THREAD_OPT #else #    define ACE_LIB_THREAD_OPT "s" #endif   #pragma comment(lib"ACE"ACE_LIB_THREAD_OPT ACE_RT_OPT ".lib") class EchoServicepublic ACE_Event_Handler { public: typedef ACE_SOCK_STREAM stream_type; typedef EchoService my_type; typedef ACE_Acceptor<my_typeACE_SOCK_ACCEPTORacceptor_type; EchoService() { printf("EchoService创建\n"); } ~EchoService() { printf("EchoService销毁\n"); } // 响应socket 已经打开,连接已经建立事件 int open(void*) { // 注册读事件 if (reactor()->register_handler(this,ACE_Event_Handler::READ_MASK)) { // 无法注册handler return -1; } // 注册写事件 if (reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK)) { // 无法注册handler return -1; } // 取消写事件,等待有数据时唤醒 reactor()->cancel_wakeup(this,ACE_Event_Handler::WRITE_MASK); printf("EchoService已打开\n"); return 0; } // 响应有数据可读事件 int handle_input(ACE_HANDLE) { char buf[24]; ssize_t c = _peer.recv(buf,sizeof(buf) - 1); if (c == 0) { // 连接已经关闭 return -1; } _bufs.push_back(std::string(buf,c)); if (_bufs.size() == 1) { // 缓冲区尺寸为1 说明原来缓冲区为空,写事件是取消的,这里唤醒它 reactor()->schedule_wakeup(this,ACE_Event_Handler::WRITE_MASK); } return 0; } // 响应可以发送数据了事件 int handle_output(ACE_HANDLE) { while (!_bufs.empty()) { std::string&buf(*_bufs.begin()); char const*      s(buf.c_str()); char const*const e(s +buf.size()); while (s !=e) { ssize_t c(_peer.send(s,e - s)); if (c == -1 ||c == 0) { // 发送不成功不论发送过程中是否发生阻塞, if (ACE_OS::last_error() ==EWOULDBLOCK) { // 输出缓冲区满,无法再发送数据了(如果你还是继续发送数据,发送会阻塞的) break; }else{ // 连接已关闭 break; } }else{ s += c; } } if (s ==e) { _bufs.pop_front(); }else{ buf = std::string(s,e - s); break; } } if (_bufs.empty()) { // 缓冲区为空,取消写事件监听 reactor()->cancel_wakeup(this,ACE_Event_Handler::WRITE_MASK); } // 不论发送是否成功都返回0,因为,如果发送失败,handle_input 也会发生读失败事件, // 错误处理有handle_input 返回-1 来触发 return 0; } int handle_close(ACE_HANDLE = ACE_INVALID_HANDLEACE_Reactor_Mask mask =ACE_Event_Handler::ALL_EVENTS_MASK) { if (mask ==ACE_Event_Handler::WRITE_MASK) return 0; _peer.close(); delete this; return 0; } // 这个函数主要给reactor::register_handler 时使用的 ACE_HANDLE get_handle () const { return _peer.get_handle(); } // 这个函数主要给acceptor 使用的 stream_typepeer() { return _peer; } // 这个函数主要给acceptor 使用的 int close (u_long = 0) { return handle_close(); } private: stream_type _peer; std::list<std::string>_bufs; }; int main(int /*argc*/,char/*argv*/[]) { u_short port = 20001; ACE_Reactor::instance(newACE_Reactor(newACE_Select_Reactortrue)); EchoService::acceptor_typeacceptor; ACE_INET_Addr svrAddr(port); if (acceptor.open(svrAddr)) { fprintf(stderr,"服务打开失败:%s\n",ACE_OS::strerror(ACE_OS::last_error())); return 1; }else{ fprintf(stdout,"服务已打开,端口为:%u\n",port); ACE_Reactor::instance()->run_reactor_event_loop(); return 0; } }

    总结

    以上是生活随笔为你收集整理的ACE_Select_Reactor 一 ——入门的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。