程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> 基於條件變量的消息隊列,條件變量消息隊列

基於條件變量的消息隊列,條件變量消息隊列

編輯:C++入門知識

基於條件變量的消息隊列,條件變量消息隊列


條件變量是線程之前同步的另一種機制。條件變量給多線程提供了一種會和的場所。當條件變量和互斥鎖一起使用時,允許線程以無競爭的方式等待特定的條件發生。這樣大大減少了鎖競爭引起的線程調度和線程等待。

     消息隊列是服務器端開發過程中繞不開的一道坎,前面,我已經實現了一個基於互斥鎖和三隊列的消息隊列,性能很不錯。博客園中的其他園主也實現了很多基於環形隊列和lock-free的消息隊列,很不錯,今天我們將要實現一個基於雙緩沖、互斥鎖和條件變量的消息隊列;這個大概也參考了一下java的blockingqueue,在前面一個博客中有簡單介紹!!基於三緩沖的隊列,雖然最大限度上解除了線程競爭,但是在玩家很少,消息很小的時候,需要添加一些buff去填充數據,這大概也是其一個缺陷吧!

     消息隊列在服務器開發過程中主要用於什麼對象呢?

     1: 我想大概就是通信層和邏輯層之間的交互,通信層接受到的網絡數據,驗證封包之後,通過消息隊列傳遞給邏輯層,邏輯層將處理結果封包再傳遞給通信層!

     2:邏輯線程和數據庫IO線程的分離;數據庫IO線程負責對數據庫的讀寫更新,邏輯層對數據庫的操作,封裝成消息去請求數據庫IO線程,數據庫IO線程處理完之後,再交回給邏輯層。

     3:日志;處理模式與方式2 類似。不過日志大概是不需要返回的!

給出源代碼:

   BlockingQueue.h文件

/*  * BlockingQueue.h  *  *  Created on: Apr 19, 2013  *      Author: archy_yu  */   #ifndef BLOCKINGQUEUE_H_ #define BLOCKINGQUEUE_H_   #include <queue> #include <pthread.h>   typedef void* CommonItem;   class BlockingQueue { public:     BlockingQueue();       virtual ~BlockingQueue();       int peek(CommonItem &item);       int append(CommonItem item);   private:       pthread_mutex_t _mutex;       pthread_cond_t _cond;       std::queue<CommonItem> _read_queue;       std::queue<CommonItem> _write_queue;   };     #endif /* BLOCKINGQUEUE_H_ */

  BlockingQueue.cpp 文件代碼

/*  * BlockingQueue.cpp  *  *  Created on: Apr 19, 2013  *      Author: archy_yu  */   #include "BlockingQueue.h"   BlockingQueue::BlockingQueue() {     pthread_mutex_init(&this->_mutex,NULL);     pthread_cond_init(&this->_cond,NULL); }   BlockingQueue::~BlockingQueue() {     pthread_mutex_destroy(&this->_mutex);     pthread_cond_destroy(&this->_cond); }   int BlockingQueue::peek(CommonItem &item) {       if( !this->_read_queue.empty() )     {         item = this->_read_queue.front();         this->_read_queue.pop();     }     else     {         pthread_mutex_lock(&this->_mutex);           while(this->_write_queue.empty())         {             pthread_cond_wait(&this->_cond,&this->_mutex);         }           while(!this->_write_queue.empty())         {             this->_read_queue.push(this->_write_queue.front());             this->_write_queue.pop();         }           pthread_mutex_unlock(&this->_mutex);     }         return 0; }   int BlockingQueue::append(CommonItem item) {     pthread_mutex_lock(&this->_mutex);     this->_write_queue.push(item);     pthread_cond_signal(&this->_cond);     pthread_mutex_unlock(&this->_mutex);     return 0; }

  測試代碼:

BlockingQueue _queue;   void* process(void* arg) {       int i=0;     while(true)     {         int *j = new int();         *j = i;         _queue.append((void *)j);         i ++;     }     return NULL; }   int main(int argc,char** argv) {     pthread_t pid;     pthread_create(&pid,0,process,0);       long long int start = get_os_system_time();     int i = 0;     while(true)     {         int* j = NULL;         _queue.peek((void* &)j);           i ++;           if(j != NULL && (*j) == 100000)         {             long long int end = get_os_system_time();             printf("consume %d\n",end - start);             break;         }     }       return 0; }

歡迎拍磚!!!

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved