欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

ACE_Proactor UDP V2.0

发布时间:2025/3/21 编程问答 31 豆豆
生活随笔 收集整理的这篇文章主要介绍了 ACE_Proactor UDP V2.0 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

单次发送单次接收

下面的程序使用Proactor模式用UDP通信:

(1)发送端发送一个复合消息,并打印发送的内容

(2)接收端接收一个复合消息并打印接收到的内容

 由于UDP是无连接的,所以这里没有Connector和Acceptor

本例是对ACE自带的example的稍微修改了一下(打印发送和接收的内容,这样更加直观)

发送端:client_main.cpp

[cpp] view plain copy
  • #include <vector>  
  • #include <fstream>  
  • #include <iterator>  
  • #include <iostream>  
  • #include <string>  
  • using namespace std;  
  • #include "ace/Reactor.h"  
  • #include "ace/Message_Queue.h"  
  • #include "ace/Asynch_IO.h"  
  • #include "ace/OS.h"  
  • #include "ace/Proactor.h"  
  • #include "ace/Asynch_Connector.h"  
  • #include <ace/SOCK_Dgram.h>   
  •   
  •   
  •   
  •   
  • //=============================================================================  
  • /** 
  •  *  @file    test_udp_proactor.cpp 
  •  * 
  •  *  $Id: test_udp_proactor.cpp 93639 2011-03-24 13:32:13Z johnnyw $ 
  •  * 
  •  *  This program illustrates how the <ACE_Proactor> can be used to 
  •  *  implement an application that does asynchronous operations using 
  •  *  datagrams. 
  •  * 
  •  * 
  •  *  @author Irfan Pyarali <irfan@cs.wustl.edu> and Roger Tragin <r.tragin@computer.org> 
  •  */  
  • //=============================================================================  
  •   
  •   
  • #include "ace/OS_NS_string.h"  
  • #include "ace/OS_main.h"  
  • #include "ace/Proactor.h"  
  • #include "ace/Asynch_IO.h"  
  • #include "ace/INET_Addr.h"  
  • #include "ace/SOCK_Dgram.h"  
  • #include "ace/Message_Block.h"  
  • #include "ace/Get_Opt.h"  
  • #include "ace/Log_Msg.h"  
  •   
  •   
  •   
  •   
  •   
  •   
  •   
  • // Keep track of when we're done.  
  • static int done = 0;  
  •   
  • /** 
  •  * @class Sender 
  •  * 
  •  * @brief The class will be created by <main>. 
  •  */  
  • class Sender : public ACE_Handler  
  • {  
  • public:  
  •   Sender (void);  
  •   ~Sender (void);  
  •   
  •   //FUZZ: disable check_for_lack_ACE_OS  
  •   ///FUZZ: enable check_for_lack_ACE_OS  
  •   int open (const ACE_TCHAR *host, u_short port);  
  •   
  • protected:  
  •   // These methods are called by the freamwork  
  •   
  •   /// This is called when asynchronous writes from the dgram socket  
  •   /// complete  
  •   virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);  
  •   
  • private:  
  •   
  •   /// Network I/O handle  
  •   ACE_SOCK_Dgram sock_dgram_;  
  •   
  •   /// wd (write dgram): for writing to the socket  
  •   ACE_Asynch_Write_Dgram wd_;  
  •   
  •   const char* completion_key_;  
  •   const char* act_;  
  • };  
  •   
  • Sender::Sender (void)  
  •   : completion_key_ ("Sender completion key"),  
  •     act_ ("Sender ACT")  
  • {  
  • }  
  •   
  • Sender::~Sender (void)  
  • {  
  •   this->sock_dgram_.close ();  
  • }  
  •   
  • int  
  • Sender::open (const ACE_TCHAR *host,  
  •               u_short port)  
  • {  
  •   // Initialize stuff  
  •   
  •   if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1)  
  •     ACE_ERROR_RETURN ((LM_ERROR,  
  •                        "[%D][line:%l]%p\n",  
  •                        "ACE_SOCK_Dgram::open"), -1);  
  •   
  •   // Initialize the asynchronous read.  
  •   if (this->wd_.open (*this,  
  •                       this->sock_dgram_.get_handle (),  
  •                       this->completion_key_,  
  •                       ACE_Proactor::instance ()) == -1)  
  •     ACE_ERROR_RETURN ((LM_ERROR,  
  •                        "[%D][line:%l]%p\n",  
  •                        "ACE_Asynch_Write_Dgram::open"), -1);  
  •   
  •   // We are using scatter/gather to send the message header and  
  •   // message body using 2 buffers  
  •   
  •   // create a message block for the message header  
  •   ACE_Message_Block* msg = 0;  
  •   ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1);  
  •   const char raw_msg [] = "To be or not to be.";  
  •   // Copy buf into the Message_Block and update the wr_ptr ().  
  •   msg->copy (raw_msg, ACE_OS::strlen (raw_msg) + 1);  
  •   
  •   // create a message block for the message body  
  •   ACE_Message_Block* body = 0;  
  •   ACE_NEW_RETURN (body, ACE_Message_Block (100), -1);  
  •   ACE_OS::memset (body->wr_ptr (), 'X', 100);  
  •   body->wr_ptr (100); // always remember to update the wr_ptr ()  
  •   
  •   // set body as the cont of msg.  This associates the 2 message blocks so  
  •   // that a send will send the first block (which is the header) up to  
  •   // length (), and use the cont () to get the next block to send.  You can  
  •   // chain up to IOV_MAX message block using this method.  
  •   msg->cont (body);  
  •   
  •   // do the asynch send  
  •   size_t number_of_bytes_sent = 0;  
  •   ACE_INET_Addr serverAddr (port, host);  
  •   int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_);  
  •     
  •   
  •   ACE_Message_Block* p = 0;  
  •   p= msg;  
  •   
  •   switch (res)  
  •     {  
  •     case 0:  
  •       // this is a good error.  The proactor will call our handler when the  
  •       // send has completed.  
  •       break;  
  •     case 1:  
  •       // actually sent something, we will handle it in the handler callback  
  •       ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •       ACE_DEBUG ((LM_DEBUG,  
  •                   "%s = %d\n",  
  •                   "bytes sent immediately",  
  •                   number_of_bytes_sent));  
  •   
  •       while (p != NULL)  
  •       {  
  •           ACE_DEBUG ((LM_DEBUG,"YOU SEND[%s]\n",p->rd_ptr()));  
  •           p = p->cont();  
  •       }  
  •         
  •       ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •       res = 0;  
  •       break;  
  •     case -1:  
  •       // Something else went wrong.  
  •       ACE_ERROR ((LM_ERROR,  
  •                   "[%D][line:%l]%p\n",  
  •                   "ACE_Asynch_Write_Dgram::recv"));  
  •       // the handler will not get called in this case so lets clean up our msg  
  •       msg->release ();  
  •       break;  
  •     default:  
  •       // Something undocumented really went wrong.  
  •       ACE_ERROR ((LM_ERROR,  
  •                   "[%D][line:%l]%p\n",  
  •                   "ACE_Asynch_Write_Dgram::recv"));  
  •       msg->release ();  
  •       break;  
  •     }  
  •   return res;  
  • }  
  •   
  • void  
  • Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)  
  • {  
  •   ACE_DEBUG ((LM_DEBUG,  
  •               "handle_write_dgram called\n"));  
  •   
  •   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""bytes_to_write", result.bytes_to_write ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""handle", result.handle ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""bytes_transfered", result.bytes_transferred ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""flags", result.flags ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %s\n""act", result.act ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""success", result.success ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %s\n""completion_key", result.completion_key ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""error", result.error ()));  
  •   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •   
  •   ACE_DEBUG ((LM_DEBUG,  
  •               "Sender completed\n"));  
  •   
  •   // No need for this message block anymore.  
  •   result.message_block ()->release ();  
  •   
  •   // Note that we are done with the test.  
  •   done++;  
  • }  
  •   
  •   
  •   
  • int  
  • ACE_TMAIN (int argc, ACE_TCHAR *argv[])  
  • {  
  •   
  •     //ACE_LOG_MSG->clr_flags(0);  
  •     //ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::VERBOSE);  
  •   
  •     Sender sender;  
  •     // Port that we're receiving connections on.  
  •     u_short port = ACE_DEFAULT_SERVER_PORT;  
  •     // Host that we're connecting to.  
  •     string host("localhost");  
  •     if (sender.open (host.c_str(), port) == -1)  
  •     return -1;  
  •   
  •     while (true)  
  •     {  
  •         ACE_Proactor::instance ()->handle_events ();  
  •     }  
  •       
  •     return 0;  
  • }  
  • 接收端server_main.cpp

    [cpp] view plain copy
  • #include "ace/OS_NS_string.h"  
  • #include "ace/OS_main.h"  
  • #include "ace/Proactor.h"  
  • #include "ace/Asynch_IO.h"  
  • #include "ace/INET_Addr.h"  
  • #include "ace/SOCK_Dgram.h"  
  • #include "ace/Message_Block.h"  
  • #include "ace/Get_Opt.h"  
  • #include "ace/Log_Msg.h"  
  •   
  •   
  •   
  • // Host that we're connecting to.  
  • static ACE_TCHAR *host = 0;  
  •   
  • // Port that we're receiving connections on.  
  • static u_short port = ACE_DEFAULT_SERVER_PORT;  
  •   
  • // Keep track of when we're done.  
  • static int done = 0;  
  •   
  • /** 
  •  * @class Receiver 
  •  * 
  •  * @brief This class will receive data from 
  •  * the network connection and dump it to a file. 
  •  */  
  • class Receiver : public ACE_Service_Handler  
  • {  
  • public:  
  •   // = Initialization and termination.  
  •   Receiver (void);  
  •   ~Receiver (void);  
  •   
  •   int open_addr (const ACE_INET_Addr &localAddr);  
  •   
  • protected:  
  •   // These methods are called by the framework  
  •   
  •   /// This method will be called when an asynchronous read completes on  
  •   /// a UDP socket.  
  •   virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);  
  •   
  • private:  
  •   ACE_SOCK_Dgram sock_dgram_;  
  •   
  •   /// rd (read dgram): for reading from a UDP socket.  
  •   ACE_Asynch_Read_Dgram rd_;  
  •   const char* completion_key_;  
  •   const char* act_;  
  • };  
  •   
  • Receiver::Receiver (void)  
  •   : completion_key_ ("Receiver Completion Key"),  
  •     act_ ("Receiver ACT")  
  • {  
  • }  
  •   
  • Receiver::~Receiver (void)  
  • {  
  •   sock_dgram_.close ();  
  • }  
  •   
  • int  
  • Receiver::open_addr (const ACE_INET_Addr &localAddr)  
  • {  
  •   ACE_DEBUG ((LM_DEBUG,  
  •               "[%D][line:%l]Receiver::open_addr called\n"));  
  •   
  •   // Create a local UDP socket to receive datagrams.  
  •   if (this->sock_dgram_.open (localAddr) == -1)  
  •     ACE_ERROR_RETURN ((LM_ERROR,  
  •                        "[%D][line:%l]%p\n",  
  •                        "ACE_SOCK_Dgram::open"), -1);  
  •   
  •   // Initialize the asynchronous read.  
  •   if (this->rd_.open (*this,  
  •                       this->sock_dgram_.get_handle (),  
  •                       this->completion_key_,  
  •                       ACE_Proactor::instance ()) == -1)  
  •     ACE_ERROR_RETURN ((LM_ERROR,  
  •                        "[%D][line:%l]%p\n",  
  •                        "ACE_Asynch_Read_Dgram::open"), -1);  
  •   
  •   // Create a buffer to read into.  We are using scatter/gather to  
  •   // read the message header and message body into 2 buffers  
  •   
  •   // create a message block to read the message header  
  •   ACE_Message_Block* msg = 0;  
  •   ACE_NEW_RETURN (msg, ACE_Message_Block (1024), -1);  
  •   
  •   // the next line sets the size of the header, even though we  
  •   // allocated a the message block of 1k, by setting the size to 20  
  •   // bytes then the first 20 bytes of the reveived datagram will be  
  •   // put into this message block.  
  •   msg->size (20); // size of header to read is 20 bytes  
  •   
  •   // create a message block to read the message body  
  •   ACE_Message_Block* body = 0;  
  •   ACE_NEW_RETURN (body, ACE_Message_Block (1024), -1);  
  •   // The message body will not exceed 1024 bytes, at least not in this test.  
  •   
  •   // set body as the cont of msg.  This associates the 2 message  
  •   // blocks so that a read will fill the first block (which is the  
  •   // header) up to size (), and use the cont () block for the rest of  
  •   // the data.  You can chain up to IOV_MAX message block using this  
  •   // method.  
  •   msg->cont (body);  
  •   
  •   // ok lets do the asynch read  
  •   size_t number_of_bytes_recvd = 0;  
  •   
  •   int res = rd_.recv (msg,  
  •                       number_of_bytes_recvd,  
  •                       0,  
  •                       PF_INET,  
  •                       this->act_);  
  •   switch (res)  
  •     {  
  •     case 0:  
  •       // this is a good error.  The proactor will call our handler when the  
  •       // read has completed.  
  •       break;  
  •     case 1:  
  •       // actually read something, we will handle it in the handler callback  
  •       ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •       ACE_DEBUG ((LM_DEBUG,  
  •                   "%s = %d\n",  
  •                   "bytes recieved immediately",  
  •                   number_of_bytes_recvd));  
  •   
  •       ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •       res = 0;  
  •       break;  
  •     case -1:  
  •       // Something else went wrong.  
  •       ACE_ERROR ((LM_ERROR,  
  •                   "[%D][line:%l]%p\n",  
  •                   "ACE_Asynch_Read_Dgram::recv"));  
  •       // the handler will not get called in this case so lets clean up our msg  
  •       msg->release ();  
  •       break;  
  •     default:  
  •       // Something undocumented really went wrong.  
  •       ACE_ERROR ((LM_ERROR,  
  •                   "[%D][line:%l]%p\n",  
  •                   "ACE_Asynch_Read_Dgram::recv"));  
  •       msg->release ();  
  •       break;  
  •     }  
  •   
  •   return res;  
  • }  
  •   
  • void  
  • Receiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)  
  • {  
  •   ACE_DEBUG ((LM_DEBUG,  
  •               "handle_read_dgram called\n"));  
  •   
  •   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""bytes_to_read", result.bytes_to_read ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""handle", result.handle ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""bytes_transfered", result.bytes_transferred ()));  
  •   ACE_INET_Addr peerAddr;  
  •   result.remote_address (peerAddr);  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %s:%d\n""peer_address", peerAddr.get_host_addr (), peerAddr.get_port_number ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""flags", result.flags ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %s\n""act", result.act ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""success", result.success ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %s\n""completion_key", result.completion_key ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""error", result.error ()));  
  •   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •   
  •   if (result.success () && result.bytes_transferred () != 0)  
  •     {  
  •       // loop through our message block and print out the contents  
  •       for (const ACE_Message_Block* msg = result.message_block (); msg != 0; msg = msg->cont ())  
  •         { // use msg->length () to get the number of bytes written to the message  
  •           // block.  
  •           ACE_DEBUG ((LM_DEBUG, "Buf=[size=<%d>", msg->length ()));  
  •           for (u_long i = 0; i < msg->length (); ++i)  
  •             ACE_DEBUG ((LM_DEBUG,  
  •                         "%c", (msg->rd_ptr ())[i]));  
  •           ACE_DEBUG ((LM_DEBUG, "]\n"));  
  •         }  
  •     }  
  •   
  •   ACE_DEBUG ((LM_DEBUG,  
  •               "Receiver completed\n"));  
  •   
  •   // No need for this message block anymore.  
  •   result.message_block ()->release ();  
  •   
  •   // Note that we are done with the test.  
  •   done++;  
  • }  
  •   
  •   
  • int  
  •     ACE_TMAIN (int argc, ACE_TCHAR *argv[])  
  • {  
  •   
  •     //ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::VERBOSE);  
  •   
  •     Receiver receiver;  
  •   
  •     if (receiver.open_addr (ACE_INET_Addr (port)) == -1)  
  •         return -1;  
  •   
  •     while (true)  
  •     {  
  •         ACE_Proactor::instance ()->handle_events ();  
  •     }  
  •   
  •     return 0;  
  • }  
  • 先运行接收端,再运行发送端,你懂的。

    发送端程序运行结果:

    接收端运行结果:


    定时多目标发送

    程序的功能:

    (1)UDP发送内容到P1,IP2,...,IPn(地址列表从文件读取)
    (1)发送内容从文件中读取;
    (1)发送时间间隔从文件中读取;

    [cpp] view plain copy
  • //=============================================================================  
  • /** 
  •  *  @file    test_udp_proactor.cpp 
  •  * 
  •  *  $Id: test_udp_proactor.cpp 93639 2011-03-24 13:32:13Z johnnyw $ 
  •  * 
  •  *  This program illustrates how the <ACE_Proactor> can be used to 
  •  *  implement an application that does asynchronous operations using 
  •  *  datagrams. 
  •  * 
  •  * 
  •  *  @author Irfan Pyarali <irfan@cs.wustl.edu> and Roger Tragin <r.tragin@computer.org> 
  •  */  
  • //=============================================================================  
  •   
  • #include <vector>  
  • #include <fstream>  
  • #include <iterator>  
  • #include <iostream>  
  • #include <string>  
  • using namespace std;  
  • //#include "ace/Reactor.h"  
  • #include "ace/Message_Queue.h"  
  • #include "ace/Asynch_IO.h"  
  • #include "ace/OS.h"  
  • #include "ace/Proactor.h"  
  • #include "ace/Asynch_Connector.h"  
  • #include <ace/SOCK_Dgram.h>   
  •   
  • #include "ace/OS_NS_string.h"  
  • #include "ace/OS_main.h"  
  • #include "ace/INET_Addr.h"  
  • #include "ace/SOCK_Dgram.h"  
  • #include "ace/Message_Block.h"  
  • #include "ace/Get_Opt.h"  
  • #include "ace/Log_Msg.h"  
  • #include "ace/Event_Handler.h"  
  • #include "ace/Date_Time.h"  
  • #include "ace/WIN32_Proactor.h"  
  •   
  • namespace global  
  • {  
  •     int delay = 2;  
  •     //int interval = 60*10;//每interval 秒计时一次  
  •     int interval = 2;//每interval 秒计时一次  
  •     void print_current_time(void)  
  •     {  
  •         ACE_Date_Time date(ACE_OS::gettimeofday());  
  •         cout<<"当前时间:"  
  •             <<date.year()<<"年"  
  •             <<date.month()<<"月"  
  •             <<date.day()<<"日"  
  •             <<date.hour()<<"时"  
  •             <<date.minute()<<"分"  
  •             <<date.second()<<"秒"<<endl;  
  •     }  
  •     template<typename T>  
  •     bool read_server_addr(vector<T>& addrs)  
  •     {  
  •         ifstream fin("server_addr.ini");  
  •         if (!fin)  
  •         {  
  •             cout<<"找不到配置文件:local_port.ini"<<endl;  
  •             return false;  
  •         }  
  •         istream_iterator<T> first(fin),last;  
  •         vector<string> temp_addrs(first,last);  
  •   
  •         if (temp_addrs.size()==0)  
  •         {  
  •             cout<<"配置文件中找不到服务器地址!"<<endl;  
  •             return false;  
  •         }  
  •         else  
  •         {  
  •             addrs.swap(temp_addrs);  
  •             return true;  
  •         }  
  •     }  
  •     template<typename T>  
  •     bool read_interval(T& interval)  
  •     {  
  •   
  •         ifstream fin("interval_second.ini");  
  •         if (!fin)  
  •         {  
  •             cout<<"找不到配置文件:interval_second.ini"<<endl;  
  •             return false;  
  •         }  
  •         //istream_iterator<T> first(fin),last;  
  •         //vector<string> temp_addrs(first,last);  
  •         fin>>interval;  
  •         if (!fin)  
  •         {  
  •             cout<<"配置文件中找不到发送时间间隔数据!"<<endl;  
  •             return false;  
  •         }  
  •         else  
  •         {  
  •             global::interval = interval;  
  •             return true;  
  •         }  
  •     }  
  • }  
  •   
  • /** 
  •  * @class Sender 
  •  * 
  •  * @brief The class will be created by <main>. 
  •  */  
  • class Sender : public ACE_Handler, public ACE_Event_Handler  
  •   
  • {  
  • public:  
  •   Sender (const int delay,const int interval);  
  •   ~Sender (void);  
  •   
  •   //FUZZ: disable check_for_lack_ACE_OS  
  •   ///FUZZ: enable check_for_lack_ACE_OS  
  •   int open (const ACE_TCHAR *host, u_short port);  
  •   int handle_timeout(const ACE_Time_Value& , const void *act /* = 0 */);//计时器到期后执行的回调函数  
  • protected:  
  •   /// This is called when asynchronous writes from the dgram socket  
  •   /// complete  
  •   virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);  
  •   
  • private:  
  •   void start_timing(void);  
  •   int send_to_one_server(const string&,const string&);  
  •   int send_to_multi_server(void);  
  •   void read_content(string&);  
  •   /// Network I/O handle  
  •   ACE_SOCK_Dgram sock_dgram_;  
  •   
  •   /// wd (write dgram): for writing to the socket  
  •   ACE_Asynch_Write_Dgram wd_;  
  •   
  •   const char* completion_key_;  
  •   const char* act_;  
  •   long time_handle_;//在计时器队列中的ID  
  •   int delay_;//启动多久开始第一次触发超时  
  •   int interval_;//循环计时的间隔  
  • };  
  •   
  • Sender::Sender (const int delay,const int interval)  
  •   : completion_key_ ("Sender completion key"),  
  •     act_ ("Sender ACT"),  
  •     delay_(delay),  
  •     interval_(interval)  
  • {  
  •     ACE_DEBUG ((LM_DEBUG, "Sender::Sender (const int delay,const int interval)\n"));  
  • }  
  •   
  • Sender::~Sender (void)  
  • {  
  •   this->sock_dgram_.close ();  
  • }  
  •   
  • int  
  • Sender::open (const ACE_TCHAR *host,  
  •               u_short port)  
  • {  
  •     ACE_DEBUG ((LM_DEBUG, "Sender::open(%s,%d)\n",host,port));  
  •   
  •   // Initialize stuff  
  •   //初始化和socket有关的成员  
  •   if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1)  
  •     ACE_ERROR_RETURN ((LM_ERROR,  
  •                        "[%D][line:%l]%p\n",  
  •                        "ACE_SOCK_Dgram::open"), -1);  
  •   
  •   // Initialize the asynchronous read.  
  •   if (this->wd_.open (*this,  
  •                       this->sock_dgram_.get_handle (),  
  •                       this->completion_key_,  
  •                       ACE_Proactor::instance ()) == -1)  
  •     ACE_ERROR_RETURN ((LM_ERROR,  
  •                        "[%D][line:%l]%p\n",  
  •                        "ACE_Asynch_Write_Dgram::open"), -1);  
  •   
  •   //init time clock  
  •   //启动计时  
  •   start_timing();  
  •   return 0;  
  •     
  • }  
  •   
  • void Sender::start_timing(void)  
  • {  
  •     ACE_DEBUG ((LM_DEBUG, "Sender::start_timing:delay[%d]interval[%d]\n",  
  •         this->delay_,this->interval_));  
  •     this->reactor(ACE_Reactor::instance());  
  •     this->time_handle_ = this->reactor()->schedule_timer(this,//在这里注册定时器  
  •         0,  
  •         ACE_Time_Value(this->delay_),//程序一开始延迟delay秒开始首次执行到期函数  
  •         ACE_Time_Value(this->interval_));//循环计时,每隔interval秒重复执行  
  • }  
  •   
  • int Sender::handle_timeout(const ACE_Time_Value& , const void *act /* = 0 */)  
  • {  
  •     cout<<"\n\n\n计时器"<<this->interval_/60<<"分钟到期"<<endl;  
  •     global::print_current_time();  
  •       
  •     //if (date.minute() /10 == 3 )//在30分-39分之间这个时间段的时候开始任务  
  •     if (true )//在30分-39分之间这个时间段的时候开始任务  
  •     {  
  •         cout<<"当前时间在31分-39分之间, ,开始查找文件并尝试发送到对端";  
  •         cout<<"开始发送文件到对端"<<endl;  
  •         int num = send_to_multi_server();  
  •         ACE_DEBUG ((LM_DEBUG, "UDP成功发送文件内容到%d个目标地址\n",num));  
  •     }  
  •     else  
  •     {  
  •         global::print_current_time();  
  •         cout<<"不需要上传"<<endl;  
  •     }  
  •       
  •     return 0;  
  • }  
  •   
  • void Sender::read_content(string& content)  
  • {  
  •     string file_name("temp_file.xml");  
  •     ifstream fin(file_name);  
  •     istream_iterator<char> iter_begin(fin),iter_end;  
  •     string send_str(iter_begin,iter_end);  
  •     content.swap(send_str);  
  • }  
  •   
  • int Sender::send_to_one_server(const string& addr,const string& sent_content)  
  • {  
  •     // create a message block for the message header  
  •     ACE_Message_Block* msg = 0;  
  •     ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1);  
  •     // Copy buf into the Message_Block and update the wr_ptr ().  
  •     msg->copy (sent_content.c_str(), sent_content.size());  
  •     // do the asynch send  
  •   
  •     size_t number_of_bytes_sent = 0;  
  •     ACE_INET_Addr serverAddr (addr.c_str());  
  •     int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_);  
  •   
  •     ACE_Message_Block* p = 0;  
  •     p= msg;  
  •   
  •     switch (res)  
  •     {  
  •     case 0:  
  •         // this is a good error.  The proactor will call our handler when the  
  •         // send has completed.  
  •         break;  
  •     case 1:  
  •         // actually sent something, we will handle it in the handler callback  
  •         ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •         ACE_DEBUG ((LM_DEBUG,  
  •             "%s = %d\n",  
  •             "bytes sent immediately",  
  •             number_of_bytes_sent));  
  •   
  •         while (p != NULL)  
  •         {  
  •             string temp;  
  •             for (int i=0;i<p->length();++i)  
  •             {  
  •                 temp.push_back(*(p->rd_ptr()+i));  
  •             }  
  •             ACE_DEBUG ((LM_DEBUG,"YOU SEND[%s]\n",temp.c_str()));  
  •               
  •             p = p->cont();  
  •         }  
  •   
  •         ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •         res = 0;  
  •         break;  
  •     case -1:  
  •         // Something else went wrong.  
  •         ACE_ERROR ((LM_ERROR,  
  •             "[%D][line:%l]%p\n",  
  •             "ACE_Asynch_Write_Dgram::recv"));  
  •         // the handler will not get called in this case so lets clean up our msg  
  •         msg->release ();  
  •         break;  
  •     default:  
  •         // Something undocumented really went wrong.  
  •         ACE_ERROR ((LM_ERROR,  
  •             "[%D][line:%l]%p\n",  
  •             "ACE_Asynch_Write_Dgram::recv"));  
  •         msg->release ();  
  •         break;  
  •     }  
  •     return res;  
  • }  
  • int Sender::send_to_multi_server(void)  
  • {  
  •   
  •     string send_content;  
  •     this->read_content(send_content);  
  •   
  •     vector<string> server_addrs;  
  •     global::read_server_addr(server_addrs);  
  •     int send_success_number = 0;  
  •     for (vector<string>::const_iterator iter = server_addrs.cbegin();  
  •         iter != server_addrs.cend();  
  •         ++iter)  
  •     {  
  •         if (send_to_one_server(*iter,send_content))  
  •         {  
  •             ++send_success_number;  
  •         }  
  •     }  
  •     return send_success_number;  
  • }  
  •   
  • void  
  • Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)  
  • {  
  •   ACE_DEBUG ((LM_DEBUG,  
  •               "handle_write_dgram called\n"));  
  •   
  •   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""bytes_to_write", result.bytes_to_write ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""handle", result.handle ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""bytes_transfered", result.bytes_transferred ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""flags", result.flags ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %s\n""act", result.act ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""success", result.success ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %s\n""completion_key", result.completion_key ()));  
  •   ACE_DEBUG ((LM_DEBUG, "%s = %d\n""error", result.error ()));  
  •   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  •   
  •   ACE_DEBUG ((LM_DEBUG,  
  •               "Sender completed\n"));  
  •   
  •   // No need for this message block anymore.  
  •   result.message_block ()->release ();  
  •   
  • }  
  •   
  •   
  •   
  • int  
  • ACE_TMAIN (int argc, ACE_TCHAR *argv[])  
  • {  
  •   
  •     ACE_DEBUG ((LM_DEBUG, "(%t|%P) work starup/n"));  
  •     ACE_Proactor::close_singleton ();   
  •   
  •     ACE_WIN32_Proactor *impl = new ACE_WIN32_Proactor (0, 1);   
  •     ACE_Proactor::instance (new ACE_Proactor (impl, 1), 1);  
  •   
  •     ACE_Reactor::instance ()->register_handler(impl, impl->get_handle ());  
  •   
  •     global::read_interval(global::interval);  
  •   
  •     //ACE_LOG_MSG->clr_flags(0);  
  •     //ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::VERBOSE);  
  •   
  •     Sender sender(global::delay,global::interval);  
  •     // Port that we're receiving connections on.  
  •     u_short port = ACE_DEFAULT_SERVER_PORT;  
  •     // Host that we're connecting to.  
  •     string host("localhost");  
  •     if (sender.open (host.c_str(), port) == -1)  
  •     return -1;  
  •   
  •   
  •   
  •     ACE_Reactor::instance()->run_event_loop();  
  •     ACE_Reactor::instance ()->remove_handler (impl,  
  •         ACE_Event_Handler::DONT_CALL);  
  •     ACE_DEBUG ((LM_DEBUG, "(%t|%P) work complete/n"));  
  •   
  •   
  •     return 0;  
  • }  

  • 所需配置文件:

    总结

    以上是生活随笔为你收集整理的ACE_Proactor UDP V2.0的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。