程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> 關於C++ >> C++——boost:asio的使用介紹

C++——boost:asio的使用介紹

編輯:關於C++

背景知識

高效網絡編程一般都要依賴於IO復用,IO復用是指同時發送並監聽處理很多socket或者文件讀寫的事件。IO復用的高效方式目前常用的有兩種:Reactor和Proactor。這兩種方式在操作系統級都是異步和非阻塞的,也就是說用戶提交了一個請求後都可以直接返回。但是Reactor在用戶層級看來是同步的,就是在提交了一系列的操作給操作系統後,需要阻塞監聽等待事件的發生,如果有事件發生則手動調用相關的函數進行處理,其具體在操作系統級利用的是操作系統的事件通知接口。而Proactor在用戶看來是異步的,他是在調用的時候同時注冊一個回調函數,如果請求在操作系統級有結果了,其注冊的回調函數就會自動調用。這個在操作系統級使用的aio異步調用接口。

 

最顯著的不同時,以TCP距離,Reactor會在內核收到TCP數據的時候通知上層應用程序,後面從內核中取出數據並調用處理函數處理用用戶完成(什麼時候,怎麼處理)。Proactor會在TCP收到數據後由內核將數據拷貝到用戶指定的空間,然後立即調用注冊的回調函數進行處理。

看起來Proactor會明顯比Reactor簡單和快速,但是由於工程原因,這個也是不一定的。

 

介紹

將整個異步平台抽象成boost::asio::io_service,想要使用asio都要先建立這個對象。異步平台上可以使用很多組件,比如boost::asio::ip::tcp::socket,這些組件又有各自的方法。但是過程是統一的:(asio可以執行同步和異步兩種調用)

對於同步的調用。調用socket.connect(server_endpoint),或者其他隊遠端交互的方法。請求會首先發送給io_service,io_service會調用操作系統的具體方法,然後返回結果到io_service,io_service會通知到上層用戶組件。錯誤用異常通知(可以阻止),正確用返回值。

對於異步調用。在組件調用io_service執行命令的同時要提供一個回調函數,可以同時發布多個異步請求,所有的返回結果都會放在io_service的隊列裡存儲。進程調用io_service::run()會逐個的拿出存儲在隊列裡的請求調用提前傳入的回調函數進行處理。

I/O對象是用來完成實際功能的組件,有多種對象 :

boost::asio::ip::tcp::socket

boost::asio::ip::tcp::resolver

boost::asio::ip::tcp::acceptor

boost::asio::local::stream_protocol::socket本地連接

boost::asio::posix::stream_descriptor 面向流的文件描述符,比如stdout,stdin

boost::asio::deadline_timer 定時器

boost::asio::signal_set 信號處理

這些對象大部分需要io_service來初始化。還有一個用於控制io_service生命周期的work類,和用來存儲數據的buffer類。

 

io_service

run() vs poll()

run()和poll()都循環執行I/O對象的事件,區別在於如果事件沒有被觸發(ready),run()會等待,但是poll()會立即返回。也就是說poll()只會執行已經觸發的I/O事件。

比如I/O對象socket1,socket2, socket3都綁定了socket.async_read_some()事件,而此時socket1、socket3有數據過來。則調用poll()會執行socket1、socket3相應的handler,然後返回;而調用run()也會執行socket1和socket3的相應的handler,但會繼續等待socket2的讀事件。

stop()

調用 io_service.stop() 會中止 run loop,一般在多線程中使用。

post() vs dispatch()

post()和dispatch()都是要求io_service執行一個handler,但是dispatch()要求立即執行,而post()總是先把該handler加入事件隊列。

什麼時候需要使用post()?當不希望立即調用一個handler,而是異步調用該handler,則應該調用post()把該handler交由io_service放到事件隊列裡去執行。比如,Boost.Asio自帶的聊天室示例,其中實現了一個支持異步IO的聊天室客戶端,是個很好的例子。

chat_client.cpp 的write()函數之所以要使用post(),是為了避免臨界區同步問題。write()調用和do_write()裡async_write()的執行分別屬於兩個線程,前者會往write_msgs_裡寫數據,而後者會從write_msgs_裡讀數據,如果不使用post(),而直接調用do_write(),顯然需要使用鎖來同步write_msgs_。但是使用post()相當於由io_service來調度write_msgs_的讀寫,這就在一個線程內完成,無需額外的鎖機制。

work類

work類用於通知io_service是否可以結束,只要對象work(io_service)存在,io_service就不會結束。所以work類用起來更像是一個標識,比如:

boost::asio::io_serviceio_service;

boost::asio::io_service::work*work = new boost::asio::io_service::work( io_service );

// deletework; // 如果不注釋掉這一句,則run loop不會退出;一般用shared_ptr維護work對象,使用work.reset()來結束其生命周期。

io_service.run()

buffer類

buffer類分mutable_buffer和const_buffer兩個類,buffer類特別簡單,僅有兩個成員變量:指向數據的指針 和 相應的數據長度。buffer類本身並不申請內存,只是提供了一個對現有內存的封裝。

需要注意的是,所有async_write()、async_read()之類函數接受的buffer類型是MutableBufferSequence / ConstBufferSequence,這意味著它們既可以接受boost::asio::buffer,也可以接受std::vector 這樣的類型。

緩沖區管理

緩沖區的生命期是使用asio最需要重視的兩件事之一,緩沖區之所以需要重視的原因在於Asio異步調用Reference裡的這段描述:

Althoughthe buffers object may be copied as necessary, ownership of the underlyingmemory blocks is retained by the caller, which must guarantee that they remainvalid until the handler is called.

這意味著緩沖區從發起異步調用到handler被執行,這段時間內需要交由io_service控制,這個限制常常導致asio的某些代碼變得可能比Reactor相應代碼還要麻煩一些。

還是舉上面聊天室的那個例子。chat_client.cpp的do_write()函數收到用戶輸入數據後,之所以把該數據保存到std::deque write_msgs_ 隊列,而不是存到類似chardata[]的數組裡,然後去調用async_write(..data..)發送數據,是為了避免這種情況:輸入數據速度過快,當上一次async_write()調用的handler還沒有來得及處理,又收到一份新的數據,如果直接保存到data,會導致覆蓋上一次async_write()的緩沖區。async_write()要求這個緩沖區從調用async_write()開始,直到handler處理這個時間段是不變的。

同樣的,在do_write()函數裡調用async_write()函數之前,先判斷write_msgs_隊列是否為空,也是為了保證async_write()總是從write_msgs_隊列頭取得有效的數據,而在handle_write()裡當數據發送完畢後,再pop_front()彈出已經發送的數據包。以此避免出現前一個async_write()的handler還沒執行完畢,就把隊列頭彈出去,導致對應的緩沖區失效問題。

這裡主要還是因為async_write()和async_read()的區別,前者是主動發起的,後者可以由io_service控制,所以後者不用擔心這種緩沖區被覆蓋問題。因為在同一個線程裡,哪怕需要讀取的事件觸發得再快,也需要由io_service逐一處理。

在這個聊天室的例子裡,如果不考慮把數據按用戶輸入順序發送出去的話,可以使用更簡單的辦法來處理do_write()函數,例如:

 

:::c++

voiddo_write(chat_message msg)

{

chat_message* pmsg = new chat_message(msg);// implement copy ctor for chat_message firstly

boost::asio::async_write(socket_,

boost::asio::buffer(pmsg->data(), pmsg->length()),

boost::bind(&chat_client::handle_write,this,

boost::asio::placeholders::error, pmsg));

}

voidhandle_write(const boost::system::error_code& error, chat_message* pmsg)

{

if (!error) {

 

}else{

do_close();

}

delete pmsg;

}

這裡相當於給每個異步調用分配一塊屬於自己的內存,異步調用完成即自動釋放掉,有些類似於閉包了。如果不希望頻繁new/delete內存,也可以考慮使用boost::circular_buffer一次性分配內存後逐項使用。

I/O對象

socket

Boost.Asio最常用的對象應該就是socket了,常用的函數一般有這幾個:

讀寫TCP socket的時候,一般使用read(),async_read(), write(), async_write(),為了避免所謂的short readsand writes,一般不使用receive(), async_receive(), send(), async_send()。

讀寫有連接的UDP socket的時候,一般使用receive(),async_receive(), send(), async_send()。

讀寫無連接的UDP socket的時候,一般使用receive_from(),async_receive_from(), send_to(), async_send_to()。

而自由函數boost::asio::async_write()和類成員函數socket.async_write_some()的有什麼區別呢(boost::asio::async_read()和socket.async_read_some()類似):

boost::asio::async_write()異步寫,立即返回。但它可以保證寫完整個緩沖區的內容,否則將報錯。boost::asio::async_write() 是通過調用n次socket.async_write_some()來實現的,所以代碼必須確保在boost::asio::async_write()執行的時候,沒有其他的寫操作在同一socket上執行。在調用boost::asio::async_write()的時候,如果指定buffer的length沒有寫完或出錯,是不會回調相應的handler的,它將一直在run loop中執行;直到buffer裡所有的數據都寫完或出錯(此時handler裡返回的長度肯定會小於buffer length),才會調用handler繼續處理;而socket.async_write_some()不會有這樣的問題,它只會嘗試寫一次,寫完的長度會在handler的參數裡返回。

所以,這裡強調使用asio時第二件需要重視的事情,就是handler的返回值(一般可能聲明為boost::asio::placeholders::error)。因為asio裡所有的任務都由io_service異步執行,只有執行成功或者失敗之後才會回調handler,所以返回值是你了解當前異步操作狀況的唯一辦法,記住不要忽略任何handler的返回值處理。

信號處理

Boost.Asio的信號處理非常簡單,聲明一個信號集合,然後把相應的異步handler綁上就可以了。如果你希望在一個信號集裡處理所有的信號,那麼你可以根據handler的第二個參數,來獲取當前觸發的是那個信號。比如:

boost::asio::signal_set signals(io_service,SIGINT, SIGTERM);

signals.add(SIGUSR1); // 也可以直接用add函數添加信號

 

signals.async_wait(boost::bind(handler, _1,_2));

 

void handler(

constboost::system::error_code& error,

intsignal_number // 通過這個參數獲取當前觸發的信號值

);

定時器

Boost.Asio的定時器用起來根信號集一樣簡單,但由於它太過簡單,也有不方便的地方。比如,在一個UDP伺服器裡,一般收到的每個UDP包中都會包含一個sequence number,用於標識該UDP,以應對包處理超時情況。假設每個UDP包處理時間只有100ms,如果超時則直接給客戶端返回超時標記。這種最簡單的定時器常用的一些Reactor框架都有很完美的解決方案,一般是建一個定時器鏈表來實現,但是Asio中的定時器沒法單獨完成這個工作。

boost::asio::deadline_timer只有兩種狀態:超時和未超時。所以,只能很土的對每個UDP包創建一個定時器,然後借助std::map和boost::shared_ptr保存sequence number到定時器的映射,根據定時器handler的返回值判斷該定時器是超時,還是被主動cancel。

strand

在多線程中,多個I/O對象的handler要訪問同一塊臨界區,此時可以使用strand來保證這些handler之間的同步。

示例:

 

我們向定時器注冊 func1 和 func2,它們可能會同時訪問全局的對象(比如 std::cout )。這時我們希望對 func1 和 func2 的調用是同步的,即執行其中一個的時候,另一個要等待。

 

這時就可以用到boost::asio::strand 類,它可以把幾個cmd包裝成同步執行的。例如,我們向定時器注冊 func1 和 func2 時,可以改為:

 

boost::asio::strand the_strand;

t1.async_wait(the_strand.wrap(func1)); //包裝為同步執行的

t2.async_wait(the_strand.wrap(func2));

這樣就保證了在任何時刻,func1 和 func2 都不會同時在執行。

還有就是如果你希望把一個io_service對象綁定到多個線程。此時需要boost::asio::strand來確保handler不會被同時執行,因為異步操作,比如async_write、async_receive_from之類會影響到臨界區buffer。

具體可參考asio examples裡的示例:HTTPServer 2和HTTP Server 3的connection.hpp設計。

 
  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved