html">在之前的一篇文章中,介紹了mongos的啟動流程,在那篇文章的結尾,介紹了mongos使用balancer來進行均衡,今天就繼續講其實現方式。
首先我們看一下Balancer及相關實現策略的類圖:
可以看到Balancer類裡包含一個BalancerPolicy,其指向一個均衡策略,該策略會實現查找並收集要遷移的chunk。
這裡先看一下Balancer的類定義,如下:
//balace.h
class Balancer : public BackgroundJob {
public:
Balancer();
virtual ~Balancer();
// BackgroundJob methods
virtual void run();
virtual string name() const { return "Balancer"; }
private:
typedef BalancerPolicy::ChunkInfo CandidateChunk;
typedef shared_ptr<CandidateChunk> CandidateChunkPtr;
//mongos名稱(hostname:port)
string _myid;
// Balancer 啟動時間
time_t _started;
// 前移的chunks數量
int _balancedLastTime;
// 均衡策略(確定要遷移的chunks)
BalancerPolicy* _policy;
//初始化,檢查balancer 能否鏈接到servers.該方法可能拋出網絡異常
bool _init();
/**
* 收集關於shards及chunks的信息,以及可能需要遷移的chunks
* @param conn: 指向config server(s)連接
* @param candidateChunks (IN/OUT): 可能需要遷移的chunks
*/
void _doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks );
/**
* 逐個遷移chunk.並返回最終遷移的chunk數量
* @param candidateChunks 可能需要遷移的chunks
* @return number of chunks effectively moved
*/
int _moveChunks( const vector<CandidateChunkPtr>* candidateChunks );
/*在config server(s)中標記並前balancer為活動狀態.*/
void _ping( DBClientBase& conn );
//當configdb中的所有服務均可用時,返回true
bool _checkOIDs();
};
可以看出balancer繼承自BackgroundJob,所以它是以後台方式運行的。了解了該類的方法和屬性之後,下面我們著手看一下mongos主函數中啟動balancer.go()的調用流程。因為balancer繼承自BackgroundJob,所以還要看一下BackgroundJob裡go()方法的執行代碼, 如下:
//background.cpp 線程方式運行下面的jobBody方法
BackgroundJob& BackgroundJob::go() {
boost::thread t( boost::bind( &BackgroundJob::jobBody , this, _status ) );
return *this;
}
////background.cpp. Background object can be only be destroyed after jobBody() ran
void BackgroundJob::jobBody( boost::shared_ptr<JobStatus> status ) {
....
const string threadName = name();
if( ! threadName.empty() )
setThreadName( threadName.c_str() );
try {
run();//到這裡,mongos開始執行子類balancer中的run方法
}
....
if( status->deleteSelf )
delete this;
}
上面代碼最終會將執行流程轉到balancer類的run()方法,如下
void Balancer::run() {
/* this is the body of a BackgroundJob so if we throw
here were basically ending the balancer thread prematurely */
while ( ! inShutdown() ) {
if ( ! _init() ) {//檢查balancer是否鏈到config server和其它shard上
log() << "will retry to initialize balancer in one minute" << endl;
sleepsecs( 60 );
continue;
}
break;
}
//構造鏈接串信息
ConnectionString config = configServer.getConnectionString();
//聲明分布式鎖
DistributedLock balanceLock( config , "balancer" );
while ( ! inShutdown() ) {//一直循環直到程序中斷或關閉
try {
// 判斷chunk均衡功能是否有效
if ( ! grid.shouldBalance() ) {
log(1) << "skipping balancing round because balancing is disabled" << endl;
sleepsecs( 30 );
continue;
}
&nbs