下面的程序使用Proactor模式用UDP通信:
(1)發送端發送一個復合消息,並打印發送的內容
(2)接收端接收一個復合消息並打印接收到的內容
由於UDP是無連接的,所以這裡沒有Connector和Acceptor
本例是對ACE自帶的example的稍微修改了一下(打印發送和接收的內容,這樣更加直觀)
發送端:client_main.cpp
#include#include #include #include #include using namespace std; #include "ace/Reactor.h" #include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Connector.h" #include //============================================================================= /** * @file test_udp_proactor.cpp * * $Id: test_udp_proactor.cpp 93639 2011-03-24 13:32:13Z johnnyw $ * * This program illustrates how the can be used to * implement an application that does asynchronous operations using * datagrams. * * * @author Irfan Pyarali and Roger Tragin */ //============================================================================= #include "ace/OS_NS_string.h" #include "ace/OS_main.h" #include "ace/Proactor.h" #include "ace/Asynch_IO.h" #include "ace/INET_Addr.h" #include "ace/SOCK_Dgram.h" #include "ace/Message_Block.h" #include "ace/Get_Opt.h" #include "ace/Log_Msg.h" // Keep track of when we're done. static int done = 0; /** * @class Sender * * @brief The class will be created by . */ class Sender : public ACE_Handler { public: Sender (void); ~Sender (void); //FUZZ: disable check_for_lack_ACE_OS ///FUZZ: enable check_for_lack_ACE_OS int open (const ACE_TCHAR *host, u_short port); protected: // These methods are called by the freamwork /// This is called when asynchronous writes from the dgram socket /// complete virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result); private: /// Network I/O handle ACE_SOCK_Dgram sock_dgram_; /// wd (write dgram): for writing to the socket ACE_Asynch_Write_Dgram wd_; const char* completion_key_; const char* act_; }; Sender::Sender (void) : completion_key_ ("Sender completion key"), act_ ("Sender ACT") { } Sender::~Sender (void) { this->sock_dgram_.close (); } int Sender::open (const ACE_TCHAR *host, u_short port) { // Initialize stuff if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_SOCK_Dgram::open"), -1); // Initialize the asynchronous read. if (this->wd_.open (*this, this->sock_dgram_.get_handle (), this->completion_key_, ACE_Proactor::instance ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::open"), -1); // We are using scatter/gather to send the message header and // message body using 2 buffers // create a message block for the message header ACE_Message_Block* msg = 0; ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1); const char raw_msg [] = "To be or not to be."; // Copy buf into the Message_Block and update the wr_ptr (). msg->copy (raw_msg, ACE_OS::strlen (raw_msg) + 1); // create a message block for the message body ACE_Message_Block* body = 0; ACE_NEW_RETURN (body, ACE_Message_Block (100), -1); ACE_OS::memset (body->wr_ptr (), 'X', 100); body->wr_ptr (100); // always remember to update the wr_ptr () // set body as the cont of msg. This associates the 2 message blocks so // that a send will send the first block (which is the header) up to // length (), and use the cont () to get the next block to send. You can // chain up to IOV_MAX message block using this method. msg->cont (body); // do the asynch send size_t number_of_bytes_sent = 0; ACE_INET_Addr serverAddr (port, host); int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_); ACE_Message_Block* p = 0; p= msg; switch (res) { case 0: // this is a good error. The proactor will call our handler when the // send has completed. break; case 1: // actually sent something, we will handle it in the handler callback ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes sent immediately", number_of_bytes_sent)); while (p != NULL) { ACE_DEBUG ((LM_DEBUG,"YOU SEND[%s]\n",p->rd_ptr())); p = p->cont(); } ACE_DEBUG ((LM_DEBUG, "********************\n")); res = 0; break; case -1: // Something else went wrong. ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::recv")); // the handler will not get called in this case so lets clean up our msg msg->release (); break; default: // Something undocumented really went wrong. ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::recv")); msg->release (); break; } return res; } void Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result) { ACE_DEBUG ((LM_DEBUG, "handle_write_dgram called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "Sender completed\n")); // No need for this message block anymore. result.message_block ()->release (); // Note that we are done with the test. done++; } int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { //ACE_LOG_MSG->clr_flags(0); //ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::VERBOSE); Sender sender; // Port that we're receiving connections on. u_short port = ACE_DEFAULT_SERVER_PORT; // Host that we're connecting to. string host("localhost"); if (sender.open (host.c_str(), port) == -1) return -1; while (true) { ACE_Proactor::instance ()->handle_events (); } return 0; }
接收端server_main.cpp
#include "ace/OS_NS_string.h" #include "ace/OS_main.h" #include "ace/Proactor.h" #include "ace/Asynch_IO.h" #include "ace/INET_Addr.h" #include "ace/SOCK_Dgram.h" #include "ace/Message_Block.h" #include "ace/Get_Opt.h" #include "ace/Log_Msg.h" // Host that we're connecting to. static ACE_TCHAR *host = 0; // Port that we're receiving connections on. static u_short port = ACE_DEFAULT_SERVER_PORT; // Keep track of when we're done. static int done = 0; /** * @class Receiver * * @brief This class will receive data from * the network connection and dump it to a file. */ class Receiver : public ACE_Service_Handler { public: // = Initialization and termination. Receiver (void); ~Receiver (void); int open_addr (const ACE_INET_Addr &localAddr); protected: // These methods are called by the framework /// This method will be called when an asynchronous read completes on /// a UDP socket. virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result); private: ACE_SOCK_Dgram sock_dgram_; /// rd (read dgram): for reading from a UDP socket. ACE_Asynch_Read_Dgram rd_; const char* completion_key_; const char* act_; }; Receiver::Receiver (void) : completion_key_ ("Receiver Completion Key"), act_ ("Receiver ACT") { } Receiver::~Receiver (void) { sock_dgram_.close (); } int Receiver::open_addr (const ACE_INET_Addr &localAddr) { ACE_DEBUG ((LM_DEBUG, "[%D][line:%l]Receiver::open_addr called\n")); // Create a local UDP socket to receive datagrams. if (this->sock_dgram_.open (localAddr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_SOCK_Dgram::open"), -1); // Initialize the asynchronous read. if (this->rd_.open (*this, this->sock_dgram_.get_handle (), this->completion_key_, ACE_Proactor::instance ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Read_Dgram::open"), -1); // Create a buffer to read into. We are using scatter/gather to // read the message header and message body into 2 buffers // create a message block to read the message header ACE_Message_Block* msg = 0; ACE_NEW_RETURN (msg, ACE_Message_Block (1024), -1); // the next line sets the size of the header, even though we // allocated a the message block of 1k, by setting the size to 20 // bytes then the first 20 bytes of the reveived datagram will be // put into this message block. msg->size (20); // size of header to read is 20 bytes // create a message block to read the message body ACE_Message_Block* body = 0; ACE_NEW_RETURN (body, ACE_Message_Block (1024), -1); // The message body will not exceed 1024 bytes, at least not in this test. // set body as the cont of msg. This associates the 2 message // blocks so that a read will fill the first block (which is the // header) up to size (), and use the cont () block for the rest of // the data. You can chain up to IOV_MAX message block using this // method. msg->cont (body); // ok lets do the asynch read size_t number_of_bytes_recvd = 0; int res = rd_.recv (msg, number_of_bytes_recvd, 0, PF_INET, this->act_); switch (res) { case 0: // this is a good error. The proactor will call our handler when the // read has completed. break; case 1: // actually read something, we will handle it in the handler callback ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes recieved immediately", number_of_bytes_recvd)); ACE_DEBUG ((LM_DEBUG, "********************\n")); res = 0; break; case -1: // Something else went wrong. ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Read_Dgram::recv")); // the handler will not get called in this case so lets clean up our msg msg->release (); break; default: // Something undocumented really went wrong. ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Read_Dgram::recv")); msg->release (); break; } return res; } void Receiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result) { ACE_DEBUG ((LM_DEBUG, "handle_read_dgram called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_INET_Addr peerAddr; result.remote_address (peerAddr); ACE_DEBUG ((LM_DEBUG, "%s = %s:%d\n", "peer_address", peerAddr.get_host_addr (), peerAddr.get_port_number ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); if (result.success () && result.bytes_transferred () != 0) { // loop through our message block and print out the contents for (const ACE_Message_Block* msg = result.message_block (); msg != 0; msg = msg->cont ()) { // use msg->length () to get the number of bytes written to the message // block. ACE_DEBUG ((LM_DEBUG, "Buf=[size=<%d>", msg->length ())); for (u_long i = 0; i < msg->length (); ++i) ACE_DEBUG ((LM_DEBUG, "%c", (msg->rd_ptr ())[i])); ACE_DEBUG ((LM_DEBUG, "]\n")); } } ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); // No need for this message block anymore. result.message_block ()->release (); // Note that we are done with the test. done++; } int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { //ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::VERBOSE); Receiver receiver; if (receiver.open_addr (ACE_INET_Addr (port)) == -1) return -1; while (true) { ACE_Proactor::instance ()->handle_events (); } return 0; }
先運行接收端,再運行發送端,你懂的。
發送端程序運行結果:
接收端運行結果:
程序的功能:
(1)UDP發送內容到P1,IP2,...,IPn(地址列表從文件讀取)
(1)發送內容從文件中讀取;
(1)發送時間間隔從文件中讀取;
//============================================================================= /** * @file test_udp_proactor.cpp * * $Id: test_udp_proactor.cpp 93639 2011-03-24 13:32:13Z johnnyw $ * * This program illustrates how the can be used to * implement an application that does asynchronous operations using * datagrams. * * * @author Irfan Pyaraliand Roger Tragin */ //============================================================================= #include #include #include #include #include using namespace std; //#include "ace/Reactor.h" #include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Connector.h" #include #include "ace/OS_NS_string.h" #include "ace/OS_main.h" #include "ace/INET_Addr.h" #include "ace/SOCK_Dgram.h" #include "ace/Message_Block.h" #include "ace/Get_Opt.h" #include "ace/Log_Msg.h" #include "ace/Event_Handler.h" #include "ace/Date_Time.h" #include "ace/WIN32_Proactor.h" namespace global { int delay = 2; //int interval = 60*10;//每interval 秒計時一次 int interval = 2;//每interval 秒計時一次 void print_current_time(void) { ACE_Date_Time date(ACE_OS::gettimeofday()); cout<<"當前時間:" < bool read_server_addr(vector & addrs) { ifstream fin("server_addr.ini"); if (!fin) { cout<<"找不到配置文件:local_port.ini"< first(fin),last; vector temp_addrs(first,last); if (temp_addrs.size()==0) { cout<<"配置文件中找不到服務器地址!"< bool read_interval(T& interval) { ifstream fin("interval_second.ini"); if (!fin) { cout<<"找不到配置文件:interval_second.ini"< first(fin),last; //vector temp_addrs(first,last); fin>>interval; if (!fin) { cout<<"配置文件中找不到發送時間間隔數據!"< . */ class Sender : public ACE_Handler, public ACE_Event_Handler { public: Sender (const int delay,const int interval); ~Sender (void); //FUZZ: disable check_for_lack_ACE_OS ///FUZZ: enable check_for_lack_ACE_OS int open (const ACE_TCHAR *host, u_short port); int handle_timeout(const ACE_Time_Value& , const void *act /* = 0 */);//計時器到期後執行的回調函數 protected: /// This is called when asynchronous writes from the dgram socket /// complete virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result); private: void start_timing(void); int send_to_one_server(const string&,const string&); int send_to_multi_server(void); void read_content(string&); /// Network I/O handle ACE_SOCK_Dgram sock_dgram_; /// wd (write dgram): for writing to the socket ACE_Asynch_Write_Dgram wd_; const char* completion_key_; const char* act_; long time_handle_;//在計時器隊列中的ID int delay_;//啟動多久開始第一次觸發超時 int interval_;//循環計時的間隔 }; Sender::Sender (const int delay,const int interval) : completion_key_ ("Sender completion key"), act_ ("Sender ACT"), delay_(delay), interval_(interval) { ACE_DEBUG ((LM_DEBUG, "Sender::Sender (const int delay,const int interval)\n")); } Sender::~Sender (void) { this->sock_dgram_.close (); } int Sender::open (const ACE_TCHAR *host, u_short port) { ACE_DEBUG ((LM_DEBUG, "Sender::open(%s,%d)\n",host,port)); // Initialize stuff //初始化和socket有關的成員 if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_SOCK_Dgram::open"), -1); // Initialize the asynchronous read. if (this->wd_.open (*this, this->sock_dgram_.get_handle (), this->completion_key_, ACE_Proactor::instance ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::open"), -1); //init time clock //啟動計時 start_timing(); return 0; } void Sender::start_timing(void) { ACE_DEBUG ((LM_DEBUG, "Sender::start_timing:delay[%d]interval[%d]\n", this->delay_,this->interval_)); this->reactor(ACE_Reactor::instance()); this->time_handle_ = this->reactor()->schedule_timer(this,//在這裡注冊定時器 0, ACE_Time_Value(this->delay_),//程序一開始延遲delay秒開始首次執行到期函數 ACE_Time_Value(this->interval_));//循環計時,每隔interval秒重復執行 } int Sender::handle_timeout(const ACE_Time_Value& , const void *act /* = 0 */) { cout<<"\n\n\n計時器"< interval_/60<<"分鐘到期"< iter_begin(fin),iter_end; string send_str(iter_begin,iter_end); content.swap(send_str); } int Sender::send_to_one_server(const string& addr,const string& sent_content) { // create a message block for the message header ACE_Message_Block* msg = 0; ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1); // Copy buf into the Message_Block and update the wr_ptr (). msg->copy (sent_content.c_str(), sent_content.size()); // do the asynch send size_t number_of_bytes_sent = 0; ACE_INET_Addr serverAddr (addr.c_str()); int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_); ACE_Message_Block* p = 0; p= msg; switch (res) { case 0: // this is a good error. The proactor will call our handler when the // send has completed. break; case 1: // actually sent something, we will handle it in the handler callback ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes sent immediately", number_of_bytes_sent)); while (p != NULL) { string temp; for (int i=0;i length();++i) { temp.push_back(*(p->rd_ptr()+i)); } ACE_DEBUG ((LM_DEBUG,"YOU SEND[%s]\n",temp.c_str())); p = p->cont(); } ACE_DEBUG ((LM_DEBUG, "********************\n")); res = 0; break; case -1: // Something else went wrong. ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::recv")); // the handler will not get called in this case so lets clean up our msg msg->release (); break; default: // Something undocumented really went wrong. ACE_ERROR ((LM_ERROR, "[%D][line:%l]%p\n", "ACE_Asynch_Write_Dgram::recv")); msg->release (); break; } return res; } int Sender::send_to_multi_server(void) { string send_content; this->read_content(send_content); vector server_addrs; global::read_server_addr(server_addrs); int send_success_number = 0; for (vector ::const_iterator iter = server_addrs.cbegin(); iter != server_addrs.cend(); ++iter) { if (send_to_one_server(*iter,send_content)) { ++send_success_number; } } return send_success_number; } void Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result) { ACE_DEBUG ((LM_DEBUG, "handle_write_dgram called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "Sender completed\n")); // No need for this message block anymore. result.message_block ()->release (); } int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { ACE_DEBUG ((LM_DEBUG, "(%t|%P) work starup/n")); ACE_Proactor::close_singleton (); ACE_WIN32_Proactor *impl = new ACE_WIN32_Proactor (0, 1); ACE_Proactor::instance (new ACE_Proactor (impl, 1), 1); ACE_Reactor::instance ()->register_handler(impl, impl->get_handle ()); global::read_interval(global::interval); //ACE_LOG_MSG->clr_flags(0); //ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::VERBOSE); Sender sender(global::delay,global::interval); // Port that we're receiving connections on. u_short port = ACE_DEFAULT_SERVER_PORT; // Host that we're connecting to. string host("localhost"); if (sender.open (host.c_str(), port) == -1) return -1; ACE_Reactor::instance()->run_event_loop(); ACE_Reactor::instance ()->remove_handler (impl, ACE_Event_Handler::DONT_CALL); ACE_DEBUG ((LM_DEBUG, "(%t|%P) work complete/n")); return 0; }
所需配置文件: