程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> 關於C++ >> c++11實現一個半同步半異步線程池

c++11實現一個半同步半異步線程池

編輯:關於C++

在處理大量並發任務的時候,如果按照傳統的方式,一個請求一個線程來處理請求任務,大量的線程創建和銷毀將消耗過多的系統資源,還增加了線程上下文切換的開銷,而通過線程池技術就可以很好的解決這些問題,線程池技術通過在系統中預先創建一定數量的線程,當任務請求到來時從線程池中分配一個預先創建的線程去處理任務,線程在完成任務之後還可以重用,不會銷毀,而是等待下次任務的到來.

分層

半同步半異步線程池分為三層:

同步服務層: 它處理來自上層的任務請求,上層的請求可能是並發的,這些請求不是馬上就會被處理的,而是將這些任務放到一個同步排隊層中,等待處理.

同步排隊層: 來自上層的任務請求都會加到排隊層中等待處理.

異步服務層: 這一層中會有多個線程同時處理排隊層中的任務,異步服務層從同步排隊層中取出任務並行的處理.

這裡寫圖片描述

線程池實現

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

using namespace std;

/********************************同步隊列******************************/

template 
class SyncQueue
{
public:
    SyncQueue(int maxSize): m_maxSize(maxSize), m_needStop(false) { }

    //添加事件
    void Put(const T& x)
    {
        Add(x);
    }

    //添加事件
    void Put(T && x)
    {
        //調用內部接口,進行完美轉發
        Add(std::forward(x));
    }

    //從隊列中取事件,取所有事件
    void Take(std::list &list)
    {
        std::unique_lock locker(m_mutex);
        //當不滿足任何一個則等待,但是若m_needStop為true是因為任務要終止了所以不阻塞
        m_notEmpty.wait(locker, [this]{return (m_needStop || NotEmpty()); });
        if (m_needStop)
        {
            return;
        }

        list = std::move(m_queue);
        m_notFull.notify_one();
    }

    //取一個事件
    void Take(T &t)
    {
        std::unique_lock locker(m_mutex);
        m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
        if (m_needStop)
        {
            return;
        }

        t = m_queue.front();
        m_queue.pop_front();
        m_notFull.notify_one();
    }

    //終止同步隊列
    void Stop()
    {
        {
            //鎖作用域就在這對大括號內
            std::lock_guard locker(m_mutex);
            //將終止標志設為true
            m_needStop = true;
        }

        //喚醒所有進程一一終止
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    //隊列為空
    bool Empty()
    {
        std::lock_guard locker(m_mutex);
        return m_queue.empty();
    }

    //隊列為滿
    bool Full()
    {
        std::lock_guard locker(m_mutex);
        return m_queue.size() == m_maxSize;
    }

    //隊列大小
    size_t Size()
    {
        std::lock_guard locker(m_mutex);
        return m_queue.size();
    }

    //隊列大小
    int Count()
    {
        return m_queue.size();
    }

private:
    //隊列不為滿
    bool NotFull() const
    {
        bool full = (m_queue.size() >= m_maxSize);
        if (full)
        {
            cout << "the queue is full, need wait..." << endl;
        }

        return !full;
    }

    //隊列不為空
    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
        {
            cout << "the queue is empty, need wait..., 異步層的線程ID: " << this_thread::get_id() << endl;
        }

        return !empty;
    }

    //向隊列中添加事件,若不為滿且終止標志為false則添加事件
    template 
    void Add(F && x)
    {
        std::unique_lock locker(m_mutex);
        //當不滿足任何一個則等待,但是若m_needStop為true是因為任務要終止了所以不阻塞
        m_notFull.wait(locker, [this]{return m_needStop || NotFull(); });
        if (m_needStop)
        {
            return;
        }

        m_queue.push_back(std::forward(x));
        m_notEmpty.notify_one();
    }

private:
    //緩沖區
    std::list m_queue;
    //互斥量
    std::mutex m_mutex;
    //隊列不為空的條件變量
    std::condition_variable m_notEmpty;
    //隊列不為滿的條件變量
    std::condition_variable m_notFull;
    //任務隊列最大長度
    int m_maxSize;
    //終止的標識,當為true時代表同步隊列要終止
    bool m_needStop;
};



/**************************線程池********************************/

//傳遞給同步隊列的最大個數
const int MaxTaskCount = 100;
class ThreadPool
{
public:
    using Task = std::function;
    //構造函數,默認參數hardware_concurrency()獲取CPU核心數量
    ThreadPool(int numThreads = std::thread::hardware_concurrency()):m_queue(MaxTaskCount)
    {
        cout << "numThreads: " << numThreads << endl;
        Start(numThreads);
    }

    ~ThreadPool()
    {
        Stop();
    }

    //保證多線程環境下只調用一次StopThreadGroup函數
    void Stop()
    {
        std::call_once(m_flag, [this]{ StopThreadGroup(); });
    }

    //添加任務,右值完美轉發
    void AddTask(Task && task)
    {
        m_queue.Put(std::forward (task));
    }

    //添加任務
    void AddTask(const Task && task)
    {
        m_queue.Put(task);
    }

private:
    //建立numThreads個數的線程組
    void Start(int numThreads)
    {
        m_running  = true;

        for (int i = 0; i < numThreads; i++)
        {
            //多個線程依次的處理
            m_threadgroup.push_back(std::make_shared(&ThreadPool::RunInThread, this));
        }
    }

    //取出任務隊列中的全部,依次執行
    void RunInThread()
    {
        while (m_running)
        {
            std::list list;
            m_queue.Take(list);

            for (auto & task : list)
            {
                if (!m_running)
                {
                    return ;
                }

                //執行任務
                task();
            }
        }
    }

    //終止所有任務的執行
    void StopThreadGroup()
    {       
        //終止同步隊列
        m_queue.Stop();
        m_running = false;

        for (auto thread : m_threadgroup)
        {
            if (thread)
            {
                thread->join();
            }
        }

        m_threadgroup.clear();
    }

private: 
    //處理任務的線程組
    std::list> m_threadgroup;
    //同步隊列
    SyncQueue m_queue;
    //運行的標志,flase代表終止
    atomic_bool m_running;
    //保證在函數在多線程環境中只被調用一次
    std::once_flag m_flag;
};

int main()
{
    ThreadPool pool;

    //pool.Start(2);
    std::thread thd1([&pool]
    {
        for (int i = 0; i < 10; i++)
        {
            auto thdId = this_thread::get_id();
            pool.AddTask([thdId]
            {
                cout << "1.thread id: " << thdId << endl;
            });
        }
    });
    std::thread thd2([&pool]
    {
        for (int i = 0; i < 10; i++)
        {
            auto thdId = this_thread::get_id();
            pool.AddTask([thdId]
            {
                cout << "2.thread id: " << thdId << endl;
            });
        }
    });

    this_thread::sleep_for(std::chrono::seconds(2));
    getchar();
    pool.Stop();
    thd1.join();
    thd2.join();
}

這裡寫圖片描述

對象池

對象池對於創建開銷較大的對象來說很有意義,為了避免重復創建開銷較大的對象,可以通過對象池來優化.

對象池的思路比較簡單,實現創建好一批對象,放到一個集合中,每當程序需要新的對象時,就從對象池中獲取,程序用完該對象後都會把該對象歸還給對象池.這樣會避免重復創建對象,提高程序性能.

#include 
#include 
#include 
#include

using namespace std;

//要成為不可復制的類,典型的方法是將類的復制構造函數和賦值運算符設置為private或protected
//為了使ObjectPool為不可復制的類,我們定義了類NonCopyable,只需繼承起則可為不可復制的類
class NonCopyable
{
protected:
    NonCopyable() = default;
    ~NonCopyable() = default;
    NonCopyable(const NonCopyable&) = delete;
    NonCopyable& operator =(const NonCopyable &) = delete;
};

//對象最大個數
const int MaxObjectNum = 10;

template 
class ObjectPool : NonCopyable
{
    template 
    using Constructor = function (Args...)>;
private:
    //定義multimap類型的私有成員通過Constructor類型獲得字符串,則通過字符串類型一對多的對應特定的對象.
    multimap> m_object_map;

public:
    //初始化創建對象
    template 
    void Init(size_t num, Args ...args)
    {
        if (num <= 0 || num > MaxObjectNum)
        {
            throw std::logic_error("Object num out of range");
        }

        //Init時的模板類型不同所得到的constructName字符串不同
        //所以相同的初始化類型對應m_object_map中的first相同,不同類型的則不同
        auto constructName = typeid(Constructor).name();
        //cout << "Init: " << constructName << endl;
        for (size_t i = 0; i < num; i++)
        {
            //刪除器中不直接刪除對象,而是回收到對象池中,以供下次使用
            m_object_map.emplace(constructName, 
                shared_ptr(new T(std::forward(args)...), [this, constructName](T *p)
            {
                cout << "dis: " << constructName << endl;
                m_object_map.emplace(std::move(constructName),shared_ptr(p));
            }));
        }
    }

    //從對象池獲取一個對象
    template 
    std::shared_ptr Get()
    {
        string constructName = typeid(Constructor).name();
        cout << constructName << endl;

        //通過Get的模板類型得到對應的字符串,通過該字符串找到所有該字符串的對應
        auto range = m_object_map.equal_range(constructName);
        //從該類型對應的對象中獲取其中一個
        for (auto it = range.first; it != range.second; it++)
        {
            auto ptr = it -> second;
            m_object_map.erase(it);
            return ptr;
        } 

        return nullptr;
    }
};
  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved