程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> 關於C++ >> 分布式基礎學習【二】 —— 分布式計算系統(Map/Reduce)

分布式基礎學習【二】 —— 分布式計算系統(Map/Reduce)

編輯:關於C++

二. 分布式計算(Map/Reduce)

分布式式計算,同樣是一個寬泛的概念,在這裡,它狹義的指代,按Google Map/Reduce 框架所設計的分布式框架。在Hadoop中,分布式文件系統,很大程度上,是為各種分布式計 算需求所服務的。我們說分布式文件系統就是加了分布式的文件系統,類似的定義推廣到分 布式計算上,我們可以將其視為增加了分布式支持的計算函數。從計算的角度上看, Map/Reduce框架接受各種格式的鍵值對文件作為輸入,讀取計算後,最終生成自定義格式的 輸出文件。而從分布式的角度上看,分布式計算的輸入文件往往規模巨大,且分布在多個機 器上,單機計算完全不可支撐且效率低下,因此Map/Reduce框架需要提供一套機制,將此計 算擴展到無限規模的機器集群上進行。依照這樣的定義,我們對整個Map/Reduce的理解,也 可以分別沿著這兩個流程去看。。。

在Map/Reduce框架中,每一次計算請求,被稱為作業。在分布式計算Map/Reduce框架中, 為了完成這個作業,它進行兩步走的戰略,首先是將其拆分成若干個Map任務,分配到不同的 機器上去執行,每一個Map任務拿輸入文件的一部分作為自己的輸入,經過一些計算,生成某 種格式的中間文件,這種格式,與最終所需的文件格式完全一致,但是僅僅包含一部分數據 。因此,等到所有Map任務完成後,它會進入下一個步驟,用以合並這些中間文件獲得最後的 輸出文件。此時,系統會生成若干個Reduce任務,同樣也是分配到不同的機器去執行,它的 目標,就是將若干個Map任務生成的中間文件為匯總到最後的輸出文件中去。當然,這個匯總 不總會像1 + 1 = 2那麼直接了當,這也就是Reduce任務的價值所在。經過如上步驟,最終, 作業完成,所需的目標文件生成。整個算法的關鍵,就在於增加了一個中間文件生成的流程 ,大大提高了靈活性,使其分布式擴展性得到了保證。。。

I. 術語對照

和分布式文件系統一樣,Google、Hadoop和....我,各執一種方式表述統一概念,為了保 證其統一性,特有下表。。。

文中翻譯 Hadoop術語 Google術語 相關解釋 作業 Job Job 用戶的每一個計算請求,就稱為一個作業。 作業服務器 JobTracker Master 用戶提交作業的服務器,同時,它還負責各個作業任務的分 配,管理所有的任務服務器。 任務服務器 TaskTracker Worker 任勞任怨的工蜂,負責執行具體的任務。 任務 Task Task 每一個作業,都需要拆分開了,交由多個服務器來完成,拆 分出來的執行單位,就稱為任務。 備份任務 Speculative Task Buckup Task 每一個任務,都有可能執行失敗或者緩慢,為了降低為此付 出的代價,系統會未雨綢缪的實現在另外的任務服務器上執行同樣一個任務,這就是備份任 務。

II. 基本架構

與分布式文件系統類似,Map/Reduce的集群,也由三類服務器構成。其中作業服務器,在 Hadoop中稱為Job Tracker,在Google論文中稱為Master。前者告訴我們,作業服務器是負責 管理運行在此框架下所有作業的,後者告訴我們,它也是為各個作業分配任務的核心。與 HDFS的主控服務器類似,它也是作為單點存在的,簡化了負責的同步流程。具體的負責執行 用戶定義操作的,是任務服務器,每一個作業被拆分成很多的任務,包括Map任務和Reduce任 務等,任務是具體執行的基本單元,它們都需要分配到合適任務服務器上去執行,任務服務 器一邊執行一邊向作業服務器匯報各個任務的狀態,以此來幫助作業服務器了解作業執行的 整體情況,分配新的任務等等。。。

除了作業的管理者執行者,還需要有一個任務的提交者,這就是客戶端。與分布式文件系 統一樣,客戶端也不是一個單獨的進程,而是一組API,用戶需要自定義好自己需要的內容, 經由客戶端相關的代碼,將作業及其相關內容和配置,提交到作業服務器去,並時刻監控執 行的狀況。。。

同作為Hadoop的實現,與HDFS的通信機制相同,Hadoop Map/Reduce也是用了協議接口來 進行服務器間的交流。實現者作為RPC服務器,調用者經由RPC的代理進行調用,如此,完成 大部分的通信,具體服務器的架構,和其中運行的各個協議狀況,參見下圖。從圖中可以看 到,與HDFS相比,相關的協議少了幾個,客戶端與任務服務器,任務服務器之間,都不再有 直接通信關系。這並不意味著客戶端就不需要了解具體任務的執行狀況,也不意味著,任務 服務器之間不需要了解別家任務執行的情形,只不過,由於整個集群各機器的聯系比HDFS復 雜的多,直接通信過於的難以維系,所以,都統一由作業服務器整理轉發。另外,從這幅圖 可以看到,任務服務器不是一個人在戰斗,它會像孫悟空一樣招出一群寶寶幫助其具體執行 任務。這樣做的好處,個人覺得,應該有安全性方面的考慮,畢竟,任務的代碼是用戶提交 的,數據也是用戶指定的,這質量自然良莠不齊,萬一碰上個搞破壞的,把整個任務服務器 進程搞死了,就因小失大了。因此,放在單獨的地盤進行,愛咋咋地,也算是權責明確了。 。。

與分布式文件系統相比,Map/Reduce框架的還有一個特點,就是可定制性強。文件系統中 很多的算法,都是很固定和直觀的,不會由於所存儲的內容不同而有太多的變化。而作為通 用的計算框架,需要面對的問題則要復雜很多,在各種不同的問題、不同的輸入、不同的需 求之間,很難有一種包治百病的藥能夠一招鮮吃遍天。作為Map/Reduce框架而言,一方面要 盡可能的抽取出公共的一些需求,實現出來。更重要的,是需要提供良好的可擴展機制,滿 足用戶自定義各種算法的需求。Hadoop是由Java來實現的,因此通過反射來實現自定義的擴 展,顯得比較小菜一碟了。在JobConf類中,定義了大量的接口,這基本上是Hadoop Map/Reduce框架所有可定制內容的一次集中展示。在JobConf中,有大量set接口接受一個 Class<? extends xxx>的參數,通常它都有一個默認實現的類,用戶如果不滿意,則 可自定義實現。。。

III. 計算流程

如果一切都按部就班的進行,那麼整個作業的計算流程,應該是作業的提交 -> Map任 務的分配和執行 -> Reduce任務的分配和執行 -> 作業的完成。而在每個任務的執行 中,又包含輸入的准備 -> 算法的執行 -> 輸出的生成,三個子步驟。沿著這個流程 ,我們可以很快的整理清晰整個Map/Reduce框架下作業的執行。。。

1、作業的提交

一個作業,在提交之前,需要把所有應該配置的東西都配置好,因為一旦提交到了作業服 務器上,就陷入了完全自動化的流程,用戶除了觀望,最多也就能起一個監督作用,懲治一 些不好好工作的任務。。。

基本上,用戶在提交代碼階段,需要做的工作主要是這樣的:

首先,書寫好所有自定的代碼,最起碼,需要有Map和Reduce的執行代碼。在Hadoop中, Map需要派生自Mapper<K1, V1, K2, V2>接口,Reduce需要派生自Reducer<K2, V2, K3, V3>接口。這裡都是用的泛型,用以支持不同的鍵值類型。這兩個接口都僅有一個方 法,一個是map,一個是reduce,這兩個方法都直接受四個參數,前兩個是輸入的鍵和值相關 的數據結構,第三個是作為輸出相關的數據結構,最後一個,是一個Reporter類的實例,實 現的時候可以利用它來統計一些計數。除了這兩個接口,還有大量可以派生的接口,比如分 割的Partitioner<K2, V2>接口。。。

然後,需要書寫好主函數的代碼,其中最主要的內容就是實例化一個JobConf類的對象, 然後調用其豐富的setXXX接口,設定好所需的內容,包括輸入輸出的文件路徑,Map和Reduce 的類,甚至包括讀取寫入文件所需的格式支持類,等等。。。

最後,調用JobClient的runJob方法,提交此JobConf對象。runJob方法會先行調用到 JobSubmissionProtocol接口所定義的submitJob方法,將此作業,提交給作業服務器。接著 ,runJob開始循環,不停的調用JobSubmissionProtocol的getTaskCompletionEvents方法, 獲得TaskCompletionEvent類的對象實例,了解此作業各任務的執行狀況。。。

2、Map任務的分配

當一個作業提交到了作業服務器上,作業服務器會生成若干個Map任務,每一個Map任務, 負責將一部分的輸入轉換成格式與最終格式相同的中間文件。通常一個作業的輸入都是基於 分布式文件系統的文件(當然在單機環境下,文件系統單機的也可以...),因為,它可以很 天然的和分布式的計算產生聯系。而對於一個Map任務而言,它的輸入往往是輸入文件的一個 數據塊,或者是數據塊的一部分,但通常,不跨數據塊。因為,一旦跨了數據塊,就可能涉 及到多個服務器,帶來了不必要的復雜性。。。

當一個作業,從客戶端提交到了作業服務器上,作業服務器會生成一個JobInProgress對 象,作為與之對應的標識,用於管理。作業被拆分成若干個Map任務後,會預先掛在作業服務 器上的任務服務器拓撲樹。這是依照分布式文件數據塊的位置來劃分的,比如一個Map任務需 要用某個數據塊,這個數據塊有三份備份,那麼,在這三台服務器上都會掛上此任務,可以 視為是一個預分配。。。

關於任務管理和分配的大部分的真實功能和邏輯的實現,JobInProgress則依托 JobInProgressListener和TaskScheduler的子類。TaskScheduler,顧名思義是用於任務分配 的策略類(為了簡化描述,用它代指所有TaskScheduler的子類...)。它會掌握好所有作業 的任務信息,其assignTasks函數,接受一個TaskTrackerStatus作為參數,依照此任務服務 器的狀態和現有的任務狀況,為其分配新的任務。而為了掌握所有作業相關任務的狀況, TaskScheduler會將若干個JobInProgressListener注冊到JobTracker中去,當有新的作業到 達、移除或更新的時候,JobTracker會告知給所有的JobInProgressListener,以便它們做出 相應的處理。。。

任務分配是一個重要的環節,所謂任務分配,就是將合適作業的合適任務分配到合適的服 務器上。不難看出,裡面蘊含了兩個步驟,先是選擇作業,然後是在此作業中選擇任務。和 所有分配工作一樣,任務分配也是一個復雜的活。不良好的任務分配,可能會導致網絡流量 增加、某些任務服務器負載過重效率下降,等等。不僅如此,任務分配還是一個無一致模式 的問題,不同的業務背景,可能需要不同的算法才能滿足需求。因此,在Hadoop中,有很多 TaskScheduler的子類,像Facebook,Yahoo,都為其貢獻出了自家用的算法。在Hadoop中, 默認的任務分配器,是JobQueueTaskScheduler類。它選擇作業的基本次序是:Map Clean Up Task(Map任務服務器的清理任務,用於清理相關的過期的文件和環境...) -> Map Setup Task(Map任務服務器的安裝任務,負責配置好相關的環境...) -> Map Tasks - > Reduce Clean Up Task -> Reduce Setup Task -> Reduce Tasks。在這個前提 下,具體到Map任務的分配上來。當一個任務服務器工作的游刃有余,期待獲得新的任務的時 候,JobQueueTaskScheduler會按照各個作業的優先級,從最高優先級的作業開始分配。每分 配一個,還會為其留出余量,已被不時之需。舉一個例子:系統目前有優先級3、2、1的三個 作業,每個作業都有一個可分配的Map任務,一個任務服務器來申請新的任務,它還有能力承 載3個任務的執行,JobQueueTaskScheduler會先從優先級3的作業上取一個任務分配給它,然 後再留出一個1任務的余量。此時,系統只能在將優先級2作業的任務分配給此服務器,而不 能分配優先級1的任務。這樣的策略,基本思路就是一切為高優先級的作業服務,優先分配不 說,分配了好保留有余力以備不時之需,如此優待,足以讓高優先級的作業喜極而泣,讓低 優先級的作業感慨既生瑜何生亮,甚至是活活餓死。。。

確定了從哪個作業提取任務後,具體的分配算法,經過一系列的調用,最後實際是由 JobInProgress的findNewMapTask函數完成的。它的算法很簡單,就是盡全力為此服務器非配 且盡可能好的分配任務,也就是說,只要還有可分配的任務,就一定會分給它,而不考慮後 來者。作業服務器會從離它最近的服務器開始,看上面是否還掛著未分配的任務(預分配上 的),從近到遠,如果所有的任務都分配了,那麼看有沒有開啟多次執行,如果開啟,考慮 把未完成的任務再分配一次(後面有地方詳述...)。。。

對於作業服務器來說,把一個任務分配出去了,並不意味著它就徹底解放,可以對此任務 可以不管不顧了。因為任務可以在任務服務器上執行失敗,可能執行緩慢,這都需要作業服 務器幫助它們再來一次。因此在Task中,記錄有一個TaskAttemptID,對於任務服務器而言, 它們每次跑的,其實都只是一個Attempt而已,Reduce任務只需要采信一個的輸出,其他都算 白忙乎了。。。

3、Map任務的執行

與HDFS類似,任務服務器是通過心跳消息,向作業服務器匯報此時此刻其上各個任務執行 的狀況,並向作業服務器申請新的任務的。具體實現,是TaskTracker調用 InterTrackerProtocol協議的heartbeat方法來做的。這個方法接受一個TaskTrackerStatus 對象作為參數,它描述了此時此任務服務器的狀態。當其有余力接受新的任務的時候,它還 會傳入acceptNewTasks為true的參數,表示希望作業服務器委以重任。JobTracker接收到相 關的參數後,經過處理,會返回一個HeartbeatResponse對象。這個對象中,定義了一組 TaskTrackerAction,用於指導任務服務器進行下一步的工作。系統中已定義的了一堆其 TaskTrackerAction的子類,有的對攜帶的參數進行了擴充,有的只是標明了下ID,具體不詳 寫了,一看便知。。。

當TaskTracker收到的TaskTrackerAction中,包含了LaunchTaskAction,它會開始執行所 分配的新的任務。在TaskTracker中,有一個TaskTracker.TaskLauncher線程(確切的說是兩 個,一個等Map任務,一個等Reduce任務),它們在癡癡的守候著新任務的來到。一旦等到了 ,會最終調用到Task的createRunner方法,構造出一個TaskRunner對象,新建一個線程來執 行。對於一個Map任務,它對應的Runner是TaskRunner的子類MapTaskRunner,不過,核心部 分都在TaskRunner的實現內。TaskRunner會先將所需的文件全部下載並拆包好,並記錄到一 個全局緩存中,這是一個全局的目錄,可以供所有此作業的所有任務使用。它會用一些軟鏈 接,將一些文件名鏈接到這個緩存中來。然後,根據不同的參數,配置出一個JVM執行的環境 ,這個環境與JvmEnv類的對象對應。

接著,TaskRunner會調用JvmManager的launchJvm方法,提交給JvmManager處理。 JvmManager用於管理該TaskTracker上所有運行的Task子進程。在目前的實現中,嘗試的是池 化的方式。有若干個固定的槽,如果槽沒有滿,那麼就啟動新的子進程,否則,就尋找idle 的進程,如果是同Job的直接放進去,否則殺死這個進程,用一個新的進程代替。每一個進程 都是由JvmRunner來管理的,它也是位於單獨線程中的。但是從實現上看,這個機制好像沒有 部署開,子進程是死循環等待,而不會阻塞在父進程的相關線程上,父線程的變量一直都沒 有個調整,一旦分配,始終都處在繁忙的狀況了。

真實的執行載體,是Child,它包含一個main函數,進程執行,會將相關參數傳進來,它 會拆解這些參數,並且構造出相關的Task實例,調用其run函數進行執行。每一個子進程,可 以執行指定個數量的Task,這就是上面所說的池化的配置。但是,這套機制在我看來,並沒 有運行起來,每個進程其實都沒有機會不死而執行新的任務,只是傻傻的等待進程池滿,而 被一刀斃命。也許是我老眼昏花,沒看出其中實現的端倪。。。

4、Reduce任務的分配與執行

比之Map任務,Reduce的分配及其簡單,基本上是所有Map任務完成了,有空閒的任務服務 器,來了就給分配一個Job任務。因為Map任務的結果星羅棋布,且變化多端,真要搞一個全 局優化的算法,絕對是得不償失。而Reduce任務的執行進程的構造和分配流程,與Map基本完 全的一致,沒有啥可說的了。。。

但其實,Reduce任務與Map任務的最大不同,是Map任務的文件都在本地隔著,而Reduce任 務需要到處采集。這個流程是作業服務器經由此Reduce任務所處的任務服務器,告訴Reduce 任務正在執行的進程,它需要的Map任務執行過的服務器地址,此Reduce任務服務器會於原 Map任務服務器聯系(當然本地就免了...),通過FTP服務,下載過來。這個隱含的直接數據 聯系,就是執行Reduce任務與執行Map任務最大的不同了。。。

5、作業的完成

當所有Reduce任務都完成了,所需數據都寫到了分布式文件系統上,整個作業才正式完成 了。此中,涉及到很多的類,很多的文件,很多的服務器,所以說起來很費勁,話說,一圖 解千語,說了那麼多,我還是畫兩幅圖,徹底表達一下吧。。。

首先,是一個時序圖。它模擬了一個由3個Map任務和1個Reduce任務構成的作業執行流程 。我們可以看到,在執行的過程中,只要有人太慢,或者失敗,就會增加一次嘗試,以此換 取最快的執行總時間。一旦所有Map任務完成,Reduce開始運作(其實,不一定要這樣的... ),對於每一個Map任務來說,只有執行到Reduce任務把它上面的數據下載完成,才算成功, 否則,都是失敗,需要重新進行嘗試。。。

而第二副圖,不是我畫的,就不轉載了,參見這裡,它描述了整個Map/Reduce的服務器狀 況圖,包括整體流程、所處服務器進程、輸入輸出等,看清楚這幅圖,對Map/Reduce的基本 流程應該能完全跑通了。有這幾點,可能圖中描述的不夠清晰需要提及一下,一個是在HDFS 中,其實還有日志文件,圖中沒有標明;另一個是步驟5,其實是由TaskTracker主動去拉取 而不是JobTracker推送過來的;還有步驟8和步驟11,創建出來的MapTask和ReduceTask,在 Hadoop中都是運行在獨立的進程上的。。。

IV. Map任務詳請

從上面,可以了解到整個Map和Reduce任務的整體流程,而後面要啰嗦的,是具體執行中 的細節。Map任務的輸入,是分布式文件系統上的,包含鍵值對信息的文件。為了給每一個 Map任務指定輸入,我們需要掌握文件格式把它分切成塊,並從每一塊中分離出鍵值信息。在 HDFS中,輸入的文件格式,是由InputFormat<K, V>類來表示的,在JobConf中,它的 默認值是TextInputFormat類(見getInputFormat),此類是特化的 FileInputFormat<LongWritable, Text>子類,而FileInputFormat<K, V>正是 InputFormat<K, V>的子類。通過這樣的關系我們可以很容易的理解,默認的文件格式 是文本文件,且鍵是LongWritable類型(整形數),值是Text類型(字符串)。僅僅知道文 件類型是不夠的,我們還需要將文件中的每一條數據,分離成鍵值對,這個工作,是 RecordReader<K, V>來做的。在TextInputFormat的getRecordReader方法中我們可以 看到,與TextInputFormat默認配套使用的,是LineRecordReader類,是特化的 RecordReader<LongWritable, Text>的子類,它將每一行作為一個記錄,起始的位置 作為鍵,整行的字符串作為值。有了格式,分出了鍵值,還需要切開分給每一個Map任務。每 一個Map任務的輸入用InputSplit接口表示,對於一個文件輸入而言,其實現是FileSplit, 它包含著文件名、起始位置、長度和存儲它的一組服務器地址。。。

當Map任務拿到所屬的InputSplit後,就開始一條條讀取記錄,並調用用於定義的Mapper ,進行計算(參見MapRunner<K1, V1, K2, V2>和MapTask的run方法),然後,輸出。 MapTask會傳遞給Mapper一個OutputCollector<K, V>對象,作為輸出的數據結構。它 定義了一個collect的函數,接受一個鍵值對。在MapTask中,定義了兩個OutputCollector的 子類,一個是MapTask.DirectMapOutputCollector<K, V>,人如其名,它的實現確實 很Direct,直截了當。它會利用一個RecordWriter<K, V>對象,collect一調用,就直 接調用RecordWriter<K, V>的write方法,寫入本地的文件中去。如果覺著 RecordWriter<K, V>出現的很突兀,那麼看看上一段提到的RecordReader<K, V>,基本上,數據結構都是對應著的,一個是輸入一個是輸出。輸出很對稱也是由 RecordWriter<K, V>和OutputFormat<K, V>來協同完成的,其默認實現是 LineRecordWriter<K, V>和TextOutputFormat<K, V>,多麼的眼熟啊。。。

除了這個非常直接的實現之外,MapTask中還有一個復雜的多的實現,是 MapTask.MapOutputBuffer<K extends Object, V extends Object>。有道是簡單壓倒 一切,那為什麼有很簡單的實現,要琢磨一個復雜的呢。原因在於,看上去很美的往往帶著 刺,簡單的輸出實現,每調用一次collect就寫一次文件,頻繁的硬盤操作很有可能導致此方 案的低效。為了解決這個問題,這就有了這個復雜版本,它先開好一段內存做緩存,然後制 定一個比例做阈值,開一個線程監控此緩存。collect來的內容,先寫到緩存中,當監控線程 發現緩存的內容比例超過阈值,掛起所有寫入操作,建一個新的文件,把緩存的內容批量刷 到此文件中去,清空緩存,重新開放,接受繼續collect。。。

為什麼說是刷到文件中去呢。因為這不是一個簡單的照本宣科簡單復制的過程,在寫入之 前,會先將緩存中的內存,經過排序、合並器(Combiner)統計之後,才會寫入。如果你覺 得Combiner這個名詞聽著太陌生,那麼考慮一下Reducer,Combiner也就是一個Reducer類, 通過JobConf的setCombinerClass進行設置,在常用的配置中,Combiner往往就是用用戶為 Reduce任務定義的那個Reducer子類。只不過,Combiner只是服務的范圍更小一些而已,它在 Map任務執行的服務器本地,依照Map處理過的那一小部分數據,先做一次Reduce操作,這樣 ,可以壓縮需要傳輸內容的大小,提高速度。每一次刷緩存,都會開一個新的文件,等此任 務所有的輸入都處理完成後,就有了若干個有序的、經過合並的輸出文件。系統會將這些文 件搞在一起,再做一個多路的歸並外排,同時使用合並器進行合並,最終,得到了唯一的、 有序的、經過合並的中間文件(注:文件數量等同於分類數量,在不考慮分類的時候,簡單 的視為一個...)。它,就是Reduce任務夢寐以求的輸入文件。。。

除了做合並,復雜版本的OutputCollector,還具有分類的功能。分類,是通過 Partitioner<K2, V2>來定義的,默認實現是HashPartitioner<K2, V2>,作業 提交者可以通過JobConf的setPartitionerClass來自定義。分類的含義是什麼呢,簡單的說 ,就是將Map任務的輸出,劃分到若干個文件中(通常與Reduce任務數目相等),使得每一個 Reduce任務,可以處理某一類文件。這樣的好處是大大的,舉一個例子說明一下。比如有一 個作業是進行單詞統計的,其Map任務的中間結果應該是以單詞為鍵,以單詞數量為值的文件 。如果這時候只有一個Reduce任務,那還好說,從全部的Map任務那裡收集文件過來,分別統 計得到最後的輸出文件就好。但是,如果單Reduce任務無法承載此負載或效率太低,就需要 多個Reduce任務並行執行。此時,再沿用之前的模式就有了問題。每個Reduce任務從一部分 Map任務那裡獲得輸入文件,但最終的輸出結果並不正確,因為同一個單詞可能在不同的 Reduce任務那裡都有統計,需要想方法把它們統計在一起才能獲得最後結果,這樣就沒有將 Map/Reduce的作用完全發揮出來。這時候,就需要用到分類。如果此時有兩個Reduce任務, 那麼將輸出分成兩類,一類存放字母表排序較高的單詞,一類存放字母表排序低的單詞,每 一個Reduce任務從所有的Map任務那裡獲取一類的中間文件,得到自己的輸出結果。最終的結 果,只需要把各個Reduce任務輸出的,拼接在一起就可以了。本質上,這就是將Reduce任務 的輸入,由垂直分割,變成了水平分割。Partitioner的作用,正是接受一個鍵值,返回一個 分類的序號。它會在從緩存刷到文件之前做這個工作,其實只是多了一個文件名的選擇而已 ,別的邏輯都不需要變化。。。

除了緩存、合並、分類等附加工作之外,復雜版本的OutputCollector還支持錯誤數據的 跳過功能,在後面分布式將排錯的時候,還會提及,標記一下,按下不表。。。

V. Reduce任務詳情

理論上看,Reduce任務的整個執行流程要比Map任務更為的羅嗦一些,因為,它需要收集 輸入文件,然後才能進行處理。Reduce任務,主要有這麼三個步驟:Copy、Sort、Reduce( 參見ReduceTask的run方法)。所謂Copy,就是從執行各個Map任務的服務器那裡,收羅到本 地來。拷貝的任務,是由ReduceTask.ReduceCopier類來負責,它有一個內嵌類,叫 MapOutputCopier,它會在一個單獨的線程內,負責某個Map任務服務器上文件的拷貝工作。 遠程拷貝過來的內容(當然也可以是本地了...),作為MapOutput對象存在,它可以在內存 中也可以序列化在磁盤上,這個根據內存使用狀況來自動調節。整個拷貝過程是一個動態的 過程,也就是說它不是一次給好所有輸入信息就不再變化了。它會不停的調用 TaskUmbilicalProtocol協議的getMapCompletionEvents方法,向其父TaskTracker詢問此作 業個Map任務的完成狀況(TaskTracker要向JobTracker詢問後再轉告給它...)。當獲取到相 關Map任務執行服務器的信息後,都會有一個線程開啟,做具體的拷貝工作。同時,還有一個 內存Merger線程和一個文件Merger線程在同步工作,它們將新鮮下載過來的文件(可能在內 存中,簡單的統稱為文件...),做著歸並排序,以此,節約時間,降低輸入文件的數量,為 後續的排序工作減負。。。

Sort,排序工作,就相當於上述排序工作的一個延續。它會在所有的文件都拷貝完畢後進 行,因為雖然同步有做著歸並的工作,但可能留著尾巴,沒做徹底。經過這一個流程,該徹 底的都徹底了,一個嶄新的、合並了所有所需Map任務輸出文件的新文件,誕生了。而那些千 行萬苦從其他各個服務器網羅過來的Map任務輸出文件,很快的結束了它們的歷史使命,被掃 地出門一掃而光,全部刪除了。。。

所謂好戲在後頭,Reduce任務的最後一個階段,正是Reduce本身。它也會准備一個 OutputCollector收集輸出,與MapTask不同,這個OutputCollector更為簡單,僅僅是打開一 個RecordWriter,collect一次,write一次。最大的不同在於,這次傳入RecordWriter的文 件系統,基本都是分布式文件系統,或者說是HDFS。而在輸入方面,ReduceTask會從JobConf 那裡調用一堆getMapOutputKeyClass、getMapOutputValueClass、getOutputKeyComparator 等等之類的自定義類,構造出Reducer所需的鍵類型,和值的迭代類型Iterator(一個鍵到了 這裡一般是對應一組值)。具體實現頗為拐彎抹角,建議看一下Merger.MergeQueue, RawKeyValueIterator,ReduceTask.ReduceValuesIterator等等之類的實現。有了輸入,有 了輸出,不斷循環調用自定義的Reducer,最終,Reduce階段完成。。。

VI. 分布式支持

1、服務器正確性保證

Hadoop Map/Reduce服務器狀況和HDFS很類似,由此可知,救死扶傷的方法也是大同小異 。廢話不多說了,直接切正題。同作為客戶端,Map/Reduce的客戶端只是將作業提交,就開 始搬個板凳看戲,沒有占茅坑的行動。因此,一旦它掛了,也就掛了,不傷大雅。而任務服 務器,也需要隨時與作業服務器保持心跳聯系,一旦有了問題,作業服務器可以將其上運行 的任務,移交給它人完成。作業服務器,作為一個單點,非常類似的是利用還原點(等同於 HDFS的鏡像)和歷史記錄(等同於HDFS的日志),來進行恢復。其上,需要持久化用於恢復 的內容,包含作業狀況、任務狀況、各個任務嘗試的工作狀況等。有了這些內容,再加上任 務服務器的動態注冊,就算挪了個窩,還是很容易恢復的。JobHistory是歷史記錄相關的一 個靜態類,本來,它也就是一個干寫日志活的,只是在Hadoop的實現中,對日志的寫入做了 面向對象的封裝,同時又大量用到觀察者模式做了些嵌入,使得看起來不是那麼直觀。本質 上,它就是打開若干個日志文件,利用各類接口來往裡面寫內容。只不過,這些日志,會放 在分布式文件系統中,就不需要像HDFS那樣,來一個SecondXXX隨時候命,由此可見,有巨人 在腳下踩著,真好。JobTracker.RecoveryManager類是作業服務器中用於進行恢復相關的事 情,當作業服務器啟動的時候,會調用其recover方法,恢復日志文件中的內容。其中步驟, 注釋中寫的很清楚,請自行查看。。。

2、任務執行的正確和速度

整個作業流程的執行,秉承著木桶原理。執行的最慢的Map任務和Reduce任務,決定了系 統整體執行時間(當然,如果執行時間在整個流程中占比例很小的話,也許就微不足道了... )。因此,盡量加快最慢的任務執行速度,成為提高整體速度關鍵。所使用的策略,簡約而 不簡單,就是一個任務多次執行。當所有未執行的任務都分配出去了,並且先富起來的那部 分任務已經完成了,並還有任務服務器孜孜不倦的索取任務的時候,作業服務器會開始炒剩 飯,把那些正在吭哧吭哧在某個服務器上慢慢執行的任務,再把此任務分配到一個新的任務 服務器上,同時執行。兩個服務器各盡其力,成王敗寇,先結束者的結果將被采納。這樣的 策略,隱含著一個假設,就是我們相信,輸入文件的分割算法是公平的,某個任務執行慢, 並不是由於這個任務本身負擔太重,而是由於服務器不爭氣負擔太重能力有限或者是即將撒 手西去,給它換個新環境,人挪死樹挪活事半功倍。。。

當然,肯定有哽咽的任務,不論是在哪個服務器上,都無法順利完成。這就說明,此問題 不在於服務器上,而是任務本身天資有缺憾。缺憾在何處?每個作業,功能代碼都是一樣的 ,別的任務成功了,就是這個任務不成功,很顯然,問題出在輸入那裡。輸入中有非法的輸 入條目,導致程序無法辨識,只能揮淚惜別。說到這裡,解決策略也浮出水面了,三十六計 走位上,惹不起,還是躲得起的。在MapTask中的MapTask.SkippingRecordReader<K, V>和ReduceTask裡的ReduceTask.SkippingReduceValuesIterator<KEY,VALUE>,都 是用於干這個事情的。它們的原理很簡單,就是在讀一條記錄前,把當前的位置信息,封裝 成SortedRanges.Range對象,經由Task的reportNextRecordRange方法提交到TaskTracker上 去。TaskTracker會把這些內容,擱在TaskStatus對象中,隨著心跳消息,匯報到JobTracker 上面。這樣,作業服務器就可以隨時隨刻了解清楚,每個任務正讀取在那個位置,一旦出錯 ,再次執行的時候,就在分配的任務信息裡面添加一組SortedRanges信息。MapTask或 ReduceTask讀取的時候,會看一下這些區域,如果當前區域正好處於上述雷區,跳過不讀。 如此反復,正可謂,道路曲折,前途光明啊。。。

VII. 總結

對於Map/Reduce而言,真正的困難,在於提高其適應能力,打造一款能夠包治百病的執行 框架。Hadoop已經做得很好了,但只有真正搞清楚了整個流程,你才能幫助它做的更好。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved