程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> 無鎖棧實現一例

無鎖棧實現一例

編輯:C++入門知識

一、何謂無鎖隊列

無鎖隊列,顧名思義,即不需要加鎖的隊列;之所以不需要額外加鎖,是因為其本身已經是線程安全的。

二、為什麼要在隊列中集成線程安全的機制?

這裡我想采用對比的方式來講述。有鎖隊列,這可能是最簡單的一種隊列了,比如我們在多線程情況下使用標准STD的deque,那麼毫無疑問需要對其加鎖。加鎖其實是將協調過程交給了操作系統來管理,但無鎖隊列卻是在CPU層面就做到了協調,所以在效率上會高很多。更詳細的解釋請參見http://www.searchtb.com/2012/10/introduction_to_disruptor.html

三、如何實現?

這裡主要是采用CAS(Compare-And-Swap,比較並交換)原子操作;下文代碼中的「ACS/acs」前綴指的就是CAS。

1. 定義隊列。這裡由於測試的緣故,隊列節點內的數據比較簡單。


[cpp] 
/* Singly linked node stored in the ACS queue. */
struct acs_node_t {
    std::string id;           /* payload: textual identifier */
    int index;                /* payload: integer index */
    struct acs_node_t* next;  /* following node, or NULL for the last node */
};
 
/* ACS deque define. */ 
typedef struct acs_deque_t { 
    struct acs_node_t head; 
    struct acs_node_t* tail; 
} acs_deque_t; 

/* Singly linked node stored in the ACS queue. */
struct acs_node_t {
    std::string id;           /* payload: textual identifier */
    int index;                /* payload: integer index */
    struct acs_node_t* next;  /* following node, or NULL for the last node */
};

/* ACS deque define. */
typedef struct acs_deque_t {
    struct acs_node_t head;
    struct acs_node_t* tail;
} acs_deque_t;

 

2. 定義接口。這裡定義了隊列初始化、判空、入隊列以及出隊列四個接口。

[cpp] 
/* Resets the queue to its empty state (tail refers to the embedded head, head.next is NULL). */
void acs_deque_init(acs_deque_t* deq); 
/* Returns non-zero when deq is NULL or no node follows the embedded head. */
int acs_deque_empty(acs_deque_t* deq); 
/* Enqueue: meant to link node after the current tail. */
void acs_deque_push(acs_deque_t* deq, acs_node_t* node); 
/* Dequeue: returns the node at head.next, or NULL when the queue is empty. */
acs_node_t* acs_deque_pop(acs_deque_t* deq); 

/* Resets the queue to its empty state (tail refers to the embedded head, head.next is NULL). */
void acs_deque_init(acs_deque_t* deq);
/* Returns non-zero when deq is NULL or no node follows the embedded head. */
int acs_deque_empty(acs_deque_t* deq);
/* Enqueue: meant to link node after the current tail. */
void acs_deque_push(acs_deque_t* deq, acs_node_t* node);
/* Dequeue: returns the node at head.next, or NULL when the queue is empty. */
acs_node_t* acs_deque_pop(acs_deque_t* deq);

 

3.下面是接口的實現。

[cpp]
/* Puts the queue into its empty state; a NULL queue is ignored. */
void acs_deque_init(acs_deque_t* deq)
{
    if (deq == NULL)
        return;

    deq->head.next = NULL;     /* no real nodes yet */
    deq->tail = &deq->head;    /* the sentinel is also the last node */
}

 
/* Returns 1 when deq is NULL or has no node after the sentinel, otherwise 0. */
int acs_deque_empty(acs_deque_t* deq)
{
    if (deq == NULL)
        return 1;

    return (deq->head.next != NULL) ? 0 : 1;
}

 
void acs_deque_push(acs_deque_t* deq, acs_node_t* node) 

    acs_node_t* q = NULL; 
     
    do { 
        q = deq->tail; 
    } while (InterlockedCompareExchangePointer((PVOID*)&q->next, 0, node) != q->next); 
 
    InterlockedCompareExchangePointer((PVOID*)&deq->tail, q, node); 

 
acs_node_t* acs_deque_pop(acs_deque_t* deq) 

    acs_node_t* q = NULL; 
     
    do { 
        q = deq->head.next; 
        if (q == NULL) 
            return NULL; 
    } while (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, q, q->next) != deq->head.next); 
     
    return q; 

/* Puts the queue into its empty state; a NULL queue is ignored. */
void acs_deque_init(acs_deque_t* deq)
{
    if (deq == NULL)
        return;

    deq->head.next = NULL;     /* no real nodes yet */
    deq->tail = &deq->head;    /* the sentinel is also the last node */
}

/* Returns 1 when deq is NULL or has no node after the sentinel, otherwise 0. */
int acs_deque_empty(acs_deque_t* deq)
{
    if (deq == NULL)
        return 1;

    return (deq->head.next != NULL) ? 0 : 1;
}

void acs_deque_push(acs_deque_t* deq, acs_node_t* node)
{
    acs_node_t* q = NULL;
   
    do {
        q = deq->tail;
    } while (InterlockedCompareExchangePointer((PVOID*)&q->next, 0, node) != q->next);

    InterlockedCompareExchangePointer((PVOID*)&deq->tail, q, node);
}

acs_node_t* acs_deque_pop(acs_deque_t* deq)
{
    acs_node_t* q = NULL;
   
    do {
        q = deq->head.next;
        if (q == NULL)
            return NULL;
    } while (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, q, q->next) != deq->head.next);
   
    return q;
}接口采用了Windows的ACS函數,當然你也可以將其更改為linux版本的ACS函數。

4. 其他代碼為測試代碼,全部代碼為


[cpp] 
#include <stdio.h>  
 
#include <string>  
 
#define WIN32_LEAN_AND_MEAN  
#include <Windows.h>  
 
#include <deque>  
 
/* Number of producer threads started by each test (also sizes the HANDLE arrays). */
static const int knThreadCount = 4; 
/* Nodes pushed by EACH producer thread; a run enqueues knThreadCount * knMaxNodeCount nodes in total. */
static const int knMaxNodeCount = 50000; 
/* Number of pop operations timed by each test after the producers have finished. */
static const int knPopedCount = 5000; 
 
/* Singly linked node stored in the ACS queue. */
struct acs_node_t {
    std::string id;           /* payload: textual identifier */
    int index;                /* payload: integer index */
    struct acs_node_t* next;  /* following node, or NULL for the last node */
};
 
/* ACS deque define. */ 
typedef struct acs_deque_t { 
    struct acs_node_t head; 
    struct acs_node_t* tail; 
} acs_deque_t; 
 
/* Payload element stored (by pointer) in the critical-section-protected std::deque used for comparison. */
struct DequeData {
    std::string id;  /* textual identifier */
    int index;       /* integer index */
};
 
/* Container used by the locked baseline test: a std::deque of heap-allocated DequeData. */
typedef std::deque<DequeData*> StdDeque; 
 
/* Critical section guarding each push_back on the StdDeque in StdThreadFunc. */
CRITICAL_SECTION g_stdcs; 
 
/* NOTE(review): these two counters are not referenced anywhere in the visible code. */
int g_aes_cost_time = 0; 
int g_std_cost_time = 0; 
 
/*
 * Stopwatch built on QueryPerformanceCounter / QueryPerformanceFrequency.
 * Call StartTimer(), run the measured code, then StopTimer() to obtain the
 * elapsed time in milliseconds.
 */
class HRTimer { 
public: 
    HRTimer(); 
    ~HRTimer() {} 
 
    /* Counter frequency in ticks per second; 0 if the query fails. */
    double GetFrequency(void); 
    /* Records the start timestamp. */
    void StartTimer(void); 
    /* Records the stop timestamp and returns the time since StartTimer() in milliseconds. */
    double StopTimer(void); 
 
private: 
    /* Counter value captured by StartTimer(). */
    LARGE_INTEGER start_; 
    /* Counter value captured by StopTimer(). */
    LARGE_INTEGER stop_; 
    /* Ticks per second, cached by the constructor. */
    double frequency_; 
}; 
 
/* Zero-initialises both timestamps and caches the performance-counter frequency. */
HRTimer::HRTimer()
    : start_(),
    stop_(),
    frequency_(0.f)
{
    const double ticks_per_second = GetFrequency();
    frequency_ = ticks_per_second;
}

 
double HRTimer::GetFrequency(void) 

    LARGE_INTEGER proc_freq; 
    if (!::QueryPerformanceFrequency(&proc_freq)) 
        return 0.f; 
    return proc_freq.QuadPart; 

 
void HRTimer::StartTimer(void) 

    HANDLE curth = ::GetCurrentThread(); 
    DWORD_PTR oldmask = ::SetThreadAffinityMask(curth, 0); 
    ::QueryPerformanceCounter(&start_); 
    ::SetThreadAffinityMask(curth, oldmask); 

 
double HRTimer::StopTimer(void) 

    HANDLE curth = ::GetCurrentThread(); 
    DWORD_PTR oldmask = ::SetThreadAffinityMask(curth, 0); 
    ::QueryPerformanceCounter(&stop_); 
    ::SetThreadAffinityMask(curth, oldmask); 
    return ((stop_.QuadPart - start_.QuadPart) / frequency_) * 1000; 

 
/*
 * Scope timer: the constructor starts the referenced HRTimer and the
 * destructor stops it and prints the elapsed milliseconds with a label.
 */
class AutoHRTimer { 
public: 
    /* hrt is kept by reference; name is kept as a raw pointer and printed by the destructor. */
    AutoHRTimer(HRTimer& hrt, const char* name); 
    ~AutoHRTimer(); 
 
private: 
    /* Timer driven by this scope object. */
    HRTimer& hrt_; 
    /* Label printed in front of the measured time. */
    const char* name_; 
}; 
 
/* Binds the timer and the label, then starts timing immediately. */
AutoHRTimer::AutoHRTimer(HRTimer& hrt, const char* name)
    : hrt_(hrt), name_(name)
{
    hrt.StartTimer();
}

 
/* Stops the timer and prints "<name> cost time <ms> ms" to stdout. */
AutoHRTimer::~AutoHRTimer()
{
    const double elapsed_ms = hrt_.StopTimer();
    fprintf(stdout, "%s cost time %f ms\n", name_, elapsed_ms);
}

 
/* Single timer instance shared by every AutoHRTimer scope in the tests. */
HRTimer g_hrtimer; 
 
/* Resets the queue to its empty state (tail refers to the embedded head, head.next is NULL). */
void acs_deque_init(acs_deque_t* deq); 
/* Returns non-zero when deq is NULL or no node follows the embedded head. */
int acs_deque_empty(acs_deque_t* deq); 
/* Enqueue: meant to link node after the current tail. */
void acs_deque_push(acs_deque_t* deq, acs_node_t* node); 
/* Dequeue: returns the node at head.next, or NULL when the queue is empty. */
acs_node_t* acs_deque_pop(acs_deque_t* deq); 
 
/* Puts the queue into its empty state; a NULL queue is ignored. */
void acs_deque_init(acs_deque_t* deq)
{
    if (deq == NULL)
        return;

    deq->head.next = NULL;     /* no real nodes yet */
    deq->tail = &deq->head;    /* the sentinel is also the last node */
}

 
/* Returns 1 when deq is NULL or has no node after the sentinel, otherwise 0. */
int acs_deque_empty(acs_deque_t* deq)
{
    if (deq == NULL)
        return 1;

    return (deq->head.next != NULL) ? 0 : 1;
}

 
void acs_deque_push(acs_deque_t* deq, acs_node_t* node) 

    acs_node_t* q = NULL; 
     
    do { 
        q = deq->tail; 
    } while (InterlockedCompareExchangePointer((PVOID*)&q->next, 0, node) != q->next); 
 
    InterlockedCompareExchangePointer((PVOID*)&deq->tail, q, node); 

 
acs_node_t* acs_deque_pop(acs_deque_t* deq) 

    acs_node_t* q = NULL; 
     
    do { 
        q = deq->head.next; 
        if (q == NULL) 
            return NULL; 
    } while (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, q, q->next) != deq->head.next); 
     
    return q; 

 
/*
 * Producer thread for the ACS queue test.
 *
 * arg points to the shared acs_deque_t; knMaxNodeCount heap-allocated nodes
 * are pushed onto it. Always returns 0.
 *
 * Declared WINAPI because CreateThread invokes its start routine with that
 * calling convention; without it the cast to LPTHREAD_START_ROUTINE hides a
 * calling-convention mismatch on 32-bit x86 builds.
 */
static DWORD WINAPI AesThreadFunc(void* arg)
{
    acs_deque_t* ad = (acs_deque_t*)arg;

    for (int i = 0; i < knMaxNodeCount; ++i) {
        acs_node_t* an = new acs_node_t;
        an->id = "randid_";
        an->id.push_back((char)((i % 10) + '0'));  /* append the last decimal digit of i */
        an->index = i;
        an->next = NULL;
        acs_deque_push(ad, an);
    }  // for

    return 0;
}

 
static void TestAcsDeque() 

    acs_deque_t ad; 
    acs_node_t* poped_node = NULL; 
    HANDLE th[knThreadCount]; 
 
    { 
        AutoHRTimer ahr(g_hrtimer, "ACS push 50000"); 
        acs_deque_init(&ad); 
        for (int i = 0; i < knThreadCount; ++i) { 
            th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)AesThreadFunc, &ad, 0, NULL); 
        }  // for  
        ::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE); 
    } 
 
    { 
        // Test pop.  
        AutoHRTimer ahr(g_hrtimer, "ACS pop 5000"); 
        for (int i = 0; i < knPopedCount; ++i) { 
            poped_node = acs_deque_pop(&ad); 
            delete poped_node; 
        } 
    } 
 
    { 
        AutoHRTimer ahr(g_hrtimer, "ACS free 45000"); 
        acs_node_t* cur = ad.head.next; 
        while (cur != NULL) { 
            acs_node_t* tmp = cur; 
            cur = cur->next; 
            delete tmp; 
        } 
    } 

 
/*
 * Producer thread for the locked baseline test.
 *
 * arg points to the shared StdDeque; knMaxNodeCount heap-allocated DequeData
 * items are appended, each push_back guarded by g_stdcs. Always returns 0.
 *
 * Declared WINAPI because CreateThread invokes its start routine with that
 * calling convention; without it the cast to LPTHREAD_START_ROUTINE hides a
 * calling-convention mismatch on 32-bit x86 builds.
 */
static DWORD WINAPI StdThreadFunc(void* arg)
{
    StdDeque* deq_list = (StdDeque*)arg;

    for (int i = 0; i < knMaxNodeCount; ++i) {
        DequeData* dd = new DequeData;
        dd->id = "randid_";
        dd->id.push_back((char)((i % 10) + '0'));  /* append the last decimal digit of i */
        dd->index = i;
        EnterCriticalSection(&g_stdcs);
        deq_list->push_back(dd);
        LeaveCriticalSection(&g_stdcs);
    }  // for

    return 0;
}

 
static void TestLockedDeque() 

    StdDeque deq_list; 
    DequeData* poped_dd = NULL; 
    HANDLE th[knThreadCount]; 
    InitializeCriticalSectionAndSpinCount(&g_stdcs, 2000); 
 
    { 
        AutoHRTimer ahr(g_hrtimer, "STD push 50000"); 
        for (int i = 0; i < knThreadCount; ++i) { 
            th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)StdThreadFunc, &deq_list, 0, NULL); 
        }  // for  
        ::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE); 
    } 
 
    { 
        AutoHRTimer ahr(g_hrtimer, "STD pop 5000"); 
        for (int i = 0; i < knPopedCount; ++i) { 
            poped_dd = deq_list.front(); 
            deq_list.pop_front(); 
            delete poped_dd; 
        } 
    } 
 
    { 
        AutoHRTimer ahr(g_hrtimer, "STD free 45000"); 
        StdDeque::iterator iter = deq_list.begin(); 
        while (iter != deq_list.end()) { 
            DequeData* dd = *iter; 
            delete dd; 
            ++iter; 
        } 
        deq_list.clear(); 
    } 
 
    DeleteCriticalSection(&g_stdcs); 

 
int main() 

    while (1) { 
        TestAcsDeque(); 
        TestLockedDeque(); 
        Sleep(3000); 
        fprintf(stdout, "--------------------------------------\n"); 
    } 
 
    getchar(); 
 
    return 0; 

#include <stdio.h>

#include <string>

#define WIN32_LEAN_AND_MEAN
#include <Windows.h>

#include <deque>

/* Number of producer threads started by each test (also sizes the HANDLE arrays). */
static const int knThreadCount = 4;
/* Nodes pushed by EACH producer thread; a run enqueues knThreadCount * knMaxNodeCount nodes in total. */
static const int knMaxNodeCount = 50000;
/* Number of pop operations timed by each test after the producers have finished. */
static const int knPopedCount = 5000;

/* Singly linked node stored in the ACS queue. */
struct acs_node_t {
    std::string id;           /* payload: textual identifier */
    int index;                /* payload: integer index */
    struct acs_node_t* next;  /* following node, or NULL for the last node */
};

/* ACS deque define. */
typedef struct acs_deque_t {
    struct acs_node_t head;
    struct acs_node_t* tail;
} acs_deque_t;

/* Payload element stored (by pointer) in the critical-section-protected std::deque used for comparison. */
struct DequeData {
    std::string id;  /* textual identifier */
    int index;       /* integer index */
};

/* Container used by the locked baseline test: a std::deque of heap-allocated DequeData. */
typedef std::deque<DequeData*> StdDeque;

/* Critical section guarding each push_back on the StdDeque in StdThreadFunc. */
CRITICAL_SECTION g_stdcs;

/* NOTE(review): these two counters are not referenced anywhere in the visible code. */
int g_aes_cost_time = 0;
int g_std_cost_time = 0;

/*
 * Stopwatch built on QueryPerformanceCounter / QueryPerformanceFrequency.
 * Call StartTimer(), run the measured code, then StopTimer() to obtain the
 * elapsed time in milliseconds.
 */
class HRTimer {
public:
    HRTimer();
    ~HRTimer() {}

    /* Counter frequency in ticks per second; 0 if the query fails. */
    double GetFrequency(void);
    /* Records the start timestamp. */
    void StartTimer(void);
    /* Records the stop timestamp and returns the time since StartTimer() in milliseconds. */
    double StopTimer(void);

private:
    /* Counter value captured by StartTimer(). */
    LARGE_INTEGER start_;
    /* Counter value captured by StopTimer(). */
    LARGE_INTEGER stop_;
    /* Ticks per second, cached by the constructor. */
    double frequency_;
};

/* Zero-initialises both timestamps and caches the performance-counter frequency. */
HRTimer::HRTimer()
    : start_(),
    stop_(),
    frequency_(0.f)
{
    const double ticks_per_second = GetFrequency();
    frequency_ = ticks_per_second;
}

double HRTimer::GetFrequency(void)
{
    LARGE_INTEGER proc_freq;
    if (!::QueryPerformanceFrequency(&proc_freq))
        return 0.f;
    return proc_freq.QuadPart;
}

void HRTimer::StartTimer(void)
{
    HANDLE curth = ::GetCurrentThread();
    DWORD_PTR oldmask = ::SetThreadAffinityMask(curth, 0);
    ::QueryPerformanceCounter(&start_);
    ::SetThreadAffinityMask(curth, oldmask);
}

double HRTimer::StopTimer(void)
{
    HANDLE curth = ::GetCurrentThread();
    DWORD_PTR oldmask = ::SetThreadAffinityMask(curth, 0);
    ::QueryPerformanceCounter(&stop_);
    ::SetThreadAffinityMask(curth, oldmask);
    return ((stop_.QuadPart - start_.QuadPart) / frequency_) * 1000;
}

/*
 * Scope timer: the constructor starts the referenced HRTimer and the
 * destructor stops it and prints the elapsed milliseconds with a label.
 */
class AutoHRTimer {
public:
    /* hrt is kept by reference; name is kept as a raw pointer and printed by the destructor. */
    AutoHRTimer(HRTimer& hrt, const char* name);
    ~AutoHRTimer();

private:
    /* Timer driven by this scope object. */
    HRTimer& hrt_;
    /* Label printed in front of the measured time. */
    const char* name_;
};

/* Binds the timer and the label, then starts timing immediately. */
AutoHRTimer::AutoHRTimer(HRTimer& hrt, const char* name)
    : hrt_(hrt), name_(name)
{
    hrt.StartTimer();
}

/* Stops the timer and prints "<name> cost time <ms> ms" to stdout. */
AutoHRTimer::~AutoHRTimer()
{
    const double elapsed_ms = hrt_.StopTimer();
    fprintf(stdout, "%s cost time %f ms\n", name_, elapsed_ms);
}

/* Single timer instance shared by every AutoHRTimer scope in the tests. */
HRTimer g_hrtimer;

/* Resets the queue to its empty state (tail refers to the embedded head, head.next is NULL). */
void acs_deque_init(acs_deque_t* deq);
/* Returns non-zero when deq is NULL or no node follows the embedded head. */
int acs_deque_empty(acs_deque_t* deq);
/* Enqueue: meant to link node after the current tail. */
void acs_deque_push(acs_deque_t* deq, acs_node_t* node);
/* Dequeue: returns the node at head.next, or NULL when the queue is empty. */
acs_node_t* acs_deque_pop(acs_deque_t* deq);

/* Puts the queue into its empty state; a NULL queue is ignored. */
void acs_deque_init(acs_deque_t* deq)
{
    if (deq == NULL)
        return;

    deq->head.next = NULL;     /* no real nodes yet */
    deq->tail = &deq->head;    /* the sentinel is also the last node */
}

/* Returns 1 when deq is NULL or has no node after the sentinel, otherwise 0. */
int acs_deque_empty(acs_deque_t* deq)
{
    if (deq == NULL)
        return 1;

    return (deq->head.next != NULL) ? 0 : 1;
}

/*
 * Appends `node` to the end of the queue using compare-and-swap instead of a lock.
 *
 * deq  - queue set up by acs_deque_init(); a NULL queue is ignored.
 * node - node to enqueue; a NULL node is ignored. Its `next` link is cleared here.
 *
 * Win32 declares InterlockedCompareExchangePointer(Destination, Exchange, Comperand)
 * and returns the value Destination held before the call. The previous code
 * passed the arguments as (Destination, Comperand, Exchange) and compared the
 * result with a fresh read of Destination, so no node was ever linked and the
 * retry loop still exited on its first pass.
 */
void acs_deque_push(acs_deque_t* deq, acs_node_t* node)
{
    if (!deq || !node)
        return;

    node->next = NULL;

    for (;;) {
        /* Volatile reads so every retry observes the current shared state. */
        acs_node_t* last = *(acs_node_t* volatile*)&deq->tail;
        acs_node_t* after = *(acs_node_t* volatile*)&last->next;

        if (after != NULL) {
            /* Another thread linked a node but has not moved tail yet: help it, then retry. */
            InterlockedCompareExchangePointer((PVOID*)&deq->tail, after, last);
            continue;
        }

        /* Link the node only if `last` is still the final node (its next is still NULL). */
        if (InterlockedCompareExchangePointer((PVOID*)&last->next, node, NULL) == NULL) {
            /* Best effort: if this fails, a helping thread already advanced tail. */
            InterlockedCompareExchangePointer((PVOID*)&deq->tail, node, last);
            return;
        }
    }
}

/*
 * Detaches and returns the first node of the queue, or NULL when the queue is
 * empty or deq is NULL. The caller owns the returned node.
 *
 * The compare-and-swap succeeds only when head.next still equals the node that
 * was read; success is detected from the returned previous value. When the
 * removed node was the last one, tail is moved back to the embedded head so a
 * later push does not link behind a node the caller may already have freed.
 *
 * NOTE(review): a pop that removes the last node while another thread is
 * pushing, and reclamation of nodes that other poppers may still be reading
 * (ABA), are not handled here - confirm the intended usage before relying on
 * fully concurrent push/pop.
 */
acs_node_t* acs_deque_pop(acs_deque_t* deq)
{
    if (!deq)
        return NULL;

    for (;;) {
        acs_node_t* first = *(acs_node_t* volatile*)&deq->head.next;
        if (first == NULL)
            return NULL;

        acs_node_t* second = *(acs_node_t* volatile*)&first->next;

        if (InterlockedCompareExchangePointer((PVOID*)&deq->head.next, second, first) == first) {
            if (second == NULL) {
                /* Queue became empty: tail must not keep referring to the removed node. */
                InterlockedCompareExchangePointer((PVOID*)&deq->tail, &deq->head, first);
            }
            return first;
        }
    }
}

/*
 * Producer thread for the ACS queue test.
 *
 * arg points to the shared acs_deque_t; knMaxNodeCount heap-allocated nodes
 * are pushed onto it. Always returns 0.
 *
 * Declared WINAPI because CreateThread invokes its start routine with that
 * calling convention; without it the cast to LPTHREAD_START_ROUTINE hides a
 * calling-convention mismatch on 32-bit x86 builds.
 */
static DWORD WINAPI AesThreadFunc(void* arg)
{
    acs_deque_t* ad = (acs_deque_t*)arg;

    for (int i = 0; i < knMaxNodeCount; ++i) {
        acs_node_t* an = new acs_node_t;
        an->id = "randid_";
        an->id.push_back((char)((i % 10) + '0'));  /* append the last decimal digit of i */
        an->index = i;
        an->next = NULL;
        acs_deque_push(ad, an);
    }  // for

    return 0;
}

static void TestAcsDeque()
{
    acs_deque_t ad;
    acs_node_t* poped_node = NULL;
    HANDLE th[knThreadCount];

    {
        AutoHRTimer ahr(g_hrtimer, "ACS push 50000");
        acs_deque_init(&ad);
        for (int i = 0; i < knThreadCount; ++i) {
            th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)AesThreadFunc, &ad, 0, NULL);
        }  // for
        ::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE);
    }

    {
        // Test pop.
        AutoHRTimer ahr(g_hrtimer, "ACS pop 5000");
        for (int i = 0; i < knPopedCount; ++i) {
            poped_node = acs_deque_pop(&ad);
            delete poped_node;
        }
    }

    {
        AutoHRTimer ahr(g_hrtimer, "ACS free 45000");
        acs_node_t* cur = ad.head.next;
        while (cur != NULL) {
            acs_node_t* tmp = cur;
            cur = cur->next;
            delete tmp;
        }
    }
}

/*
 * Producer thread for the locked baseline test.
 *
 * arg points to the shared StdDeque; knMaxNodeCount heap-allocated DequeData
 * items are appended, each push_back guarded by g_stdcs. Always returns 0.
 *
 * Declared WINAPI because CreateThread invokes its start routine with that
 * calling convention; without it the cast to LPTHREAD_START_ROUTINE hides a
 * calling-convention mismatch on 32-bit x86 builds.
 */
static DWORD WINAPI StdThreadFunc(void* arg)
{
    StdDeque* deq_list = (StdDeque*)arg;

    for (int i = 0; i < knMaxNodeCount; ++i) {
        DequeData* dd = new DequeData;
        dd->id = "randid_";
        dd->id.push_back((char)((i % 10) + '0'));  /* append the last decimal digit of i */
        dd->index = i;
        EnterCriticalSection(&g_stdcs);
        deq_list->push_back(dd);
        LeaveCriticalSection(&g_stdcs);
    }  // for

    return 0;
}

static void TestLockedDeque()
{
    StdDeque deq_list;
    DequeData* poped_dd = NULL;
    HANDLE th[knThreadCount];
    InitializeCriticalSectionAndSpinCount(&g_stdcs, 2000);

    {
        AutoHRTimer ahr(g_hrtimer, "STD push 50000");
        for (int i = 0; i < knThreadCount; ++i) {
            th[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)StdThreadFunc, &deq_list, 0, NULL);
        }  // for
        ::WaitForMultipleObjects(knThreadCount, th, TRUE, INFINITE);
    }

    {
        AutoHRTimer ahr(g_hrtimer, "STD pop 5000");
        for (int i = 0; i < knPopedCount; ++i) {
            poped_dd = deq_list.front();
            deq_list.pop_front();
            delete poped_dd;
        }
    }

    {
        AutoHRTimer ahr(g_hrtimer, "STD free 45000");
        StdDeque::iterator iter = deq_list.begin();
        while (iter != deq_list.end()) {
            DequeData* dd = *iter;
            delete dd;
            ++iter;
        }
        deq_list.clear();
    }

    DeleteCriticalSection(&g_stdcs);
}

int main()
{
    while (1) {
        TestAcsDeque();
        TestLockedDeque();
        Sleep(3000);
        fprintf(stdout, "--------------------------------------\n");
    }

    getchar();

    return 0;
}

5. 將無鎖隊列同std的有鎖隊列進行對比,效果如下圖

 (原文此處的對比結果截圖未能保留)
 

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved