本節主要介紹緩沖相關的傳輸類,緩存的作用就是為了提高讀寫的效率。Thrift在實現緩存傳輸的時候首先建立一個緩存的基類,然後需要實現緩存功能的類都可以直接從這個基類繼承。下面就詳細分析這個基類以及一個具體的實現類。
緩存基類TBufferBase
緩存基類就是讓傳輸類所有的讀寫函數都提供緩存來提高性能。它在通常情況下采用memcpy來設計和實現快路徑的讀寫訪問操作,這些操作函數通常都是小、非虛擬和內聯函數。TBufferBase是一個抽象的基類,子類必須實現慢路徑的讀寫函數等操作,慢路徑的讀寫等操作主要是為了在緩存已經滿或空的情況下執行。首先看看緩存基類的定義,代碼如下:
[cpp]
class TBufferBase : public TVirtualTransport<TBufferBase> {
public:
uint32_t read(uint8_t* buf, uint32_t len) {//讀函數
uint8_t* new_rBase = rBase_ + len;//得到需要讀到的緩存邊界
if (TDB_LIKELY(new_rBase <= rBound_)) {//判斷緩存是否有足夠的數據可讀,采用了分支預測技術
std::memcpy(buf, rBase_, len);//直接內存拷貝
rBase_ = new_rBase;//更新新的緩存讀基地址
return len;//返回讀取的長度
}
return readSlow(buf, len);//如果緩存已經不能夠滿足讀取長度需要就執行慢讀
}
uint32_t readAll(uint8_t* buf, uint32_t len) {
uint8_t* new_rBase = rBase_ + len;//同read函數
if (TDB_LIKELY(new_rBase <= rBound_)) {
std::memcpy(buf, rBase_, len);
rBase_ = new_rBase;
return len;
}
return apache::thrift::transport::readAll(*this, buf, len);//調用父類的
}
void write(const uint8_t* buf, uint32_t len) {//快速寫函數
uint8_t* new_wBase = wBase_ + len;//寫入後的新緩存基地址
if (TDB_LIKELY(new_wBase <= wBound_)) {//判斷緩存是否有足夠的空間可以寫入
std::memcpy(wBase_, buf, len);//內存拷貝
wBase_ = new_wBase;//更新基地址
return;
}
writeSlow(buf, len);//緩存空間不足就調用慢寫函數
}
const uint8_t* borrow(uint8_t* buf, uint32_t* len) {//快速路徑借
if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {//判斷是否足夠借的長度
*len = static_cast<uint32_t>(rBound_ - rBase_);
return rBase_;//返回借的基地址
}
return borrowSlow(buf, len);//不足就采用慢路徑借
}
void consume(uint32_t len) {//消費函數
if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {//判斷緩存是否夠消費
rBase_ += len;//更新已經消耗的長度
} else {
throw TTransportException(TTransportException::BAD_ARGS,
"consume did not follow a borrow.");//不足拋異常
}
}
protected:
virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;//慢函數
virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
TBufferBase()
: rBase_(NULL)
, rBound_(NULL)
, wBase_(NULL)
, wBound_(NULL)
{}//構造函數,把所有的緩存空間設置為NULL
void setReadBuffer(uint8_t* buf, uint32_t len) {//設置讀緩存空間地址
rBase_ = buf;//讀緩存開始地址
rBound_ = buf+len;//讀緩存地址界限
}
void setWriteBuffer(uint8_t* buf, uint32_t len) {//設置寫緩存地址空間
wBase_ = buf;//起
wBound_ = buf+len;//邊界
}
virtual ~TBufferBase() {}
uint8_t* rBase_;//讀從這兒開始
uint8_t* rBound_;//讀界限
uint8_t* wBase_;//寫開始地址
uint8_t* wBound_;//寫界限
};
class TBufferBase : public TVirtualTransport<TBufferBase> {
public:
uint32_t read(uint8_t* buf, uint32_t len) {//讀函數
uint8_t* new_rBase = rBase_ + len;//得到需要讀到的緩存邊界
if (TDB_LIKELY(new_rBase <= rBound_)) {//判斷緩存是否有足夠的數據可讀,采用了分支預測技術
std::memcpy(buf, rBase_, len);//直接內存拷貝
rBase_ = new_rBase;//更新新的緩存讀基地址
return len;//返回讀取的長度
}
return readSlow(buf, len);//如果緩存已經不能夠滿足讀取長度需要就執行慢讀
}
uint32_t readAll(uint8_t* buf, uint32_t len) {
uint8_t* new_rBase = rBase_ + len;//同read函數
if (TDB_LIKELY(new_rBase <= rBound_)) {
std::memcpy(buf, rBase_, len);
rBase_ = new_rBase;
return len;
}
return apache::thrift::transport::readAll(*this, buf, len);//調用父類的
}
void write(const uint8_t* buf, uint32_t len) {//快速寫函數
uint8_t* new_wBase = wBase_ + len;//寫入後的新緩存基地址
if (TDB_LIKELY(new_wBase <= wBound_)) {//判斷緩存是否有足夠的空間可以寫入
std::memcpy(wBase_, buf, len);//內存拷貝
wBase_ = new_wBase;//更新基地址
return;
}
writeSlow(buf, len);//緩存空間不足就調用慢寫函數
}
const uint8_t* borrow(uint8_t* buf, uint32_t* len) {//快速路徑借
if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {//判斷是否足夠借的長度
*len = static_cast<uint32_t>(rBound_ - rBase_);
return rBase_;//返回借的基地址
}
return borrowSlow(buf, len);//不足就采用慢路徑借
}
void consume(uint32_t len) {//消費函數
if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {//判斷緩存是否夠消費
rBase_ += len;//更新已經消耗的長度
} else {
throw TTransportException(TTransportException::BAD_ARGS,
"consume did not follow a borrow.");//不足拋異常
}
}
protected:
virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;//慢函數
virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
TBufferBase()
: rBase_(NULL)
, rBound_(NULL)
, wBase_(NULL)
, wBound_(NULL)
{}//構造函數,把所有的緩存空間設置為NULL
void setReadBuffer(uint8_t* buf, uint32_t len) {//設置讀緩存空間地址
rBase_ = buf;//讀緩存開始地址
rBound_ = buf+len;//讀緩存地址界限
}
void setWriteBuffer(uint8_t* buf, uint32_t len) {//設置寫緩存地址空間
wBase_ = buf;//起
wBound_ = buf+len;//邊界
}
virtual ~TBufferBase() {}
uint8_t* rBase_;//讀從這兒開始
uint8_t* rBound_;//讀界限
uint8_t* wBase_;//寫開始地址
uint8_t* wBound_;//寫界限
}; 從TBufferBase定義可以看出,它也是從虛擬類繼承,主要采用了memcpy函數來實現緩存的快速讀取,在判斷是否有足夠的緩存空間可以操作時采用了分支預測技術來提供代碼的執行效率,且所有快路徑函數都是非虛擬的、內聯的小代碼量函數。下面繼續看看一個具體實現緩存基類的一個子類的情況!
TBufferedTransport
緩存傳輸類是從緩存基類繼承而來,它對於讀:實際讀數據的大小比實際請求的大很多,多余的數據將為將來超過本地緩存的數據服務;對於寫:數據在它被發送出去以前將被先寫入內存緩存。
緩存的大小默認是512字節(代碼:static const int DEFAULT_BUFFER_SIZE = 512;),提供多個構造函數,可以只指定一個傳輸類(另一層次的)、也可以指定讀寫緩存公用的大小或者分別指定。因為它是一個可以實際使用的緩存類,所以需要實現慢讀和慢寫功能的函數。它還實現了打開函數open、關閉函數close、刷新函數flush等,判斷是否有數據處於未決狀態函數peek定義和實現如下:
[cpp]
bool peek() {
if (rBase_ == rBound_) {//判斷讀的基地址與讀邊界是否重合了,也就是已經讀取完畢
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));//是:重新讀取底層來的數據
}
return (rBound_ > rBase_);//邊界大於基地址就是有未決狀態數據
}
下面繼續看看慢讀函數和慢寫函數的實現細節(快讀和快寫繼承基類的:也就是默認的讀寫都是直接從緩存中讀取,所謂的快讀和快寫)。慢讀函數實現如下(詳細注釋):
uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
uint32_t have = rBound_ - rBase_;//計算還有多少數據在緩存中
// 如果讀取緩存中已經存在的數據不能滿足我們,
// 我們(也僅僅在這種情況下)應該才從慢路徑讀數據。
assert(have < len);
// 如果我們有一些數據在緩存,拷貝出來並返回它
// 我們不得不返回它而去嘗試讀更多的數據,因為我們不能保證
// 下層傳輸實際有更多的數據, 因此嘗試阻塞式讀取它。
if (have > 0) {
memcpy(buf, rBase_, have);//拷貝數據
setReadBuffer(rBuf_.get(), 0);//設置讀緩存,基類實現該函數
return have;//返回緩存中已經存在的不完整數據
}
// 在我們的緩存中沒有更多的數據可用。從下層傳輸得到更多以達到buffer的大小。
// 注意如果len小於rBufSize_可能會產生多種場景否則幾乎是沒有意義的。
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));//讀取數據並設置讀緩存
// 處理我們已有的數據
uint32_t give = std::min(len, static_cast<uint32_t>(rBound_ - rBase_));
memcpy(buf, rBase_, give);
rBase_ += give;
return give;
}
bool peek() {
if (rBase_ == rBound_) {//判斷讀的基地址與讀邊界是否重合了,也就是已經讀取完畢
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));//是:重新讀取底層來的數據
}
return (rBound_ > rBase_);//邊界大於基地址就是有未決狀態數據
}
下面繼續看看慢讀函數和慢寫函數的實現細節(快讀和快寫繼承基類的:也就是默認的讀寫都是直接從緩存中讀取,所謂的快讀和快寫)。慢讀函數實現如下(詳細注釋):
uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
uint32_t have = rBound_ - rBase_;//計算還有多少數據在緩存中
// 如果讀取緩存中已經存在的數據不能滿足我們,
// 我們(也僅僅在這種情況下)應該才從慢路徑讀數據。
assert(have < len);
// 如果我們有一些數據在緩存,拷貝出來並返回它
// 我們不得不返回它而去嘗試讀更多的數據,因為我們不能保證
// 下層傳輸實際有更多的數據, 因此嘗試阻塞式讀取它。
if (have > 0) {
memcpy(buf, rBase_, have);//拷貝數據
setReadBuffer(rBuf_.get(), 0);//設置讀緩存,基類實現該函數
return have;//返回緩存中已經存在的不完整數據
}
// 在我們的緩存中沒有更多的數據可用。從下層傳輸得到更多以達到buffer的大小。
// 注意如果len小於rBufSize_可能會產生多種場景否則幾乎是沒有意義的。
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));//讀取數據並設置讀緩存
// 處理我們已有的數據
uint32_t give = std::min(len, static_cast<uint32_t>(rBound_ - rBase_));
memcpy(buf, rBase_, give);
rBase_ += give;
return give;
} 慢讀函數主要考慮的問題就是緩存中還有一部分數據,但是不夠我們需要讀取的長度;還有比較麻煩的情況是雖然現在緩存中沒有數據,但是我們從下層傳輸去讀,讀取的長度可能大於、小於或等於我們需要讀取的長度,所以需要考慮各種情況。下面繼續分析慢寫函數實現細節:
[cpp]
void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
uint32_t have_bytes = wBase_ - wBuf_.get();//計算寫緩存區中已有的字節數
uint32_t space = wBound_ - wBase_;//計算剩余寫緩存空間
// 如果在緩存區的空閒空間不能容納我們的數據,我們采用慢路徑寫(僅僅)
assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
//已有數據加上需要寫入的數據是否大於2倍寫緩存區或者緩存區為空
if ((have_bytes + len >= 2*wBufSize_) || (have_bytes == 0)) {
if (have_bytes > 0) {//緩存大於0且加上需要再寫入數據的長度大於2倍緩存區
transport_->write(wBuf_.get(), have_bytes);//先將已有數據寫入下層傳輸
}
transport_->write(buf, len);//寫入這次的len長度的數據
wBase_ = wBuf_.get();//重新得到寫緩存的基地址
return;
}
memcpy(wBase_, buf, space);//填充我們的內部緩存區為了寫
buf += space;
len -= space;
transport_->write(wBuf_.get(), wBufSize_);//寫入下層傳輸
assert(len < wBufSize_);
memcpy(wBuf_.get(), buf, len);//拷貝剩余的數據到我們的緩存
wBase_ = wBuf_.get() + len;//重新得到寫緩存基地址
return;
}
void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
uint32_t have_bytes = wBase_ - wBuf_.get();//計算寫緩存區中已有的字節數
uint32_t space = wBound_ - wBase_;//計算剩余寫緩存空間
// 如果在緩存區的空閒空間不能容納我們的數據,我們采用慢路徑寫(僅僅)
assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
//已有數據加上需要寫入的數據是否大於2倍寫緩存區或者緩存區為空
if ((have_bytes + len >= 2*wBufSize_) || (have_bytes == 0)) {
if (have_bytes > 0) {//緩存大於0且加上需要再寫入數據的長度大於2倍緩存區
transport_->write(wBuf_.get(), have_bytes);//先將已有數據寫入下層傳輸
}
transport_->write(buf, len);//寫入這次的len長度的數據
wBase_ = wBuf_.get();//重新得到寫緩存的基地址
return;
}
memcpy(wBase_, buf, space);//填充我們的內部緩存區為了寫
buf += space;
len -= space;
transport_->write(wBuf_.get(), wBufSize_);//寫入下層傳輸
assert(len < wBufSize_);
memcpy(wBuf_.get(), buf, len);//拷貝剩余的數據到我們的緩存
wBase_ = wBuf_.get() + len;//重新得到寫緩存基地址
return;
} 慢寫函數也有棘手的問題,就是我們應該拷貝我們的數據到我們的內部緩存並且從那兒發送出去,或者我們應該僅僅用一次系統調用把當前內部寫緩存區的內容寫出去,然後再用一次系統調用把我們當前需要寫入長度為len的數據再次寫入出去。如果當前緩存區的數據加上我們這次需要寫入數據的長度至少是我們緩存區長度的兩倍,我們將不得不至少調用兩次系統調用(緩存區為空時有可能例外),那麼我們就不拷貝了。否則我們就是按順序遞加的。具體實現分情況處理,最後我們在看看慢借函數的實現,借相關函數主要是為了實現可變長度編碼。慢借函數實現細節如下:
[cpp]
const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
(void) buf;
(void) len;
return NULL;//默認返回空
}
const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
(void) buf;
(void) len;
return NULL;//默認返回空
}
在這個類我們可以看出,它什麼也沒有做,只是簡單的返回NULL,所以需要阻塞去借。按照官方的說法,下面兩種行為應該當前的版本中實現,在將來的版本可能會發生改變:
如果需要借的長度最多為緩存區的長度,那麼永遠不會返回NULL。依賴底層傳輸,它應該拋出一個異常或者永遠不會掛掉;
一些借用請求可能內部字節拷貝,如果借用的長度最多是緩存區的一半,那麼不去內部拷貝。為了優化性能保存這個限制。