#include "stdafx.h"
#include <iostream>
#include <fstream>
#include <ostream>
#include <sstream>
#include <functional>
#include <thread>
#include <mutex>
#include <deque>
#include <future>
#include <math.h>
#include <regex>
#include <cmath>
#include <condition_variable>
using namespace std;
//阻塞式隊列
template <class T> class CBlockingDeque
{
public:
CBlockingDeque(int maxcount):m_maxcount(maxcount)
{
//
}
bool push_back(T &t)
{
bool result = false;
std::unique_lock<std::mutex> lock(m_mutex);
if(m_deque.size() < this->m_maxcount)
{
m_deque.push_back(t);
m_cond.notify_all();
result = true;
}
return result;
}
bool pop_front(T &t,int milliseconds)
{
if(milliseconds <= 0)
{
milliseconds = 10;
}
bool result = false;
std::unique_lock<std::mutex> lock(m_mutex);
if(m_deque.empty())
{
std::cv_status::cv_status cv = std::cv_status::no_timeout;
cv = m_cond.wait_for(lock,std::chrono::milliseconds(milliseconds));
if(std::cv_status::no_timeout == cv)
{
//
}
}
if(!m_deque.empty())
{
t = m_deque.front();
m_deque.pop_front();
result = true;
}
return result;
}
//
size_t size()
{
std::lock_guard<std::mutex> m(this->m_mutex);
return m_deque.size();
}
private:
//最大容量
const size_t m_maxcount;
//互斥鎖
std::mutex m_mutex;
//std::unique_lock<std::mutex> lock(m,std::defer_lock);
//條件變量
std::condition_variable m_cond;
//標准隊列
std::deque<T> m_deque;
};
class CTestBlockingDeque
{
public:
void push_back(CBlockingDeque<std::string> *bd,int total)
{
size_t k = 0;
for(size_t i = 0;i < total;++i)
{
std::stringstream str;
str<<"ITEM"<<i;
std::string s = str.str();
if(bd->push_back(s))
{
k++;
}
}
std::cout<<std::this_thread::get_id()<<" push success total:"<<k<<endl;
}
int pop_front(CBlockingDeque<std::string> *bd,int milliseconds)
{
int count = 0;
int failcount = 0;
while(failcount < 1)
{
std::string s = "";
if(bd->pop_front(s,milliseconds))
{
count++;
}
else
{
failcount++;
}
}
std::cout<<std::this_thread::get_id()<<" pop_front fail total:"<<failcount<<endl;
return count;
}
static void start_test()
{
const int totalitem = 100000;
CBlockingDeque<std::string> bd(totalitem);
CTestBlockingDeque test_bd;
//構造3個插入線程
auto fnarg = std::bind(&CTestBlockingDeque::push_back,&test_bd,&bd,std::placeholders::_1);
std::thread push_task_1(fnarg,50000);
std::thread push_task_2(fnarg,40000);
std::thread push_task_3(fnarg,30000);
//std::this_thread::sleep_for(std::chrono::milliseconds(100));
//構造2個輸出線程
auto fnarg_1 = std::bind(&CTestBlockingDeque::pop_front,&test_bd,&bd,500);
auto fnarg_2 = std::bind(&CTestBlockingDeque::pop_front,&test_bd,&bd,100);
std::packaged_task<int (CBlockingDeque<std::string> *,int)> pos_packaged_1(fnarg_1);
std::packaged_task<int (CBlockingDeque<std::string> *,int)> pos_packaged_2(fnarg_2);
std::future<int> future_1 = pos_packaged_1.get_future();
std::future<int> future_2 = pos_packaged_2.get_future();
std::thread pop_task_1(std::move(pos_packaged_1),nullptr,0);
std::thread pop_task_2(std::move(pos_packaged_2),nullptr,0);
std::thread::id id_1 = pop_task_1.get_id();
std::thread::id id_2 = pop_task_2.get_id();
//等待線程都正常退出
push_task_1.join();
push_task_2.join();
push_task_3.join();
pop_task_1.join();
pop_task_2.join();
int total_1 = future_1.get();
int total_2 = future_2.get();
std::cout<<id_1<<" pop total:"<<total_1<<endl;
std::cout<<id_2<<" pop total:"<<total_2<<endl;
std::cout<<"pop total:"<<total_1 + total_2<<endl;
}
};
int _tmain(int argc, _TCHAR* argv[])
{
CTestBlockingDeque::start_test();
system("pause");
return 0;
}
VS2012 Update2 下運行結果:
75288 pop_front fail total:1
74000 push success total:30000
73720 push success total:40000
74216 push success total:50000
75228 pop_front fail total:1
75288 pop total:3837
75228 pop total:116163
pop total:120000