生活随笔
收集整理的这篇文章主要介绍了
ACE_Proactor UDP V2.0
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
单次发送单次接收
下面的程序使用Proactor模式用UDP通信:
(1)发送端发送一个复合消息,并打印发送的内容
(2)接收端接收一个复合消息并打印接收到的内容
由于UDP是无连接的,所以这里没有Connector和Acceptor
本例在ACE自带的example基础上稍作修改(打印发送和接收的内容,这样更加直观)
发送端:client_main.cpp
[cpp] view plain
copy #include <vector> #include <fstream> #include <iterator> #include <iostream> #include <string> using namespace std; #include "ace/Reactor.h" #include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Connector.h" #include <ace/SOCK_Dgram.h> #include "ace/OS_NS_string.h" #include "ace/OS_main.h" #include "ace/Proactor.h" #include "ace/Asynch_IO.h" #include "ace/INET_Addr.h" #include "ace/SOCK_Dgram.h" #include "ace/Message_Block.h" #include "ace/Get_Opt.h" #include "ace/Log_Msg.h" static int done = 0; class Sender : public ACE_Handler { public: Sender (void); ~Sender (void); int open (const ACE_TCHAR *host, u_short port); protected: virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result); private: ACE_SOCK_Dgram sock_dgram_; ACE_Asynch_Write_Dgram wd_; const char* completion_key_; const char* act_; }; Sender::Sender (void) : completion_key_ ("Sender completion key"), act_ ("Sender ACT") { } Sender::~Sender (void) { this->sock_dgram_.close (); } int Sender::open (const ACE_TCHAR *host, u_short port) { if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_SOCK_Dgram::open"), -1); if (this->wd_.open (*this, this->sock_dgram_.get_handle (), this->completion_key_, ACE_Proactor::instance ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::open"), -1); ACE_Message_Block* msg = 0; ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1); const char raw_msg [] = "To be or not to be."; msg->copy (raw_msg, ACE_OS::strlen (raw_msg) + 1); ACE_Message_Block* body = 0; ACE_NEW_RETURN (body, ACE_Message_Block (100), -1); ACE_OS::memset (body->wr_ptr (), 'X', 100); body->wr_ptr (100); msg->cont (body); size_t number_of_bytes_sent = 0; ACE_INET_Addr serverAddr (port, host); int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_); ACE_Message_Block* p = 0; p= msg; switch (res) { case 
0: break; case 1: ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes sent immediately", number_of_bytes_sent)); while (p != NULL) { ACE_DEBUG ((LM_DEBUG,"YOU SEND[%s]\n",p->rd_ptr())); p = p->cont(); } ACE_DEBUG ((LM_DEBUG, "********************\n")); res = 0; break; case -1: ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::recv")); msg->release (); break; default: ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::recv")); msg->release (); break; } return res; } void Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result) { ACE_DEBUG ((LM_DEBUG, "handle_write_dgram called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "Sender completed\n")); result.message_block ()->release (); done++; } int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { Sender sender; u_short port = ACE_DEFAULT_SERVER_PORT; string host("localhost"); if (sender.open (host.c_str(), port) == -1) return -1; while (true) { ACE_Proactor::instance ()->handle_events (); } return 0; }
接收端:server_main.cpp
[cpp] view plain
copy #include "ace/OS_NS_string.h" #include "ace/OS_main.h" #include "ace/Proactor.h" #include "ace/Asynch_IO.h" #include "ace/INET_Addr.h" #include "ace/SOCK_Dgram.h" #include "ace/Message_Block.h" #include "ace/Get_Opt.h" #include "ace/Log_Msg.h" static ACE_TCHAR *host = 0; static u_short port = ACE_DEFAULT_SERVER_PORT; static int done = 0; class Receiver : public ACE_Service_Handler { public: Receiver (void); ~Receiver (void); int open_addr (const ACE_INET_Addr &localAddr); protected: virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result); private: ACE_SOCK_Dgram sock_dgram_; ACE_Asynch_Read_Dgram rd_; const char* completion_key_; const char* act_; }; Receiver::Receiver (void) : completion_key_ ("Receiver Completion Key"), act_ ("Receiver ACT") { } Receiver::~Receiver (void) { sock_dgram_.close (); } int Receiver::open_addr (const ACE_INET_Addr &localAddr) { ACE_DEBUG ((LM_DEBUG, "[%D][line:%l]Receiver::open_addr called\n")); if (this->sock_dgram_.open (localAddr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_SOCK_Dgram::open"), -1); if (this->rd_.open (*this, this->sock_dgram_.get_handle (), this->completion_key_, ACE_Proactor::instance ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Read_Dgram::open"), -1); ACE_Message_Block* msg = 0; ACE_NEW_RETURN (msg, ACE_Message_Block (1024), -1); msg->size (20); ACE_Message_Block* body = 0; ACE_NEW_RETURN (body, ACE_Message_Block (1024), -1); msg->cont (body); size_t number_of_bytes_recvd = 0; int res = rd_.recv (msg, number_of_bytes_recvd, 0, PF_INET, this->act_); switch (res) { case 0: break; case 1: ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes recieved immediately", number_of_bytes_recvd)); ACE_DEBUG ((LM_DEBUG, "********************\n")); res = 0; break; case -1: ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Read_Dgram::recv")); msg->release (); break; default: ACE_ERROR ((LM_ERROR, 
"[%D][line:%l]%p\n", "ACE_Asynch_Read_Dgram::recv")); msg->release (); break; } return res; } void Receiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result) { ACE_DEBUG ((LM_DEBUG, "handle_read_dgram called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_INET_Addr peerAddr; result.remote_address (peerAddr); ACE_DEBUG ((LM_DEBUG, "%s = %s:%d\n", "peer_address", peerAddr.get_host_addr (), peerAddr.get_port_number ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); if (result.success () && result.bytes_transferred () != 0) { for (const ACE_Message_Block* msg = result.message_block (); msg != 0; msg = msg->cont ()) { ACE_DEBUG ((LM_DEBUG, "Buf=[size=<%d>", msg->length ())); for (u_long i = 0; i < msg->length (); ++i) ACE_DEBUG ((LM_DEBUG, "%c", (msg->rd_ptr ())[i])); ACE_DEBUG ((LM_DEBUG, "]\n")); } } ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); result.message_block ()->release (); done++; } int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { Receiver receiver; if (receiver.open_addr (ACE_INET_Addr (port)) == -1) return -1; while (true) { ACE_Proactor::instance ()->handle_events (); } return 0; }
先运行接收端,再运行发送端,你懂的。
发送端程序运行结果:
接收端运行结果:
定时多目标发送
程序的功能:
(1)UDP发送内容到IP1,IP2,...,IPn(地址列表从文件读取);
(2)发送内容从文件中读取;
(3)发送时间间隔从文件中读取。
[cpp] view plain
copy #include <vector> #include <fstream> #include <iterator> #include <iostream> #include <string> using namespace std; #include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Connector.h" #include <ace/SOCK_Dgram.h> #include "ace/OS_NS_string.h" #include "ace/OS_main.h" #include "ace/INET_Addr.h" #include "ace/SOCK_Dgram.h" #include "ace/Message_Block.h" #include "ace/Get_Opt.h" #include "ace/Log_Msg.h" #include "ace/Event_Handler.h" #include "ace/Date_Time.h" #include "ace/WIN32_Proactor.h" namespace global { int delay = 2; int interval = 2; void print_current_time(void) { ACE_Date_Time date(ACE_OS::gettimeofday()); cout<<"当前时间:" <<date.year()<<"年" <<date.month()<<"月" <<date.day()<<"日" <<date.hour()<<"时" <<date.minute()<<"分" <<date.second()<<"秒"<<endl; } template<typename T> bool read_server_addr(vector<T>& addrs) { ifstream fin("server_addr.ini"); if (!fin) { cout<<"找不到配置文件:local_port.ini"<<endl; return false; } istream_iterator<T> first(fin),last; vector<string> temp_addrs(first,last); if (temp_addrs.size()==0) { cout<<"配置文件中找不到服务器地址!"<<endl; return false; } else { addrs.swap(temp_addrs); return true; } } template<typename T> bool read_interval(T& interval) { ifstream fin("interval_second.ini"); if (!fin) { cout<<"找不到配置文件:interval_second.ini"<<endl; return false; } fin>>interval; if (!fin) { cout<<"配置文件中找不到发送时间间隔数据!"<<endl; return false; } else { global::interval = interval; return true; } } } class Sender : public ACE_Handler, public ACE_Event_Handler { public: Sender (const int delay,const int interval); ~Sender (void); int open (const ACE_TCHAR *host, u_short port); int handle_timeout(const ACE_Time_Value& , const void *act ); protected: virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result); private: void start_timing(void); int send_to_one_server(const string&,const string&); int send_to_multi_server(void); void read_content(string&); ACE_SOCK_Dgram sock_dgram_; 
ACE_Asynch_Write_Dgram wd_; const char* completion_key_; const char* act_; long time_handle_; int delay_; int interval_; }; Sender::Sender (const int delay,const int interval) : completion_key_ ("Sender completion key"), act_ ("Sender ACT"), delay_(delay), interval_(interval) { ACE_DEBUG ((LM_DEBUG, "Sender::Sender (const int delay,const int interval)\n")); } Sender::~Sender (void) { this->sock_dgram_.close (); } int Sender::open (const ACE_TCHAR *host, u_short port) { ACE_DEBUG ((LM_DEBUG, "Sender::open(%s,%d)\n",host,port)); if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_SOCK_Dgram::open"), -1); if (this->wd_.open (*this, this->sock_dgram_.get_handle (), this->completion_key_, ACE_Proactor::instance ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::open"), -1); start_timing(); return 0; } void Sender::start_timing(void) { ACE_DEBUG ((LM_DEBUG, "Sender::start_timing:delay[%d]interval[%d]\n", this->delay_,this->interval_)); this->reactor(ACE_Reactor::instance()); this->time_handle_ = this->reactor()->schedule_timer(this, 0, ACE_Time_Value(this->delay_), ACE_Time_Value(this->interval_)); } int Sender::handle_timeout(const ACE_Time_Value& , const void *act ) { cout<<"\n\n\n计时器"<<this->interval_/60<<"分钟到期"<<endl; global::print_current_time(); if (true ) { cout<<"当前时间在31分-39分之间, ,开始查找文件并尝试发送到对端"; cout<<"开始发送文件到对端"<<endl; int num = send_to_multi_server(); ACE_DEBUG ((LM_DEBUG, "UDP成功发送文件内容到%d个目标地址\n",num)); } else { global::print_current_time(); cout<<"不需要上传"<<endl; } return 0; } void Sender::read_content(string& content) { string file_name("temp_file.xml"); ifstream fin(file_name); istream_iterator<char> iter_begin(fin),iter_end; string send_str(iter_begin,iter_end); content.swap(send_str); } int Sender::send_to_one_server(const string& addr,const string& sent_content) { ACE_Message_Block* msg = 0; ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1); msg->copy 
(sent_content.c_str(), sent_content.size()); size_t number_of_bytes_sent = 0; ACE_INET_Addr serverAddr (addr.c_str()); int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_); ACE_Message_Block* p = 0; p= msg; switch (res) { case 0: break; case 1: ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes sent immediately", number_of_bytes_sent)); while (p != NULL) { string temp; for (int i=0;i<p->length();++i) { temp.push_back(*(p->rd_ptr()+i)); } ACE_DEBUG ((LM_DEBUG,"YOU SEND[%s]\n",temp.c_str())); p = p->cont(); } ACE_DEBUG ((LM_DEBUG, "********************\n")); res = 0; break; case -1: ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::recv")); msg->release (); break; default: ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::recv")); msg->release (); break; } return res; } int Sender::send_to_multi_server(void) { string send_content; this->read_content(send_content); vector<string> server_addrs; global::read_server_addr(server_addrs); int send_success_number = 0; for (vector<string>::const_iterator iter = server_addrs.cbegin(); iter != server_addrs.cend(); ++iter) { if (send_to_one_server(*iter,send_content)) { ++send_success_number; } } return send_success_number; } void Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result) { ACE_DEBUG ((LM_DEBUG, "handle_write_dgram called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s 
= %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "Sender completed\n")); result.message_block ()->release (); } int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { ACE_DEBUG ((LM_DEBUG, "(%t|%P) work starup/n")); ACE_Proactor::close_singleton (); ACE_WIN32_Proactor *impl = new ACE_WIN32_Proactor (0, 1); ACE_Proactor::instance (new ACE_Proactor (impl, 1), 1); ACE_Reactor::instance ()->register_handler(impl, impl->get_handle ()); global::read_interval(global::interval); Sender sender(global::delay,global::interval); u_short port = ACE_DEFAULT_SERVER_PORT; string host("localhost"); if (sender.open (host.c_str(), port) == -1) return -1; ACE_Reactor::instance()->run_event_loop(); ACE_Reactor::instance ()->remove_handler (impl, ACE_Event_Handler::DONT_CALL); ACE_DEBUG ((LM_DEBUG, "(%t|%P) work complete/n")); return 0; }
所需配置文件:
总结
以上是生活随笔为你收集整理的ACE_Proactor UDP V2.0的全部内容,希望文章能够帮你解决所遇到的问题。
如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。