條件變量是線程之前同步的另一種機制。條件變量給多線程提供了一種會和的場所。當條件變量和互斥鎖一起使用時,允許線程以無競爭的方式等待特定的條件發生。這樣大大減少了鎖競爭引起的線程調度和線程等待。
消息隊列是服務器端開發過程中繞不開的一道坎,前面,我已經實現了一個基於互斥鎖和三隊列的消息隊列,性能很不錯。博客園中的其他園主也實現了很多基於環形隊列和lock-free的消息隊列,很不錯,今天我們將要實現一個基於雙緩沖、互斥鎖和條件變量的消息隊列;這個大概也參考了一下java的blockingqueue,在前面一個博客中有簡單介紹!!基於三緩沖的隊列,雖然最大限度上解除了線程競爭,但是在玩家很少,消息很小的時候,需要添加一些buff去填充數據,這大概也是其一個缺陷吧!
消息隊列在服務器開發過程中主要用於什麼對象呢?
1: 我想大概就是通信層和邏輯層之間的交互,通信層接受到的網絡數據,驗證封包之後,通過消息隊列傳遞給邏輯層,邏輯層將處理結果封包再傳遞給通信層!
2:邏輯線程和數據庫IO線程的分離;數據庫IO線程負責對數據庫的讀寫更新,邏輯層對數據庫的操作,封裝成消息去請求數據庫IO線程,數據庫IO線程處理完之後,再交回給邏輯層。
3:日志;處理模式與方式2 類似。不過日志大概是不需要返回的!
給出源代碼:
BlockingQueue.h文件
/*
* BlockingQueue.h
*
* Created on: Apr 19, 2013
* Author: archy_yu
*/
#ifndef BLOCKINGQUEUE_H_
#define BLOCKINGQUEUE_H_
#include <queue>
#include <pthread.h>
typedef
void
* CommonItem;
class
BlockingQueue
{
public
:
BlockingQueue();
virtual
~BlockingQueue();
int
peek(CommonItem &item);
int
append(CommonItem item);
private
:
pthread_mutex_t _mutex;
pthread_cond_t _cond;
std::queue<CommonItem> _read_queue;
std::queue<CommonItem> _write_queue;
};
#endif /* BLOCKINGQUEUE_H_ */
BlockingQueue.cpp 文件代碼
/*
* BlockingQueue.cpp
*
* Created on: Apr 19, 2013
* Author: archy_yu
*/
#include "BlockingQueue.h"
BlockingQueue::BlockingQueue()
{
pthread_mutex_init(&
this
->_mutex,NULL);
pthread_cond_init(&
this
->_cond,NULL);
}
BlockingQueue::~BlockingQueue()
{
pthread_mutex_destroy(&
this
->_mutex);
pthread_cond_destroy(&
this
->_cond);
}
int
BlockingQueue::peek(CommonItem &item)
{
if
( !
this
->_read_queue.empty() )
{
item =
this
->_read_queue.front();
this
->_read_queue.pop();
}
else
{
pthread_mutex_lock(&
this
->_mutex);
while
(
this
->_write_queue.empty())
{
pthread_cond_wait(&
this
->_cond,&
this
->_mutex);
}
while
(!
this
->_write_queue.empty())
{
this
->_read_queue.push(
this
->_write_queue.front());
this
->_write_queue.pop();
}
pthread_mutex_unlock(&
this
->_mutex);
}
return
0;
}
int
BlockingQueue::append(CommonItem item)
{
pthread_mutex_lock(&
this
->_mutex);
this
->_write_queue.push(item);
pthread_cond_signal(&
this
->_cond);
pthread_mutex_unlock(&
this
->_mutex);
return
0;
}
測試代碼:
BlockingQueue _queue;
void
* process(
void
* arg)
{
int
i=0;
while
(
true
)
{
int
*j =
new
int
();
*j = i;
_queue.append((
void
*)j);
i ++;
}
return
NULL;
}
int
main(
int
argc,
char
** argv)
{
pthread_t pid;
pthread_create(&pid,0,process,0);
long
long
int
start = get_os_system_time();
int
i = 0;
while
(
true
)
{
int
* j = NULL;
_queue.peek((
void
* &)j);
i ++;
if
(j != NULL && (*j) == 100000)
{
long
long
int
end = get_os_system_time();
printf
(
"consume %d\n"
,end - start);
break
;
}
}
return
0;
}
歡迎拍磚!!!