本人一直在尋找一個跨平台的網絡庫,boost與ACE比較龐大,不考慮。對比了libevent、libev、libuv後,最終選擇了libuv。可libuv文檔少,例子也簡單,對於tcp只有一個echo-server的例子。網上也找過對其封裝的例子,如下:
libsourcey庫,封裝了許多庫。對libuv的封裝跟其他代碼耦合比較緊,難以剝離 http://sourcey.com/libuv-cpp-wrappers/
C++11封裝的,可惜VS10未完全支持C++11 https://github.com/larroy/uvpp
C++封裝的 https://github.com/keepallsimple/uvpp
本人想實現一個raw tcp server,支持上萬鏈接數的,網上找到的都不適合我,沒辦法只能參照各例子自己封裝了。
/***************************************
* @file tcpsocket.h
* @brief 基於libuv封裝的tcp服務器與客戶端,使用log4z作日志工具
* @details
* @author 陳吉宏, [email protected]
* @date 2014-5-13
* @mod 2014-5-13 phata 修正服務器與客戶端的錯誤.現服務器支持多客戶端連接
修改客戶端測試代碼,支持並發多客戶端測試
****************************************/
#ifndef TCPSocket_H
#define TCPSocket_H
#include "uv.h"
#include <string>
#include <list>
#include <map>
#define BUFFERSIZE (1024*1024)
namespace uv
{
// Server-side callback invoked after a new client has been accepted;
// receives the id the server assigned to that client.
typedef void (*newconnect)(int clientid);
// Server-side receive callback (one per client): the client id, a pointer
// to the received bytes and the number of bytes received.
typedef void (*server_recvcb)(int cliendid, const char* buf, int bufsize);
// Client-side receive callback: a pointer to the received bytes, the number
// of bytes received and the user pointer registered through setrecvcb().
typedef void (*client_recvcb)(const char* buf, int bufsize, void* userdata);
class TCPServer;
class clientdata
{
public:
clientdata(int clientid):client_id(clientid),recvcb_(nullptr) {
client_handle = (uv_tcp_t*)malloc(sizeof(*client_handle));
client_handle->data = this;
readbuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);
writebuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);
}
virtual ~clientdata() {
free(readbuffer.base);
readbuffer.base = nullptr;
readbuffer.len = 0;
free(writebuffer.base);
writebuffer.base = nullptr;
writebuffer.len = 0;
free(client_handle);
client_handle = nullptr;
}
int client_id;//客戶端id,惟一
uv_tcp_t* client_handle;//客戶端句柄
TCPServer* tcp_server;//服務器句柄(保存是因為某些回調函數需要到)
uv_buf_t readbuffer;//接受數據的buf
uv_buf_t writebuffer;//寫數據的buf
uv_write_t write_req;
server_recvcb recvcb_;//接收數據回調給用戶的函數
};
// TCP server built on libuv. Accepted clients are tracked in a map keyed by
// an integer client id; each client has its own clientdata record.
class TCPServer
{
public:
// Stores the loop to use; nothing is initialized until Start()/Start6().
TCPServer(uv_loop_t* loop = uv_default_loop());
// Calls close().
virtual ~TCPServer();
static void StartLog(const char* logpath = nullptr);// Start logging; must be called for any log output to be produced
public:
// Basic operations
bool Start(const char *ip, int port);// Start the server on an IPv4 address (init, bind, listen, then run the loop)
bool Start6(const char *ip, int port);// Start the server on an IPv6 address (init, bind6, listen, then run the loop)
// Closes every tracked client handle and the listening handle.
void close();
// Forwards to uv_tcp_nodelay on the listening handle.
bool setNoDelay(bool enable);
// Forwards to uv_tcp_keepalive on the listening handle.
bool setKeepAlive(int enable, unsigned int delay);
// Returns the text of the most recently recorded error.
const char* GetLastErrMsg() const {
return errmsg_.c_str();
};
// Copies `data` into the client's write buffer and queues it with uv_write.
// Returns -1 when clientid is unknown, 0 when uv_write fails, 1 otherwise.
virtual int send(int clientid, const char* data, std::size_t len);
// Sets the callback invoked for each newly accepted client.
virtual void setnewconnectcb(newconnect cb);
virtual void setrecvcb(int clientid,server_recvcb cb);// Set the receive callback; each client has its own
protected:
int GetAvailaClientID()const;// Get the next available client id
bool DeleteClient(int clientid);// Close a client and remove it from the client map
// Static callbacks handed to libuv
static void AfterServerRecv(uv_stream_t *client, ssize_t nread, const uv_buf_t* buf);
static void AfterSend(uv_write_t *req, int status);
static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void AfterServerClose(uv_handle_t *handle);
static void AfterClientClose(uv_handle_t *handle);
static void acceptConnection(uv_stream_t *server, int status);
private:
bool init();
bool run(int status = UV_RUN_DEFAULT);
bool bind(const char* ip, int port);
bool bind6(const char* ip, int port);
bool listen(int backlog = 1024);
uv_tcp_t server_;// listening handle of the server
std::map<int,clientdata*> clients_list_;// accepted clients, keyed by client id
uv_mutex_t mutex_handle_;// meant to protect clients_list_ (locked in DeleteClient)
uv_loop_t *loop_;// event loop supplied to the constructor
std::string errmsg_;// last error message, see GetLastErrMsg()
newconnect newconcb_;// new-connection callback, may be null
bool isinit_;// whether init() has completed; checked in close()
};
// TCP client built on libuv. connect()/connect6() start a thread that issues
// the connect request and runs the loop; received data is delivered through
// a single user callback.
class TCPClient
{
// Calling connect/connect6 directly performs the connection
public:
// Allocates the read/write buffers and stores the loop to use.
TCPClient(uv_loop_t* loop = uv_default_loop());
// Frees the buffers and calls close().
virtual ~TCPClient();
static void StartLog(const char* logpath = nullptr);// Start logging; must be called for any log output to be produced
public:
// Basic operations
virtual bool connect(const char* ip, int port);// Start the connect thread and poll until the connect attempt has finished
virtual bool connect6(const char* ip, int port);// Start the connect thread and poll until the connect attempt has finished
// Copies `data` into the write buffer and queues it with uv_write.
// Returns 0 for null/empty input or when uv_write fails, 1 otherwise.
virtual int send(const char* data, std::size_t len);
virtual void setrecvcb(client_recvcb cb, void* userdata);// Set the receive callback; there is only one
// Closes the handle if init() had completed.
void close();
// Forwards to uv_tcp_nodelay (the original note reads "whether to enable Nagle's algorithm")
bool setNoDelay(bool enable);
// Forwards to uv_tcp_keepalive.
bool setKeepAlive(int enable, unsigned int delay);
// Returns the text of the most recently recorded error.
const char* GetLastErrMsg() const {
return errmsg_.c_str();
};
protected:
// Static callbacks handed to libuv
static void AfterConnect(uv_connect_t* handle, int status);
static void AfterClientRecv(uv_stream_t *client, ssize_t nread, const uv_buf_t* buf);
static void AfterSend(uv_write_t *req, int status);
static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void AfterClose(uv_handle_t *handle);
static void ConnectThread(void* arg);// the thread body that actually connects (IPv4)
static void ConnectThread6(void* arg);// the thread body that actually connects (IPv6)
bool init();
bool run(int status = UV_RUN_DEFAULT);
private:
// Values stored in connectstatus_.
enum {
CONNECT_TIMEOUT,
CONNECT_FINISH,
CONNECT_ERROR,
CONNECT_DIS,
};
uv_tcp_t client_;// client connection handle
uv_loop_t *loop_;// event loop supplied to the constructor
uv_write_t write_req_;// request used when writing
uv_connect_t connect_req_;// request used when connecting
uv_thread_t connect_threadhanlde_;// connect thread handle
std::string errmsg_;// last error message
uv_buf_t readbuffer_;// buffer for received data
uv_buf_t writebuffer_;// buffer for data being written
uv_mutex_t write_mutex_handle_;// guards writes: locked in send(), released in AfterSend, so a write waits for the previous one
int connectstatus_;// connection status (one of the CONNECT_* values)
client_recvcb recvcb_;// receive callback
void* userdata_;// user data passed to the receive callback
std::string connectip_;// IP of the server to connect to
int connectport_;// port of the server to connect to
bool isinit_;// whether init() has completed; checked in close()
};
}
#endif // TCPSocket_H
#include "tcpsocket.h"
#include "log4z.h"
// Formats a libuv error code as "<name>:<description>" using uv_err_name
// and uv_strerror.
std::string GetUVError(int retcode)
{
    std::string err(uv_err_name(retcode));
    err += ":";
    err += uv_strerror(retcode);
    // Return the local by name: wrapping it in std::move would only
    // prevent copy elision.
    return err;
}
namespace uv
{
/*****************************************TCP Server*************************************************************/
// Remembers the loop to run on; no libuv resource is created until init().
TCPServer::TCPServer(uv_loop_t* loop)
    : loop_(loop)
    , newconcb_(nullptr)
    , isinit_(false)
{
}
// Shuts the server down through close() and records the exit in the log.
TCPServer::~TCPServer()
{
    this->close();
    LOGI("tcp server exit.");
}
//初始化與關閉--服務器與客戶端一致
// One-time setup of the mutex and the listening TCP handle.
// Returns true when already initialized or when both steps succeed; on
// failure the error text is stored in errmsg_ and logged.
bool TCPServer::init()
{
    if (isinit_) {
        return true;
    }
    if (loop_ == nullptr) {
        errmsg_ = "loop is null on tcp init.";
        LOGE(errmsg_);
        return false;
    }

    // The TCP handle is only initialized once the mutex exists.
    int rc = uv_mutex_init(&mutex_handle_);
    if (rc == 0) {
        rc = uv_tcp_init(loop_, &server_);
    }
    if (rc != 0) {
        errmsg_ = GetUVError(rc);
        LOGE(errmsg_);
        return false;
    }

    isinit_ = true;
    server_.data = this;  // lets the static callbacks find this object
    // Keepalive is deliberately not enabled here: the original author noted
    // that calling uv_tcp_keepalive(&server_, 1, 60) at this point made the
    // subsequent calls fail.
    return true;
}
// Closes every tracked client handle and the listening handle, then
// destroys the mutex. Does nothing unless init() has completed, so the
// mutex is never destroyed before it was created (Start()/Start6() call
// close() first) nor destroyed twice (Start() followed by the destructor).
void TCPServer::close()
{
    if (!isinit_) {
        return;
    }
    for (auto it = clients_list_.begin(); it != clients_list_.end(); ++it) {
        // AfterClientClose deletes the clientdata once libuv is done with it.
        uv_close((uv_handle_t*)it->second->client_handle, AfterClientClose);
    }
    clients_list_.clear();
    uv_close((uv_handle_t*)&server_, AfterServerClose);
    LOGI("close server");
    isinit_ = false;
    uv_mutex_destroy(&mutex_handle_);
}
// Runs the event loop in the given uv_run_mode.
// Returns true when uv_run returns 0; any other value is turned into an
// error message and logged.
// NOTE(review): uv_run's return value may not be an error code -- confirm
// against the libuv documentation before relying on the message.
bool TCPServer::run(int status)
{
    LOGI("server runing.");
    const int rc = uv_run(loop_, (uv_run_mode)status);
    if (rc == 0) {
        return true;
    }
    errmsg_ = GetUVError(rc);
    LOGE(errmsg_);
    return false;
}
//屬性設置--服務器與客戶端一致
// Applies uv_tcp_nodelay to the listening handle.
// Returns false, with errmsg_ set and logged, when libuv reports an error.
bool TCPServer::setNoDelay(bool enable)
{
    const int flag = enable ? 1 : 0;
    const int rc = uv_tcp_nodelay(&server_, flag);
    if (rc == 0) {
        return true;
    }
    errmsg_ = GetUVError(rc);
    LOGE(errmsg_);
    return false;
}
// Applies uv_tcp_keepalive(enable, delay) to the listening handle.
// Returns false, with errmsg_ set and logged, when libuv reports an error.
bool TCPServer::setKeepAlive(int enable, unsigned int delay)
{
    const int rc = uv_tcp_keepalive(&server_, enable, delay);
    if (rc == 0) {
        return true;
    }
    errmsg_ = GetUVError(rc);
    LOGE(errmsg_);
    return false;
}
//作為server時的函數
bool TCPServer::bind(const char* ip, int port)
{
struct sockaddr_in bind_addr;
int iret = uv_ip4_addr(ip, port, &bind_addr);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr,0);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
LOGI("server bind ip="<<ip<<", port="<<port);
return true;
}
bool TCPServer::bind6(const char* ip, int port)
{
struct sockaddr_in6 bind_addr;
int iret = uv_ip6_addr(ip, port, &bind_addr);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr,0);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
LOGI("server bind ip="<<ip<<", port="<<port);
return true;
}
// Starts listening with the given backlog; acceptConnection is registered
// as the connection callback.
// Returns false, with errmsg_ set and logged, when uv_listen fails.
bool TCPServer::listen(int backlog)
{
    const int rc = uv_listen((uv_stream_t*)&server_, backlog, acceptConnection);
    if (rc != 0) {
        errmsg_ = GetUVError(rc);
        LOGE(errmsg_);
        return false;
    }
    LOGI("server listen");
    return true;
}
// Starts the server on an IPv4 address: closes any previous state, then
// init -> bind -> listen(SOMAXCONN) -> run.
// Returns false as soon as one step fails (details in GetLastErrMsg()).
// The "start listen" message is written before run(), because run() enters
// uv_run and the message would otherwise only appear once the loop ended.
bool TCPServer::Start( const char *ip, int port )
{
    close();
    if (!init()) {
        return false;
    }
    if (!bind(ip, port)) {
        return false;
    }
    if (!listen(SOMAXCONN)) {
        return false;
    }
    LOGI("start listen "<<ip<<": "<<port);
    return run();
}
bool TCPServer::Start6( const char *ip, int port )
{
close();
if (!init()) {
return false;
}
if (!bind6(ip,port)) {
return false;
}
if (!listen(SOMAXCONN)) {
return false;
}
if (!run()) {
return false;
}
return true;
}
//服務器發送函數
int TCPServer::send(int clientid, const char* data, std::size_t len)
{
auto itfind = clients_list_.find(clientid);
if (itfind == clients_list_.end()) {
errmsg_ = "can't find cliendid ";
errmsg_ += std::to_string((long long)clientid);
LOGE(errmsg_);
return -1;
}
//自己控制data的生命周期直到write結束
if (itfind->second->writebuffer.len < len) {
itfind->second->writebuffer.base = (char*)realloc(itfind->second->writebuffer.base,len);
itfind->second->writebuffer.len = len;
}
memcpy(itfind->second->writebuffer.base,data,len);
uv_buf_t buf = uv_buf_init((char*)itfind->second->writebuffer.base,len);
int iret = uv_write(&itfind->second->write_req, (uv_stream_t*)itfind->second->client_handle, &buf, 1, AfterSend);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
return true;
}
//服務器-新客戶端函數
// Connection callback registered with uv_listen.
// Creates the clientdata for the new connection, accepts it, records it in
// clients_list_, notifies the new-connection callback and starts reading.
void TCPServer::acceptConnection(uv_stream_t *server, int status)
{
    if (!server->data) {
        return;
    }
    TCPServer *tcpsock = (TCPServer *)server->data;
    if (status < 0) {
        // libuv reports a failed incoming connection through `status`.
        tcpsock->errmsg_ = GetUVError(status);
        LOGE(tcpsock->errmsg_);
        return;
    }

    int clientid = tcpsock->GetAvailaClientID();
    clientdata* cdata = new clientdata(clientid);  // released in AfterClientClose
    cdata->tcp_server = tcpsock;                   // callbacks need the server
    int iret = uv_tcp_init(tcpsock->loop_, cdata->client_handle);
    if (iret) {
        // The handle was never registered with the loop, so it can be
        // freed right away.
        delete cdata;
        tcpsock->errmsg_ = GetUVError(iret);
        LOGE(tcpsock->errmsg_);
        return;
    }

    iret = uv_accept((uv_stream_t*)&tcpsock->server_, (uv_stream_t*)cdata->client_handle);
    if (iret) {
        tcpsock->errmsg_ = GetUVError(iret);
        LOGE(tcpsock->errmsg_);
        // The handle is initialized, so its memory must stay valid until the
        // close callback runs; AfterClientClose deletes cdata at that point.
        uv_close((uv_handle_t*)cdata->client_handle, AfterClientClose);
        return;
    }

    tcpsock->clients_list_.insert(std::make_pair(clientid, cdata));
    if (tcpsock->newconcb_) {
        tcpsock->newconcb_(clientid);
    }
    LOGI("new client("<<cdata->client_handle<<") id="<< clientid);

    // Start receiving data from the client.
    iret = uv_read_start((uv_stream_t*)cdata->client_handle, onAllocBuffer, AfterServerRecv);
    if (iret) {
        tcpsock->errmsg_ = GetUVError(iret);
        LOGE(tcpsock->errmsg_);
        // A client that cannot be read from is dropped again.
        tcpsock->DeleteClient(clientid);
    }
}
//服務器-接收數據回調函數
// Installs the receive callback for one client; unknown ids are ignored.
void TCPServer::setrecvcb(int clientid, server_recvcb cb )
{
    auto pos = clients_list_.find(clientid);
    if (pos == clients_list_.end()) {
        return;
    }
    pos->second->recvcb_ = cb;
}
//服務器-新鏈接回調函數
// Stores the callback that acceptConnection invokes for each new client.
void TCPServer::setnewconnectcb(newconnect cb )
{
    this->newconcb_ = cb;
}
//服務器分析空間函數
// Allocation callback passed to uv_read_start: hands libuv the client's
// preallocated read buffer. suggested_size is not used.
void TCPServer::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
    (void)suggested_size;
    if (!handle->data) {
        // Always give libuv a defined value; an empty buffer tells it that
        // no memory is available instead of leaving *buf untouched.
        *buf = uv_buf_init(nullptr, 0);
        return;
    }
    clientdata *client = (clientdata*)handle->data;
    *buf = client->readbuffer;
}
// Read callback for accepted clients.
// nread < 0: the connection ended (EOF, reset or another error); the reason
//            is reported and the client is removed through DeleteClient.
// nread == 0: nothing was read, nothing to do.
// nread > 0: the data is forwarded to the client's receive callback, if set.
void TCPServer::AfterServerRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)
{
    if (!handle->data) {
        return;
    }
    clientdata *client = (clientdata*)handle->data;  // the handle carries its clientdata
    if (nread < 0) {
        TCPServer *server = (TCPServer *)client->tcp_server;
        if (nread == UV_EOF) {
            fprintf(stdout,"客戶端(%d)連接斷開,關閉此客戶端\n",client->client_id);
            LOGW("客戶端("<<client->client_id<<")主動斷開");
        } else if (nread == UV_ECONNRESET) {
            fprintf(stdout,"客戶端(%d)異常斷開\n",client->client_id);
            LOGW("客戶端("<<client->client_id<<")異常斷開");
        } else {
            // A std::string must not be passed through "%s"; hand fprintf
            // the underlying C string instead.
            const std::string reason = GetUVError(static_cast<int>(nread));
            fprintf(stdout,"%s\n",reason.c_str());
            LOGW("客戶端("<<client->client_id<<")異常斷開:"<<reason);
        }
        server->DeleteClient(client->client_id);  // connection is gone, drop the client
        return;
    }
    if (nread > 0 && client->recvcb_) {
        client->recvcb_(client->client_id, buf->base, static_cast<int>(nread));
    }
}
//服務器與客戶端一致
// Write-completion callback for server sends; only reports failures.
void TCPServer::AfterSend(uv_write_t *req, int status)
{
    (void)req;
    if (status < 0) {
        // A std::string must not be passed through "%s"; use c_str().
        const std::string reason = GetUVError(status);
        LOGE("發送數據有誤:"<<reason);
        fprintf(stderr, "Write error %s\n", reason.c_str());
    }
}
// Close callback of the listening handle. The handle is a member of
// TCPServer, so there is nothing to release here.
void TCPServer::AfterServerClose(uv_handle_t *handle)
{
    (void)handle;
}
// Close callback of a client handle: logs the client id and deletes the
// clientdata stored in the handle's data pointer.
void TCPServer::AfterClientClose(uv_handle_t *handle)
{
    clientdata *closed = (clientdata*)handle->data;
    LOGI("client "<<closed->client_id<<" had closed.");
    delete closed;
}
// Hands out client ids from a function-local counter; the first id is 1 and
// each call returns the next integer.
int TCPServer::GetAvailaClientID() const
{
    static int last_issued = 0;
    last_issued += 1;
    return last_issued;
}
bool TCPServer::DeleteClient( int clientid )
{
uv_mutex_lock(&mutex_handle_);
auto itfind = clients_list_.find(clientid);
if (itfind == clients_list_.end()) {
errmsg_ = "can't find client ";
errmsg_ += std::to_string((long long)clientid);
LOGE(errmsg_);
uv_mutex_unlock(&mutex_handle_);
return false;
}
if (uv_is_active((uv_handle_t*)itfind->second->client_handle)) {
uv_read_stop((uv_stream_t*)itfind->second->client_handle);
}
uv_close((uv_handle_t*)itfind->second->client_handle,AfterClientClose);
clients_list_.erase(itfind);
LOGI("刪除客戶端"<<clientid);
uv_mutex_unlock(&mutex_handle_);
return true;
}
void TCPServer::StartLog( const char* logpath /*= nullptr*/ )
{
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100);
if (logpath) {
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath);
}
zsummer::log4z::ILog4zManager::GetInstance()->Start();
}
/*****************************************TCP Client*************************************************************/
// Stores the loop, allocates the read and write buffers (BUFFERSIZE bytes
// each) and points both request objects back at this client.
// Members are initialized in declaration order and connectport_, which was
// previously left indeterminate, now starts at 0.
TCPClient::TCPClient(uv_loop_t* loop)
    : loop_(loop)
    , connectstatus_(CONNECT_DIS)
    , recvcb_(nullptr)
    , userdata_(nullptr)
    , connectport_(0)
    , isinit_(false)
{
    readbuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE);
    writebuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE);
    connect_req_.data = this;
    write_req_.data = this;
}
// Closes the connection and releases the read/write buffers.
// close() now runs before the buffers are freed, so the handle is asked to
// shut down while the memory it was given is still valid.
// NOTE(review): uv_close completes asynchronously and the loop runs on the
// connect thread -- confirm the object is not destroyed while that thread is
// still using it.
TCPClient::~TCPClient()
{
    close();
    free(readbuffer_.base);
    readbuffer_.base = nullptr;
    readbuffer_.len = 0;
    free(writebuffer_.base);
    writebuffer_.base = nullptr;
    writebuffer_.len = 0;
    LOGI("客戶端("<<this<<")退出");
}
//初始化與關閉--服務器與客戶端一致
bool TCPClient::init()
{
if (isinit_) {
return true;
}
if (!loop_) {
errmsg_ = "loop is null on tcp init.";
LOGE(errmsg_);
return false;
}
int iret = uv_tcp_init(loop_,&client_);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
iret = uv_mutex_init(&write_mutex_handle_);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
isinit_ = true;
fprintf(stdout,"客戶端(%p) init type = %d\n",&client_,client_.type);
client_.data = this;
//iret = uv_tcp_keepalive(&client_, 1, 60);//
//if (iret) {
// errmsg_ = GetUVError(iret);
// return false;
//}
LOGI("客戶端("<<this<<")Init");
return true;
}
// Tears down what init() created: destroys the write mutex and closes the
// TCP handle. Has no effect when the client is not initialized.
void TCPClient::close()
{
    if (isinit_) {
        uv_mutex_destroy(&write_mutex_handle_);
        uv_close((uv_handle_t*) &client_, AfterClose);
        LOGI("客戶端("<<this<<")close");
        isinit_ = false;
    }
}
// Runs the event loop in the given uv_run_mode.
// Returns true when uv_run returns 0; any other value is turned into an
// error message and logged.
bool TCPClient::run(int status)
{
    LOGI("客戶端("<<this<<")run");
    const int rc = uv_run(loop_, (uv_run_mode)status);
    if (rc == 0) {
        return true;
    }
    errmsg_ = GetUVError(rc);
    LOGE(errmsg_);
    return false;
}
//屬性設置--服務器與客戶端一致
// Applies uv_tcp_nodelay to the client handle.
// Background reading kept from the original author:
// http://blog.csdn.net/u011133100/article/details/21485983
// Returns false, with errmsg_ set and logged, when libuv reports an error.
bool TCPClient::setNoDelay(bool enable)
{
    const int flag = enable ? 1 : 0;
    const int rc = uv_tcp_nodelay(&client_, flag);
    if (rc == 0) {
        return true;
    }
    errmsg_ = GetUVError(rc);
    LOGE(errmsg_);
    return false;
}
// Applies uv_tcp_keepalive(enable, delay) to the client handle.
// Returns false, with errmsg_ set and logged, when libuv reports an error.
bool TCPClient::setKeepAlive(int enable, unsigned int delay)
{
    const int rc = uv_tcp_keepalive(&client_, enable, delay);
    if (rc == 0) {
        return true;
    }
    errmsg_ = GetUVError(rc);
    LOGE(errmsg_);
    return false;
}
//作為client的connect函數
// Connects to an IPv4 server.
// Re-initializes the client, starts ConnectThread (the connection only
// counts as established once AfterConnect has fired, hence the thread) and
// polls every 100 ms until connectstatus_ leaves CONNECT_DIS.
// Returns true only when the status ends up as CONNECT_FINISH.
bool TCPClient::connect(const char* ip, int port)
{
    if (!ip) {
        errmsg_ = "ip is null on tcp connect.";
        LOGE(errmsg_);
        return false;
    }
    close();
    if (!init()) {
        return false;
    }
    connectip_ = ip;
    connectport_ = port;
    // Forget the outcome of any earlier attempt; otherwise the wait below
    // would end immediately with the stale status.
    connectstatus_ = CONNECT_DIS;
    LOGI("客戶端("<<this<<")start connect to server("<<ip<<":"<<port<<")");
    int iret = uv_thread_create(&connect_threadhanlde_, ConnectThread, this);
    if (iret) {
        errmsg_ = GetUVError(iret);
        LOGE(errmsg_);
        return false;
    }
    while ( connectstatus_ == CONNECT_DIS) {
#if defined (WIN32) || defined(_WIN32)
        Sleep(100);
#else
        usleep((100) * 1000);
#endif
    }
    return connectstatus_ == CONNECT_FINISH;
}
// Connects to an IPv6 server.
// Re-initializes the client, starts ConnectThread6 (the connection only
// counts as established once AfterConnect has fired, hence the thread) and
// polls every 100 ms until connectstatus_ leaves CONNECT_DIS.
// Returns true only when the status ends up as CONNECT_FINISH.
bool TCPClient::connect6(const char* ip, int port)
{
    if (!ip) {
        errmsg_ = "ip is null on tcp connect.";
        LOGE(errmsg_);
        return false;
    }
    close();
    if (!init()) {
        return false;
    }
    connectip_ = ip;
    connectport_ = port;
    // Forget the outcome of any earlier attempt; otherwise the wait below
    // would end immediately with the stale status.
    connectstatus_ = CONNECT_DIS;
    LOGI("客戶端("<<this<<")start connect to server("<<ip<<":"<<port<<")");
    int iret = uv_thread_create(&connect_threadhanlde_, ConnectThread6, this);
    if (iret) {
        errmsg_ = GetUVError(iret);
        LOGE(errmsg_);
        return false;
    }
    while ( connectstatus_ == CONNECT_DIS) {
#if defined (WIN32) || defined(_WIN32)
        Sleep(100);
#else
        usleep((100) * 1000);
#endif
    }
    return connectstatus_ == CONNECT_FINISH;
}
void TCPClient::ConnectThread( void* arg )
{
TCPClient *pclient = (TCPClient*)arg;
fprintf(stdout,"client(%p) ConnectThread start\n",pclient);
struct sockaddr_in bind_addr;
int iret = uv_ip4_addr(pclient->connectip_.c_str(), pclient->connectport_, &bind_addr);
if (iret) {
pclient->errmsg_ = GetUVError(iret);
LOGE(pclient->errmsg_);
return;
}
iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect);
if (iret) {
pclient->errmsg_ = GetUVError(iret);
LOGE(pclient->errmsg_);
return;
}
fprintf(stdout,"client(%p) ConnectThread end, connect status %d\n",pclient, pclient->connectstatus_);
pclient->run();
}
void TCPClient::ConnectThread6( void* arg )
{
TCPClient *pclient = (TCPClient*)arg;
LOGI("客戶端("<<pclient<<")Enter Connect Thread.");
fprintf(stdout,"client(%p) ConnectThread start\n",pclient);
struct sockaddr_in6 bind_addr;
int iret = uv_ip6_addr(pclient->connectip_.c_str(), pclient->connectport_, &bind_addr);
if (iret) {
pclient->errmsg_ = GetUVError(iret);
LOGE(pclient->errmsg_);
return;
}
iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect);
if (iret) {
pclient->errmsg_ = GetUVError(iret);
LOGE(pclient->errmsg_);
return;
}
fprintf(stdout,"client(%p) ConnectThread end, connect status %d\n",pclient, pclient->connectstatus_);
LOGI("客戶端("<<pclient<<")Leave Connect Thread. connect status "<<pclient->connectstatus_);
pclient->run();
}
// Connect callback: on success starts reading from the server and marks the
// client CONNECT_FINISH; a failed connect or a failed uv_read_start marks it
// CONNECT_ERROR.
void TCPClient::AfterConnect(uv_connect_t* handle, int status)
{
    fprintf(stdout,"start after connect\n");
    TCPClient *pclient = (TCPClient*)handle->handle->data;
    if (status) {
        pclient->connectstatus_ = CONNECT_ERROR;
        // A std::string must not be passed through "%s"; use c_str().
        fprintf(stdout,"connect error:%s\n",GetUVError(status).c_str());
        return;
    }
    // Start receiving data from the server.
    int iret = uv_read_start(handle->handle, onAllocBuffer, AfterClientRecv);
    if (iret) {
        fprintf(stdout,"uv_read_start error:%s\n",GetUVError(iret).c_str());
        pclient->connectstatus_ = CONNECT_ERROR;
    } else {
        pclient->connectstatus_ = CONNECT_FINISH;
    }
    LOGI("客戶端("<<pclient<<")run");
    fprintf(stdout,"end after connect\n");
}
//客戶端的發送函數
int TCPClient::send(const char* data, std::size_t len)
{
//自己控制data的生命周期直到write結束
if (!data || len <= 0) {
errmsg_ = "send data is null or len less than zero.";
return 0;
}
uv_mutex_lock(&write_mutex_handle_);
if (writebuffer_.len < len) {
writebuffer_.base = (char*)realloc(writebuffer_.base,len);
writebuffer_.len = len;
}
memcpy(writebuffer_.base,data,len);
uv_buf_t buf = uv_buf_init((char*)writebuffer_.base,len);
int iret = uv_write(&write_req_, (uv_stream_t*)&client_, &buf, 1, AfterSend);
if (iret) {
uv_mutex_unlock(&write_mutex_handle_);
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
return true;
}
//客戶端-接收數據回調函數
// Registers the single receive callback together with the user pointer that
// is handed back to it on every call.
void TCPClient::setrecvcb(client_recvcb cb, void* userdata )
{
    this->userdata_ = userdata;
    this->recvcb_ = cb;
}
//客戶端分析空間函數
// Allocation callback passed to uv_read_start: hands libuv the client's
// preallocated read buffer. suggested_size is not used.
void TCPClient::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
    (void)suggested_size;
    if (!handle->data) {
        // Always give libuv a defined value; an empty buffer tells it that
        // no memory is available instead of leaving *buf untouched.
        *buf = uv_buf_init(nullptr, 0);
        return;
    }
    TCPClient *client = (TCPClient*)handle->data;
    *buf = client->readbuffer_;
}
// Read callback of the client.
// nread < 0: the server went away (EOF, reset or another error); the reason
//            is reported and the handle is closed.
// nread > 0: the data is forwarded to the receive callback, if one is set.
void TCPClient::AfterClientRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)
{
    if (!handle->data) {
        return;
    }
    TCPClient *client = (TCPClient*)handle->data;  // the handle carries its TCPClient
    if (nread < 0) {
        // "%p" expects a void*.
        void* const tag = static_cast<void*>(handle);
        if (nread == UV_EOF) {
            fprintf(stdout,"服務器(%p)主動斷開\n",tag);
            LOGW("服務器主動斷開");
        } else if (nread == UV_ECONNRESET) {
            fprintf(stdout,"服務器(%p)異常斷開\n",tag);
            LOGW("服務器異常斷開");
        } else {
            // A std::string must not be passed through "%s"; use c_str().
            const std::string reason = GetUVError(static_cast<int>(nread));
            fprintf(stdout,"服務器(%p)異常斷開:%s\n",tag,reason.c_str());
            LOGW("服務器異常斷開"<<reason);
        }
        uv_close((uv_handle_t*)handle, AfterClose);
        return;
    }
    if (nread > 0 && client->recvcb_) {
        client->recvcb_(buf->base, static_cast<int>(nread), client->userdata_);
    }
}
//服務器與客戶端一致
// Write-completion callback of the client: releases the write mutex taken
// in send() and reports a failed write.
void TCPClient::AfterSend(uv_write_t *req, int status)
{
    TCPClient *client = (TCPClient *)req->handle->data;
    uv_mutex_unlock(&client->write_mutex_handle_);
    if (status < 0) {
        // A std::string must not be passed through "%s"; use c_str().
        const std::string reason = GetUVError(status);
        LOGE("發送數據有誤:"<<reason);
        fprintf(stderr, "Write error %s\n", reason.c_str());
    }
}
//服務器與客戶端一致
// Close callback of the client handle; only reports that the close finished.
void TCPClient::AfterClose(uv_handle_t *handle)
{
    // "%p" expects a void*.
    fprintf(stdout,"客戶端(%p)已close\n",static_cast<void*>(handle));
    LOGI("客戶端("<<handle<<")已close");
}
void TCPClient::StartLog( const char* logpath /*= nullptr*/ )
{
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100);
if (logpath) {
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath);
}
zsummer::log4z::ILog4zManager::GetInstance()->Start();
}
}
代碼已上傳到git: https://github.com/wqvbjhc/libuv_tcp
按照例子,客戶端可以並發20多路,超過後uv_write就會assert出錯,未找到原因。
服務器可以接收幾十路連接。成百上千路未測試過,因為沒有模擬環境。