[cpp]
一、何謂無鎖隊列
無鎖隊列,顧名思義,即不需要加鎖的隊列;之所以不需要額外加鎖,是因為其本身已經是線程安全的。
二、為什麼要在隊列中集成線程安全的機制?
這裡我想采用對比的方式來講述。有鎖隊列,這可能是最簡單的一種隊列了,比如我們在多線程情況下使用C++標準庫的std::deque,那麼毫無疑問需要對其加鎖。加鎖其實是將協調過程交給了操作系統來管理,但無鎖隊列卻是在CPU層面(原子指令)就做到了協調,所以在效率上會高很多。更詳細的解釋請參見 http://www.searchtb.com/2012/10/introduction_to_disruptor.html
三、如何實現?
這裡主要是采用CAS(Compare-And-Swap,比較並交換)原子操作;本文及代碼命名中將其寫作ACS。
1. 定義隊列。這裡由於測試的緣故,隊列節點內的數據比較簡單。
[cpp]
/*
 * Singly linked queue node. The payload (id, index) is kept tiny because the
 * node exists only for testing; `next` is the link to the following node.
 */
struct acs_node_t {
    std::string id;
    int index;
    acs_node_t* next;
};
/*
 * Queue object. `head` is a node stored by value inside the queue and `tail`
 * is a pointer to a node.
 */
struct acs_deque_t {
    acs_node_t head;
    acs_node_t* tail;
};
/*
 * Singly linked queue node. The payload (id, index) is kept tiny because the
 * node exists only for testing; `next` is the link to the following node.
 */
struct acs_node_t {
    std::string id;
    int index;
    acs_node_t* next;
};
/*
 * Queue object. `head` is a node stored by value inside the queue and `tail`
 * is a pointer to a node.
 */
struct acs_deque_t {
    acs_node_t head;
    acs_node_t* tail;
};
2. 定義接口。這裡定義了隊列初始化、判斷是否為空、入隊列以及出隊列四個接口。
[cpp]
/* Queue interface: initialise, test for emptiness, push a node, pop a node. */
void acs_deque_init(acs_deque_t* deq);
int acs_deque_empty(acs_deque_t* deq);
void acs_deque_push(acs_deque_t* deq, acs_node_t* node);
acs_node_t* acs_deque_pop(acs_deque_t* deq);
/* The same four prototypes again; redeclaring a function is harmless. */
void acs_deque_init(acs_deque_t* deq);
int acs_deque_empty(acs_deque_t* deq);
void acs_deque_push(acs_deque_t* deq, acs_node_t* node);
acs_node_t* acs_deque_pop(acs_deque_t* deq);
3.下面是接口的實現。
[cpp]
/*
 * Puts `deq` into the empty state: no first element, and `tail` aimed at the
 * embedded head node. A null `deq` is ignored.
 */
void acs_deque_init(acs_deque_t* deq)
{
    if (deq == NULL)
        return;
    deq->head.next = NULL;
    deq->tail = &deq->head;
}
/*
 * Returns 1 when `deq` is null or has no node after its head, otherwise 0.
 */
int acs_deque_empty(acs_deque_t* deq)
{
    const bool nothing_queued = (deq == NULL) || (deq->head.next == NULL);
    return nothing_queued ? 1 : 0;
}
/*
 * Appends `node` after the current tail using compare-and-swap.
 *
 * InterlockedCompareExchangePointer(dest, exchange, comparand) stores
 * `exchange` into *dest only when *dest == comparand, and always returns the
 * value *dest held before the call. The link step therefore succeeded exactly
 * when the call returns NULL (the comparand).
 *
 * A null `deq` or `node` is ignored. `node->next` is reset to NULL before the
 * node is made visible to other threads.
 */
void acs_deque_push(acs_deque_t* deq, acs_node_t* node)
{
    if (deq == NULL || node == NULL)
        return;
    node->next = NULL;
    acs_node_t* last = NULL;
    do {
        /* Re-read the tail on every attempt; another producer may have moved it. */
        last = *(acs_node_t* volatile*)&deq->tail;
    } while (InterlockedCompareExchangePointer((PVOID*)&last->next, node, NULL) != NULL);
    /* Advance the tail from the node we linked onto to the new node. Other
     * producers spin in the loop above until this store becomes visible. */
    InterlockedCompareExchangePointer((PVOID*)&deq->tail, node, last);
}
/*
 * Detaches and returns the first node, or NULL when `deq` is null or empty.
 * Ownership of the returned node passes to the caller.
 *
 * When the removed node was also the tail, the tail is moved back to the
 * embedded head so that a later push does not link onto a node the caller
 * may already have freed.
 *
 * NOTE(review): this is only safe while no other thread pushes or pops at the
 * same time. With concurrent consumers `first->next` may be read from a node
 * another consumer already freed (there is no safe reclamation / ABA guard),
 * and a producer racing with the tail reset can have its node dropped.
 */
acs_node_t* acs_deque_pop(acs_deque_t* deq)
{
    if (deq == NULL)
        return NULL;
    for (;;) {
        acs_node_t* first = deq->head.next;
        if (first == NULL)
            return NULL;
        acs_node_t* after = first->next;
        /* Swing head.next from `first` to its successor; retry if it changed. */
        if (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, after, first) != first)
            continue;
        if (after == NULL) {
            /* Queue drained: point the tail at the head sentinel again. */
            InterlockedCompareExchangePointer((PVOID*)&deq->tail, &deq->head, first);
        }
        return first;
    }
}
/*
 * Puts `deq` into the empty state: no first element, and `tail` aimed at the
 * embedded head node. A null `deq` is ignored.
 */
void acs_deque_init(acs_deque_t* deq)
{
    if (deq == NULL)
        return;
    deq->head.next = NULL;
    deq->tail = &deq->head;
}
/*
 * Returns 1 when `deq` is null or has no node after its head, otherwise 0.
 */
int acs_deque_empty(acs_deque_t* deq)
{
    const bool nothing_queued = (deq == NULL) || (deq->head.next == NULL);
    return nothing_queued ? 1 : 0;
}
/*
 * Appends `node` after the current tail using compare-and-swap.
 *
 * InterlockedCompareExchangePointer(dest, exchange, comparand) stores
 * `exchange` into *dest only when *dest == comparand, and always returns the
 * value *dest held before the call. The link step therefore succeeded exactly
 * when the call returns NULL (the comparand).
 *
 * A null `deq` or `node` is ignored. `node->next` is reset to NULL before the
 * node is made visible to other threads.
 */
void acs_deque_push(acs_deque_t* deq, acs_node_t* node)
{
    if (deq == NULL || node == NULL)
        return;
    node->next = NULL;
    acs_node_t* last = NULL;
    do {
        /* Re-read the tail on every attempt; another producer may have moved it. */
        last = *(acs_node_t* volatile*)&deq->tail;
    } while (InterlockedCompareExchangePointer((PVOID*)&last->next, node, NULL) != NULL);
    /* Advance the tail from the node we linked onto to the new node. Other
     * producers spin in the loop above until this store becomes visible. */
    InterlockedCompareExchangePointer((PVOID*)&deq->tail, node, last);
}
/*
 * Detaches and returns the first node, or NULL when `deq` is null or empty.
 * Ownership of the returned node passes to the caller.
 *
 * When the removed node was also the tail, the tail is moved back to the
 * embedded head so that a later push does not link onto a node the caller
 * may already have freed.
 *
 * NOTE(review): this is only safe while no other thread pushes or pops at the
 * same time. With concurrent consumers `first->next` may be read from a node
 * another consumer already freed (there is no safe reclamation / ABA guard),
 * and a producer racing with the tail reset can have its node dropped.
 */
acs_node_t* acs_deque_pop(acs_deque_t* deq)
{
    if (deq == NULL)
        return NULL;
    for (;;) {
        acs_node_t* first = deq->head.next;
        if (first == NULL)
            return NULL;
        acs_node_t* after = first->next;
        /* Swing head.next from `first` to its successor; retry if it changed. */
        if (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, after, first) != first)
            continue;
        if (after == NULL) {
            /* Queue drained: point the tail at the head sentinel again. */
            InterlockedCompareExchangePointer((PVOID*)&deq->tail, &deq->head, first);
        }
        return first;
    }
}
接口采用了Windows的ACS函數,當然你也可以將其更改為linux版本的ACS函數。
4. 其他代碼為測試代碼,全部代碼為
[cpp]
#include <stdio.h>
#include <string>
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#include <deque>
/* Number of producer threads each test starts. */
const int knThreadCount = 4;
/* Number of elements every producer thread pushes. */
const int knMaxNodeCount = 50000;
/* Number of elements removed during the timed pop phase. */
const int knPopedCount = 5000;
/*
 * Singly linked queue node carrying a small test payload (id, index);
 * `next` is the link to the following node.
 */
struct acs_node_t {
    std::string id;
    int index;
    acs_node_t* next;
};
/*
 * Queue object. `head` is a node stored by value inside the queue and `tail`
 * is a pointer to a node.
 */
struct acs_deque_t {
    acs_node_t head;
    acs_node_t* tail;
};
/* Payload stored (by pointer) in the std::deque used by the locked test. */
struct DequeData {
    std::string id;
    int index;
};
/* Container type for the locked test: a deque of DequeData pointers. */
typedef std::deque<DequeData*> StdDeque;
/* Critical section entered around push_back on the shared std::deque. */
CRITICAL_SECTION g_stdcs;
/* The two counters below are not referenced by any code in this listing. */
int g_aes_cost_time = 0;
int g_std_cost_time = 0;
/*
 * Stopwatch built on QueryPerformanceCounter.
 *
 * StartTimer() records a start stamp; StopTimer() records a stop stamp and
 * returns the elapsed time in milliseconds.
 */
class HRTimer {
public:
    HRTimer();
    ~HRTimer() {}
    /* Counter ticks per second, or 0.0 when the frequency query fails. */
    double GetFrequency(void);
    void StartTimer(void);
    /* Milliseconds since the last StartTimer(); 0.0 if no frequency is known. */
    double StopTimer(void);
private:
    LARGE_INTEGER start_;
    LARGE_INTEGER stop_;
    double frequency_;
};
HRTimer::HRTimer()
    : start_(),
      stop_(),
      frequency_(0.0)
{
    frequency_ = this->GetFrequency();
}
double HRTimer::GetFrequency(void)
{
    LARGE_INTEGER proc_freq;
    if (!::QueryPerformanceFrequency(&proc_freq))
        return 0.0;
    return static_cast<double>(proc_freq.QuadPart);
}
void HRTimer::StartTimer(void)
{
    /* The previous code called SetThreadAffinityMask(thread, 0) around the
     * query. A zero mask is not a valid affinity, and the zero it got back
     * was then "restored", so those calls only added overhead. The counter
     * is read directly instead. */
    ::QueryPerformanceCounter(&start_);
}
double HRTimer::StopTimer(void)
{
    ::QueryPerformanceCounter(&stop_);
    /* Avoid dividing by zero when the frequency could not be obtained. */
    if (frequency_ <= 0.0)
        return 0.0;
    const double ticks = static_cast<double>(stop_.QuadPart - start_.QuadPart);
    return (ticks / frequency_) * 1000;
}
/*
 * Scope timer: starts the referenced HRTimer on construction and, on
 * destruction, stops it and prints "<name> cost time <ms> ms" to stdout.
 * Only the `name` pointer is stored, not a copy of the string.
 */
class AutoHRTimer {
public:
    AutoHRTimer(HRTimer& hrt, const char* name)
        : hrt_(hrt),
          name_(name)
    {
        hrt_.StartTimer();
    }
    ~AutoHRTimer()
    {
        const double elapsed_ms = hrt_.StopTimer();
        fprintf(stdout, "%s cost time %f ms\n", name_, elapsed_ms);
    }
private:
    HRTimer& hrt_;
    const char* name_;
};
/* Single timer instance handed to every AutoHRTimer in the tests. */
HRTimer g_hrtimer;
/* Forward declarations of the queue functions defined right after. */
void acs_deque_init(acs_deque_t* deq);
int acs_deque_empty(acs_deque_t* deq);
void acs_deque_push(acs_deque_t* deq, acs_node_t* node);
acs_node_t* acs_deque_pop(acs_deque_t* deq);
/*
 * Puts `deq` into the empty state: no first element, and `tail` aimed at the
 * embedded head node. A null `deq` is ignored.
 */
void acs_deque_init(acs_deque_t* deq)
{
    if (deq == NULL)
        return;
    deq->head.next = NULL;
    deq->tail = &deq->head;
}
/*
 * Returns 1 when `deq` is null or has no node after its head, otherwise 0.
 */
int acs_deque_empty(acs_deque_t* deq)
{
    const bool nothing_queued = (deq == NULL) || (deq->head.next == NULL);
    return nothing_queued ? 1 : 0;
}
/*
 * Appends `node` after the current tail using compare-and-swap.
 *
 * InterlockedCompareExchangePointer(dest, exchange, comparand) stores
 * `exchange` into *dest only when *dest == comparand, and always returns the
 * value *dest held before the call. The link step therefore succeeded exactly
 * when the call returns NULL (the comparand).
 *
 * A null `deq` or `node` is ignored. `node->next` is reset to NULL before the
 * node is made visible to other threads.
 */
void acs_deque_push(acs_deque_t* deq, acs_node_t* node)
{
    if (deq == NULL || node == NULL)
        return;
    node->next = NULL;
    acs_node_t* last = NULL;
    do {
        /* Re-read the tail on every attempt; another producer may have moved it. */
        last = *(acs_node_t* volatile*)&deq->tail;
    } while (InterlockedCompareExchangePointer((PVOID*)&last->next, node, NULL) != NULL);
    /* Advance the tail from the node we linked onto to the new node. Other
     * producers spin in the loop above until this store becomes visible. */
    InterlockedCompareExchangePointer((PVOID*)&deq->tail, node, last);
}
/*
 * Detaches and returns the first node, or NULL when `deq` is null or empty.
 * Ownership of the returned node passes to the caller.
 *
 * When the removed node was also the tail, the tail is moved back to the
 * embedded head so that a later push does not link onto a node the caller
 * may already have freed.
 *
 * NOTE(review): this is only safe while no other thread pushes or pops at the
 * same time. With concurrent consumers `first->next` may be read from a node
 * another consumer already freed (there is no safe reclamation / ABA guard),
 * and a producer racing with the tail reset can have its node dropped.
 */
acs_node_t* acs_deque_pop(acs_deque_t* deq)
{
    if (deq == NULL)
        return NULL;
    for (;;) {
        acs_node_t* first = deq->head.next;
        if (first == NULL)
            return NULL;
        acs_node_t* after = first->next;
        /* Swing head.next from `first` to its successor; retry if it changed. */
        if (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, after, first) != first)
            continue;
        if (after == NULL) {
            /* Queue drained: point the tail at the head sentinel again. */
            InterlockedCompareExchangePointer((PVOID*)&deq->tail, &deq->head, first);
        }
        return first;
    }
}
/*
 * Producer thread body for the CAS queue test: allocates knMaxNodeCount
 * nodes and pushes each onto the queue passed through `arg`.
 *
 * Declared WINAPI because CreateThread invokes its start routine with that
 * calling convention; the caller's cast to LPTHREAD_START_ROUTINE would
 * otherwise hide a mismatch on 32-bit x86 builds.
 */
static DWORD WINAPI AesThreadFunc(void* arg)
{
    acs_deque_t* queue = static_cast<acs_deque_t*>(arg);
    for (int i = 0; i < knMaxNodeCount; ++i) {
        acs_node_t* an = new acs_node_t;
        an->id = "randid_";
        /* Append the last decimal digit of i. */
        an->id.push_back(static_cast<char>((i % 10) + '0'));
        an->index = i;
        an->next = NULL;
        acs_deque_push(queue, an);
    } // for
    return 0;
}
static void TestAcsDeque()
{
acs_deque_t ad;
acs_node_t* poped_node = NULL;
HANDLE th[knThreadCount];
{
AutoHRTimer ahr(g_hrtimer, "ACS push 50000");
acs_deque_init(&ad);
for (int i = 0; i < knThreadCount; ++i) {
th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)AesThreadFunc, &ad, 0, NULL);
} // for
::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE);
}
{
// Test pop.
AutoHRTimer ahr(g_hrtimer, "ACS pop 5000");
for (int i = 0; i < knPopedCount; ++i) {
poped_node = acs_deque_pop(&ad);
delete poped_node;
}
}
{
AutoHRTimer ahr(g_hrtimer, "ACS free 45000");
acs_node_t* cur = ad.head.next;
while (cur != NULL) {
acs_node_t* tmp = cur;
cur = cur->next;
delete tmp;
}
}
}
/*
 * Producer thread body for the locked test: allocates knMaxNodeCount
 * DequeData objects and appends each to the std::deque passed through `arg`,
 * holding g_stdcs only around the push_back.
 *
 * Declared WINAPI because CreateThread invokes its start routine with that
 * calling convention; the caller's cast to LPTHREAD_START_ROUTINE would
 * otherwise hide a mismatch on 32-bit x86 builds.
 */
static DWORD WINAPI StdThreadFunc(void* arg)
{
    StdDeque* deq_list = static_cast<StdDeque*>(arg);
    for (int i = 0; i < knMaxNodeCount; ++i) {
        DequeData* dd = new DequeData;
        dd->id = "randid_";
        /* Append the last decimal digit of i. */
        dd->id.push_back(static_cast<char>((i % 10) + '0'));
        dd->index = i;
        EnterCriticalSection(&g_stdcs);
        deq_list->push_back(dd);
        LeaveCriticalSection(&g_stdcs);
    } // for
    return 0;
}
static void TestLockedDeque()
{
StdDeque deq_list;
DequeData* poped_dd = NULL;
HANDLE th[knThreadCount];
InitializeCriticalSectionAndSpinCount(&g_stdcs, 2000);
{
AutoHRTimer ahr(g_hrtimer, "STD push 50000");
for (int i = 0; i < knThreadCount; ++i) {
th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)StdThreadFunc, &deq_list, 0, NULL);
} // for
::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE);
}
{
AutoHRTimer ahr(g_hrtimer, "STD pop 5000");
for (int i = 0; i < knPopedCount; ++i) {
poped_dd = deq_list.front();
deq_list.pop_front();
delete poped_dd;
}
}
{
AutoHRTimer ahr(g_hrtimer, "STD free 45000");
StdDeque::iterator iter = deq_list.begin();
while (iter != deq_list.end()) {
DequeData* dd = *iter;
delete dd;
++iter;
}
deq_list.clear();
}
DeleteCriticalSection(&g_stdcs);
}
/*
 * Runs both tests back to back forever, sleeping 3000 ms and printing a
 * separator line after each round. The loop has no exit, so the statements
 * after it are never reached.
 */
int main()
{
    for (;;) {
        TestAcsDeque();
        TestLockedDeque();
        Sleep(3000);
        fprintf(stdout, "--------------------------------------\n");
    }
    getchar();
    return 0;
}
#include <stdio.h>
#include <string>
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#include <deque>
/* Number of producer threads each test starts. */
const int knThreadCount = 4;
/* Number of elements every producer thread pushes. */
const int knMaxNodeCount = 50000;
/* Number of elements removed during the timed pop phase. */
const int knPopedCount = 5000;
/*
 * Singly linked queue node carrying a small test payload (id, index);
 * `next` is the link to the following node.
 */
struct acs_node_t {
    std::string id;
    int index;
    acs_node_t* next;
};
/*
 * Queue object. `head` is a node stored by value inside the queue and `tail`
 * is a pointer to a node.
 */
struct acs_deque_t {
    acs_node_t head;
    acs_node_t* tail;
};
/* Payload stored (by pointer) in the std::deque used by the locked test. */
struct DequeData {
    std::string id;
    int index;
};
/* Container type for the locked test: a deque of DequeData pointers. */
typedef std::deque<DequeData*> StdDeque;
/* Critical section entered around push_back on the shared std::deque. */
CRITICAL_SECTION g_stdcs;
/* The two counters below are not referenced by any code in this listing. */
int g_aes_cost_time = 0;
int g_std_cost_time = 0;
/*
 * Stopwatch built on QueryPerformanceCounter.
 *
 * StartTimer() records a start stamp; StopTimer() records a stop stamp and
 * returns the elapsed time in milliseconds.
 */
class HRTimer {
public:
    HRTimer();
    ~HRTimer() {}
    /* Counter ticks per second, or 0.0 when the frequency query fails. */
    double GetFrequency(void);
    void StartTimer(void);
    /* Milliseconds since the last StartTimer(); 0.0 if no frequency is known. */
    double StopTimer(void);
private:
    LARGE_INTEGER start_;
    LARGE_INTEGER stop_;
    double frequency_;
};
HRTimer::HRTimer()
    : start_(),
      stop_(),
      frequency_(0.0)
{
    frequency_ = this->GetFrequency();
}
double HRTimer::GetFrequency(void)
{
    LARGE_INTEGER proc_freq;
    if (!::QueryPerformanceFrequency(&proc_freq))
        return 0.0;
    return static_cast<double>(proc_freq.QuadPart);
}
void HRTimer::StartTimer(void)
{
    /* The previous code called SetThreadAffinityMask(thread, 0) around the
     * query. A zero mask is not a valid affinity, and the zero it got back
     * was then "restored", so those calls only added overhead. The counter
     * is read directly instead. */
    ::QueryPerformanceCounter(&start_);
}
double HRTimer::StopTimer(void)
{
    ::QueryPerformanceCounter(&stop_);
    /* Avoid dividing by zero when the frequency could not be obtained. */
    if (frequency_ <= 0.0)
        return 0.0;
    const double ticks = static_cast<double>(stop_.QuadPart - start_.QuadPart);
    return (ticks / frequency_) * 1000;
}
/*
 * Scope timer: starts the referenced HRTimer on construction and, on
 * destruction, stops it and prints "<name> cost time <ms> ms" to stdout.
 * Only the `name` pointer is stored, not a copy of the string.
 */
class AutoHRTimer {
public:
    AutoHRTimer(HRTimer& hrt, const char* name)
        : hrt_(hrt),
          name_(name)
    {
        hrt_.StartTimer();
    }
    ~AutoHRTimer()
    {
        const double elapsed_ms = hrt_.StopTimer();
        fprintf(stdout, "%s cost time %f ms\n", name_, elapsed_ms);
    }
private:
    HRTimer& hrt_;
    const char* name_;
};
/* Single timer instance handed to every AutoHRTimer in the tests. */
HRTimer g_hrtimer;
/* Forward declarations of the queue functions defined right after. */
void acs_deque_init(acs_deque_t* deq);
int acs_deque_empty(acs_deque_t* deq);
void acs_deque_push(acs_deque_t* deq, acs_node_t* node);
acs_node_t* acs_deque_pop(acs_deque_t* deq);
/*
 * Puts `deq` into the empty state: no first element, and `tail` aimed at the
 * embedded head node. A null `deq` is ignored.
 */
void acs_deque_init(acs_deque_t* deq)
{
    if (deq == NULL)
        return;
    deq->head.next = NULL;
    deq->tail = &deq->head;
}
/*
 * Returns 1 when `deq` is null or has no node after its head, otherwise 0.
 */
int acs_deque_empty(acs_deque_t* deq)
{
    const bool nothing_queued = (deq == NULL) || (deq->head.next == NULL);
    return nothing_queued ? 1 : 0;
}
/*
 * Appends `node` after the current tail using compare-and-swap.
 *
 * InterlockedCompareExchangePointer(dest, exchange, comparand) stores
 * `exchange` into *dest only when *dest == comparand, and always returns the
 * value *dest held before the call. The link step therefore succeeded exactly
 * when the call returns NULL (the comparand).
 *
 * A null `deq` or `node` is ignored. `node->next` is reset to NULL before the
 * node is made visible to other threads.
 */
void acs_deque_push(acs_deque_t* deq, acs_node_t* node)
{
    if (deq == NULL || node == NULL)
        return;
    node->next = NULL;
    acs_node_t* last = NULL;
    do {
        /* Re-read the tail on every attempt; another producer may have moved it. */
        last = *(acs_node_t* volatile*)&deq->tail;
    } while (InterlockedCompareExchangePointer((PVOID*)&last->next, node, NULL) != NULL);
    /* Advance the tail from the node we linked onto to the new node. Other
     * producers spin in the loop above until this store becomes visible. */
    InterlockedCompareExchangePointer((PVOID*)&deq->tail, node, last);
}
/*
 * Detaches and returns the first node, or NULL when `deq` is null or empty.
 * Ownership of the returned node passes to the caller.
 *
 * When the removed node was also the tail, the tail is moved back to the
 * embedded head so that a later push does not link onto a node the caller
 * may already have freed.
 *
 * NOTE(review): this is only safe while no other thread pushes or pops at the
 * same time. With concurrent consumers `first->next` may be read from a node
 * another consumer already freed (there is no safe reclamation / ABA guard),
 * and a producer racing with the tail reset can have its node dropped.
 */
acs_node_t* acs_deque_pop(acs_deque_t* deq)
{
    if (deq == NULL)
        return NULL;
    for (;;) {
        acs_node_t* first = deq->head.next;
        if (first == NULL)
            return NULL;
        acs_node_t* after = first->next;
        /* Swing head.next from `first` to its successor; retry if it changed. */
        if (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, after, first) != first)
            continue;
        if (after == NULL) {
            /* Queue drained: point the tail at the head sentinel again. */
            InterlockedCompareExchangePointer((PVOID*)&deq->tail, &deq->head, first);
        }
        return first;
    }
}
/*
 * Producer thread body for the CAS queue test: allocates knMaxNodeCount
 * nodes and pushes each onto the queue passed through `arg`.
 *
 * Declared WINAPI because CreateThread invokes its start routine with that
 * calling convention; the caller's cast to LPTHREAD_START_ROUTINE would
 * otherwise hide a mismatch on 32-bit x86 builds.
 */
static DWORD WINAPI AesThreadFunc(void* arg)
{
    acs_deque_t* queue = static_cast<acs_deque_t*>(arg);
    for (int i = 0; i < knMaxNodeCount; ++i) {
        acs_node_t* an = new acs_node_t;
        an->id = "randid_";
        /* Append the last decimal digit of i. */
        an->id.push_back(static_cast<char>((i % 10) + '0'));
        an->index = i;
        an->next = NULL;
        acs_deque_push(queue, an);
    } // for
    return 0;
}
static void TestAcsDeque()
{
acs_deque_t ad;
acs_node_t* poped_node = NULL;
HANDLE th[knThreadCount];
{
AutoHRTimer ahr(g_hrtimer, "ACS push 50000");
acs_deque_init(&ad);
for (int i = 0; i < knThreadCount; ++i) {
th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)AesThreadFunc, &ad, 0, NULL);
} // for
::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE);
}
{
// Test pop.
AutoHRTimer ahr(g_hrtimer, "ACS pop 5000");
for (int i = 0; i < knPopedCount; ++i) {
poped_node = acs_deque_pop(&ad);
delete poped_node;
}
}
{
AutoHRTimer ahr(g_hrtimer, "ACS free 45000");
acs_node_t* cur = ad.head.next;
while (cur != NULL) {
acs_node_t* tmp = cur;
cur = cur->next;
delete tmp;
}
}
}
/*
 * Producer thread body for the locked test: allocates knMaxNodeCount
 * DequeData objects and appends each to the std::deque passed through `arg`,
 * holding g_stdcs only around the push_back.
 *
 * Declared WINAPI because CreateThread invokes its start routine with that
 * calling convention; the caller's cast to LPTHREAD_START_ROUTINE would
 * otherwise hide a mismatch on 32-bit x86 builds.
 */
static DWORD WINAPI StdThreadFunc(void* arg)
{
    StdDeque* deq_list = static_cast<StdDeque*>(arg);
    for (int i = 0; i < knMaxNodeCount; ++i) {
        DequeData* dd = new DequeData;
        dd->id = "randid_";
        /* Append the last decimal digit of i. */
        dd->id.push_back(static_cast<char>((i % 10) + '0'));
        dd->index = i;
        EnterCriticalSection(&g_stdcs);
        deq_list->push_back(dd);
        LeaveCriticalSection(&g_stdcs);
    } // for
    return 0;
}
static void TestLockedDeque()
{
StdDeque deq_list;
DequeData* poped_dd = NULL;
HANDLE th[knThreadCount];
InitializeCriticalSectionAndSpinCount(&g_stdcs, 2000);
{
AutoHRTimer ahr(g_hrtimer, "STD push 50000");
for (int i = 0; i < knThreadCount; ++i) {
th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)StdThreadFunc, &deq_list, 0, NULL);
} // for
::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE);
}
{
AutoHRTimer ahr(g_hrtimer, "STD pop 5000");
for (int i = 0; i < knPopedCount; ++i) {
poped_dd = deq_list.front();
deq_list.pop_front();
delete poped_dd;
}
}
{
AutoHRTimer ahr(g_hrtimer, "STD free 45000");
StdDeque::iterator iter = deq_list.begin();
while (iter != deq_list.end()) {
DequeData* dd = *iter;
delete dd;
++iter;
}
deq_list.clear();
}
DeleteCriticalSection(&g_stdcs);
}
/*
 * Runs both tests back to back forever, sleeping 3000 ms and printing a
 * separator line after each round. The loop has no exit, so the statements
 * after it are never reached.
 */
int main()
{
    for (;;) {
        TestAcsDeque();
        TestLockedDeque();
        Sleep(3000);
        fprintf(stdout, "--------------------------------------\n");
    }
    getchar();
    return 0;
}
5. 將無鎖隊列同std的有鎖隊列進行對比,效果如下圖