寫在前面的話
MapReduce作為hadoop的編程框架,是工程師最常接觸的部分,也是除去了網絡環境和集群配 置之外對整個Job執行效率影響很大的部分,所以很有必要深入了解整個過程。本文寫作的目的在於使得讀者對整個MapReduce過程有比較細致的了解,當自己需要定制MapReduce行為時,知道該重寫 哪些類和方法。在寫作時,我貼了部分認為重要的源碼和接口,並跟著自己的理解,對於某些內容,結 合了自己在工作中遇到的問題,給出了實踐參考。
總體概覽
比較High Level的來看,整個MapReduce過程分為三步:· Map:讀取輸入,做初步的處理,輸出形式的中間結果· Shuffle:按照key對中間結果進行排序聚合,輸出給reduce線程·Reduce:對相同key的輸入進行最終的處理,並將結果寫入到文件中。
用經典的WordCount例子來簡單說明一下上面的過程。假設我們現在要做的是統計一個文本中單詞的個數,我們將文件切分成幾個部分,然後創建多個Map線程,處理這些輸入,輸出的中間結果是的形式,shuffle過程將同樣Key的元組,也就是word相同的,分配到同樣的reduce線程中,reduce線程 匯總同一個word的元組個數,最終輸出。
我這麼一說,你是不是感覺已經理解MapReduce了?差不多吧,但是理解與深入理解是1與10000 的差距,下面讓我提幾個細節方面的問題:
1. 原始數據是怎麼切分的,又是以什麼形式傳遞給Map線程的?
2. 有多少個map線程,怎樣控制他們?
3. 輸出寫到磁盤的過程是怎樣的?
4. 如果要保證同一個中間結果key交給同一個reduce,要不要排序?什麼時候排序?
5. 滿足什麼條件的中間結果會調用一次reduce方法,滿足什麼條件的中間結果會交給一個reduce 線程?
6. 有多少reduce線程,怎樣控制他們? 7. 有多少輸出文件? ...
是不是有很多問題都看不懂啦?沒關系,下面我就詳細講解這個過程。
Yarn的資源分配與任務調度
之所以要講解這一部分,是因為MapReduce過程牽扯到了框架本身的東西,我們得知道計算線程 是怎麼來的,怎麼沒的。
Hadoop由1.0進化成2.0,變更還是很大的,1.0裡整個job的資源分配,任務調度和監控管理都是 由一個JobTracker來做的,擴展性很差,2.0對整個過程重新設計了一下,我們重點來看2.0的內容。
一個Job要在集群中運行起來,需要幾個條件,首先,運算資源,可能包括內存,cpu等,其次,得 有一個任務的調度算法,安排運行的先後順序,最後,得知道工作進行的順不順利,並把情況及時的反饋給上級,以便及時的做出響應。下面分別說明。
下面我們首先看看1.0時代hadoop集群是怎麼管理資源和調度任務的。
hadoop1.0的資源管理
hadoop1.0的資源管理
[本圖來自百度百科的“MapReduce”詞條]
對於一個集群來說,資源有很多維度,比如內存,CPU等,1.0時代將節點上的資源切成等份,使用 slot的概念來抽象,根據對資源占用情況的不同,又可細分為Map slot和reduceslot。slot代表一種運 行的能力,像許可證一樣,MapTask只有獲得了Map slot後才可以執行,ReduceTask同理。對於一個 節點,有多少slot是事先配置好的。
JobTracker和TaskTracker共同管理這些slot,其中JobTracker運行在NameNode上,負責資源 的分配和任務的調度,TaskTracker運行在Data Node上,負責所在節點上資源的監控和task的管理。 具體一點,當用戶的任務提交給jobtracker之後,jobtracker根據任務的情況決定要啟動多少MapTask 和ReduceTask,然後根據TaskTracker反饋的slot使用情況(以及其他的因素,比如根據數據的存儲情 況),決定給哪幾個TaskTracker分配多少個MapTask和多少個ReduceTask。接收到任務後,TaskTracker 負責啟動JVM來運行這些Task,並把運行情況實時反饋給JobTracker。
注意,TaskTracker只有監控權,沒有調度權,也就是它只能把運行情況反饋給JobTracker,在他這裡有多少個Task,當task失敗時,重啟task之類的管理權限,都在JobTracker那裡。JobTracker的 任務管理是Task級別的,也即JobTracker負責了集群資源的管理,job的調度,以及一個Job的每個Task 的調度與運行。
打個比方,JobTracker是一個極度專權的君王,TaskTracer是大臣,君王握有所有的權利,大臣們 被架空,君王說事情怎麼做,底下的就得怎麼做,大臣只管執行,並把進行情況告訴君王,如果事情搞砸了,大臣也不得擅作主張的重新做一遍,得上去請示君王,君王要麼再給他一次機會,要麼直接拖出 去砍了,換個人完成。
極度專權早晚累死,而且一個人的力量終歸是有限的,這也是1.0時代很大的問題。所以新時代采取了全新的設計。Yarn的資源控制與任務調度
Yarn用Container的概念來抽象資源,Container描述了自己的位置,自己擁有的CPU,內存等資 源的數量。Container跟任務完全獨立了,是一個完全硬件的抽象。比1.0裡使用計算時槽更加細粒度, 也更易於理解。
資源控制由ResourceManage(RM)和Node Manager(NM)兩個角色參與,其中Node Manager 管理所在node上的container,並把資源的使用情況匯報給ResourceManager,Resource Manager 通過Node Manager返回的信息,掌握著整個集群的資源情況。為了便於管理,Hadoop集群的管理 員可以建立多個隊列,每個隊列配置一定量的資源,用戶可以向一個或多個隊列提交Job。作為集群的 用戶,可以到50030端口查看集群的隊列的分配和負載情況。
當一個用戶提交了一個job給ResourceManager, Resource Manager 並不是直接衡量它所需的 資源並調度,而是下放給一個Application Master(AM)的角色,這個AM全權負責用戶提交的這個Job,它會根據Job的情況向RM申請資源,RM告訴AM它可以使用的Container的信息,AM再將自己 Job的task放到這些Container中運行並監控。如果有失敗的task,AM可以根據情況選擇重啟task。
有幾個關鍵的點我列出來,以確保理解正確:
1. 集群的資源監控由RM與NM合作完成,任務調度與監控由RM與AM完成,結構更加清晰。
2. RM對任務的管理是Job級別的,即它只負責為整個Job分配資源,並交給AM去管理。RM得到了大大的解放。
3. 與TaskTracker相比,AM擁有更多的權利,它可以申請資源並全權負責task級別的運行情況。
4. 與TaskTracker相比,AM可以使用其他機器上的計算資源(即Container)。這些資源也不再有Map和Reduce的區別。
繼續上面的例子。我用壯丁來比喻Container,壯丁有很多屬性,比如家鄉(location),力氣( 內存),財產(CPU),君王(RM)通過錦衣衛(NM)來掌握各個地方(Node)壯丁的使用情況。 當有百姓提出一個要求(提交一個Job),比如興修水利,君王不再事無巨細的過問這件事情,而是叫 一個合適的大臣(AM)過來,比如此例中的水利大臣,問他需要多少人,多少錢,然後衡量一下國力, 播一些壯丁給他用。水利大臣可以使用全國范圍內的壯丁,對他們有絕對的領導權,讓他們干嘛就得干 嘛。事情要麼圓滿完成,水利大臣給君王報喜,要麼發現難度太大啊,嘗試了好多辦法都失敗了(job嘗 試次數到達一定數量),只好回去請罪。
君王遵循政務公開的原則,所有job的運行情況都可以通過50030端口查看:
好了,講了這麼一大通,我想關於Job怎麼跑起來,task怎麼來怎麼沒,應該有個概念了。用戶將自 己的代碼上傳到集群的一個client Node上,運行代碼,代碼裡會對自己的job進行配置,比如輸入在 哪,有哪些依賴的jar包,輸出寫到哪,以什麼格式寫,然後提交給ResourceManager,ResourceManager 會在一個Node上啟動ApplicationMaster負責用戶的這個Job,AM申請資源,得到RM的批准和分配 後,在得到的Container裡啟動MapTask和ReduceTask,這兩種task會調用我們編寫的Mapper和Reducer等代碼,完成任務。任務的運行情況可以通過web端口查看。
MapReduce計算框架最重要的兩個類是Mapper和Reducer,用戶可以繼承這兩個類完成自己的 業務邏輯,下面以這兩個類的輸入輸出為主線詳細講解整個過程。例子總是最容易被人理解的,所以講解過程有看不懂的,可以回來查看這個簡單的job。用戶想使用MapReduce的過程統計一組文件中每 個單詞出現的次數,與經典的WordCount不同的是,要求大寫字母開頭的單詞寫到一個文件裡面,小寫 的寫到另一個文件中。Mapper的輸入
所謂源碼之前,了無秘密,先上mapper的源碼。
Mapper的源碼
public class Mapper{ /*** The Context
passed on to the {@link Mapper} implementations. */ public abstract class Contextimplements MapContext{ } /*** Called once at the beginning of the task. */ protected void setup(Context context) throws IOException, InterruptedException { // NOTHING } /*** Called once for each key/value pair in the input split. Most applications * should override this, but the default is the identity function.*/ @SuppressWarnings("unchecked")protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } /*** Called once at the end of the task. */ protected void cleanup(Context context) throws IOException, InterruptedException { // NOTHING } /*** Expert users can override this method for more complete control over the * execution of the Mapper.* @param context* @throws IOException*/ public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }
public class MapContextImplextends TaskInputOutputContextImpl implements MapContext { private RecordReader reader; private InputSplit split; public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader reader, RecordWriter writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) { super(conf, taskid, writer, committer, reporter); this.reader = reader;this.split = split; } /** * Get the input split for this map. */ public InputSplit getInputSplit() { return split; } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); } }
MapContextImpl類組合了兩個類型的對象,即InputSplit和RecordReader,並封裝了獲取輸入 的Key和Value的方法,在深入探討InputSplit和RecordReader之前,我們先看一下這個Context是 怎麼傳遞給我們編寫的Mapper函數的。下面是我從MapTask類中摘出的一段代碼:
public class MapTask extends Task { privatevoid runNewMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException { // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); // make a mapper org.apache.hadoop.mapreduce.Mapper mapper = (org.apache.hadoop.mapreduce.Mapper ) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // make the input format org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat ) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // rebuild the input split org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); LOG.info("Processing split: " + split); org.apache.hadoop.mapreduce.RecordReader input = new NewTrackingRecordReader (split, inputFormat, reporter, taskContext); ... org.apache.hadoop.mapreduce.MapContext mapContext = new MapContextImpl (job, getTaskID(), input, output, committer, reporter, split); org.apache.hadoop.mapreduce.Mapper .Context mapperContext = new WrappedMapper ().getMapContext( mapContext); try { input.initialize(split, mapperContext); mapper.run(mapperContext); ... }finally{ closeQuietly(input); ... }
從代碼中可以看出,我們在客戶端提交Job之前所進行的配置以JobContext的形式傳遞給了MapTask, 在MapTask的runNewMapper()的方法中,它使用反射實例化出了用戶指定的Mapper類和inputFormat 類,並新建了InputSplit和RecorderReader用來實例化上面提交的MapContext,MapContext以參數的形式被傳遞給了包裝類WrappedMapper,在對input進行初始化後,以
mapper.run(mapperContext);
的形式正式調用我們用戶的代碼。
InputSplit
源碼中對InputSplit類的描述是:
/**
* InputSplit
represents the data to be processed by an
* individual {@link Mapper}. *
*
Typically, it presents a byte-oriented view on the input and is the * responsibility of {@link RecordReader} of the job to process this and present * a record-oriented view.*/ public abstract class InputSplit {
public abstract long getLength() throws IOException, InterruptedException;
public abstractString[] getLocations() throws IOException, InterruptedException; }
更易於理解的表述是,InputSplit決定了一個Map Task需要處理的輸入。更進一步的,有多少個 InputSplit,就對應了多少個處理他們的Map Task。從接口上來看,InputSplit中並沒有存放文件的內 容,而是指定了文件的文件的位置以及長度。
既然inputsplit與MapTask之間是一一對應的,那麼我們就可以通過控制InputSplit的個數來調整 MapTask的並行性。當文件量一定時,InputSplit越小,並行性越強。inputsplit的大小並不是任意的, 雖然最大值和最小值都可以通過配置文件來指定,但是最大值是不能超過一個block大小的。
Block是什麼?用戶通過HDFS的接口,看到的是一個完整文件層面,在HDFS底層,文件會被切成固 定大小的Block,並冗余以達到可靠存儲的目的。一般默認大小是64MB,可以調節配置指定。
InputSplit是從字節的角度來描述輸入的,回頭查看一下Mapper,它裡面沒有這種東西啊,用到的 Key,Value是從哪來的?有請RecordReader。
RecordReader
RecordReader
按照慣例,先上源碼:
public abstract class RecordReaderimplements Closeable { /** * Called once at initialization. * @param split the split that defines the range of records to read * @param context the information about the task * @throws IOException * @throws InterruptedException */ public abstract void initialize(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; /** * Read the next key, value pair. * @return true if a key/value pair was read * @throws IOException * @throws InterruptedException */ public abstract boolean nextKeyValue() throws IOException, InterruptedException; /** * Get the current key * @return the current key or null if there is no current key * @throws IOException * @throws InterruptedException */ public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; /** * Get the current value. * @return the object that was read * @throws IOException * @throws InterruptedException */ public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; /** * The current progress of the record reader through its data. * @return a number between 0.0 and 1.0 that is the fraction of the data read * @throws IOException * @throws InterruptedException */ public abstract float getProgress() throws IOException, InterruptedException; /** * Close the record reader. */ public abstract void close() throws IOException; }
啊哈,InputSplit原來是RecordReader的一個參數啊。recordReader從InputSplit描述的輸入裡 取出一個KeyValue,作為mapper.map()方法的輸入,跑一遍Map方法。打個比方,InputSplit像一 桌大餐,吃還是得一口一口吃,怎樣算一口,就看RecordReader怎麼實現了。
好了,如果我想自己實現InputSplit和RecordReader,應該寫在哪呢?下面就講InputFormat。
InputFormat
上文我們提到了InputFormat,這個類我們在配置Job的時候經常會指定它的實現類。先來看接口。
public abstract class InputFormat{ public abstract List getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
明白了吧,InputSplit是在getSplit函數裡面算出來的,RecordReader也是在這裡Create出來 的。如果你想以自己的方式讀取輸入,就可以自己寫一個InputFormat的實現類,重寫裡面的方法。
當然,如果你說我很懶,不想自己寫怎麼辦?好辦,之所以要用框架,很重要的一點就是人家提供 了默認實現啦。WordCount裡面一般用的是TextInputFormat,我們看一下它的實現。
public class TextInputFormat extends FileInputFormatimplements JobConfigurable { public RecordReader getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); String delimiter = job.get("textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) { recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); } return new LineRecordReader(job, (FileSplit) genericSplit, recordDelimiterBytes); } }
有沒有一下明白了的感覺?它實現了自己的getRecordReader方法,裡面從配置中取了Delimiter, 這個東西的默認值是"\n"!然後返回了以Delimiter劃分的一個LineRecordReader,知道為什麼你制定了InputFormat之後,Mapper裡面讀到的就是一行一行的輸入了吧。
在我們加強版的WordCount裡,也完全可以使用默認實現的TextInputFormat。關於Mapper的 輸入暫時就講這些,下面我們來看Mapper的輸出。
Mapper的輸出
注意到上文貼出的Mapper的默認實現的map方法中,是將Key和Value直接寫入到context當中,我們已經知道了context是從MapContextImpl來的,那這個Write方法是怎麼回事?
Context.Write的來歷
Write方法是它從MapContextImpl父類TaskInputOutputContextImpl繼承來的,看一下這個類 的部分代碼:
public abstract class TaskInputOutputContextImplextendsTaskAttemptContextImpl implements TaskInputOutputContext { private RecordWriter output; private OutputCommitter committer; public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriter output, OutputCommitter committer, StatusReporter reporter) { super(conf, taskid, reporter); this.output = output; this.committer = committer; } /** * Generate an output key/value pair. */ public void write(KEYOUT key,VALUEOUT value ) throws IOException, InterruptedException { output.write(key, value); } }
org.apache.hadoop.mapreduce.RecordWriter output = null; // get an output object if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter); }
判斷條件滿足時,說明這個Job沒有ReduceTask,這時RecordWriter被實例化成了 NewDirectOutputCollector,否則的話,實例化為NewOutputCollector。來具體看看這兩個內部類。
private class NewDirectOutputCollectorextends org.apache.hadoop.mapreduce.RecordWriter { private final org.apache.hadoop.mapreduce.RecordWriter out; ... out = outputFormat.getRecordWriter(taskContext); ... out.write(key, value); }
前者直接調用了OutputFormat來實例化自己,我們寫Job的時候一般會指定Job的OutputFormat,這個類在MapTask中是通過反射的方式引入的。可見,第一個分支的邏輯是會直接把map的輸出寫入到 我們整個Job的輸出當中。具體是怎麼個寫入的過程,我們留到reduce的輸出中講,畢竟那裡才是最常規的會寫輸出文件的地方。
private class NewOutputCollectorextends org.apache.hadoop.mapreduce.RecordWriter { private final MapOutputCollector collector; private final org.apache.hadoop.mapreduce.Partitioner partitioner; private final int partitions; @SuppressWarnings("unchecked") NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { collector = createSortingCollector(job, reporter); partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner ) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner () { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; } } @Override public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions)); } @Override public void close(TaskAttemptContext context ) throws IOException,InterruptedException { try { collector.flush(); } catch (ClassNotFoundException cnf) { throw new IOException("can't find class ", cnf); } collector.close(); } }
這個內部類有兩個成員變量,一個是MapOutputCollector,一個是Partitioner。最終的寫入調 用的是MapOutputCollector的Write方法完成的。Partitioner的名氣更大一些,我們先來介紹。
Partitioner
但凡了解一點MapReduce的人應該都知道這個類,它的作用是根據Key將Map的輸出分區,然後 發送給Reduce線程。有多少個Partition,就對應有多少個Reduce線程。Reduce線程的個數是可以 在配置文件中設定的。上面代碼的邏輯就是先讀一下這個配置,看一下需要分到少個分區,如果分區數 少於1,就實例化出一個Partitioner的默認實現,否則的話,用反射讀取用戶設置的實現類。
我們一般只重寫它的一個方法:getPartition,參數是一個Key Value對以及Partition的總數,比 較常見的實現是取Key的hashcode再對總的分區數取模。
注意,為了提高整個job的運行速度,reduce task應該盡可能均勻的接收Map的輸出。partition 作為Map輸出分配的唯一參考標准,映射規則至關重要,partition返回值一樣的Map的輸出,將會交 給一個reducetask,在實際工作中,我們就遇到了partition返回值不合理,好多Mapper的輸出都壓 在一個reduce的task上,造成這個reducetask執行非常緩慢,整體的job一直結束不了的情況。盡可 能均勻的分配partition!
MapOutputCollector
這個Collector我們可以自己實現,不過不是很常見。它有一個默認實現,叫MapOutputBuffer。有關MapOutputBuffer的分析,文獻[4]有非常清晰的解釋,值得一看。
MapOutputBuffer
Combiner的意思是局部的reduce,它可以在job配置的時候指定,實現的邏輯也跟reduce一致, Combiner的作用是可以減少Mapper和Reducer之間傳輸的數據量。以我們上面大小寫敏感的word count來說,同一台機器上的Mapper輸出,可以先合並一次,將n個合並成的形式,再傳遞給reducer。
我把這個類裡關鍵的方法列一下,源碼比較多,就不貼了,可以參照那篇帖子。
init
public void init(Context context) throws IOException, ClassNotFoundException;
collect
public synchronized void collect(K key, V value, int partition) throws IOException;
flush
public synchronized void collect(K key, V value, int partition) throws IOException;
當map所有的輸出都收集完了之後,處理殘留在緩沖區,沒有溢寫到磁盤的數據。
sortAndSpill
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException;
溢寫的關鍵邏輯,其中會調用排序函數和combiner。Combiner的邏輯與reducer的完全一樣,相 當於每個map線程的局部預處理,通過對局部數據的合並,來起到減少shuffle階段數據量的作用。
spillSingleRecord
private void spillSingleRecord(K key, V value, int partition) throws IOException;
當緩沖區沒有達到溢出條件,並且放不下當前這條記錄的時候會調用的方法,主要用來處理大鍵值 對的邊界條件。這種情況直接寫磁盤。
compare&&swap
public int compare(int mi, int mj) { int kvi = this.offsetFor(mi % this.maxRec); int kvj = this.offsetFor(mj % this.maxRec); int kvip = this.kvmeta.get(kvi + 2); int kvjp = this.kvmeta.get(kvj + 2); return kvip != kvjp?kvip - kvjp:this.comparator.compare(this.kvbuffer, this.kvmeta.get(kvi + 1), this.kvmeta.get(kvi + 0) - this.kvmeta.get (kvi + 1), this.kvbuffer, this.kvmeta.get(kvj + 1), this.kvmeta.get(kvj + 0) - this.kvmeta.get(kvj + 1)); } public void swap(int mi, int mj) { int iOff = mi % this.maxRec * 16; int jOff = mj % this.maxRec * 16; System.arraycopy(this.kvbuffer, iOff, this.META_BUFFER_TMP, 0, 16); System.arraycopy(this.kvbuffer, jOff, this.kvbuffer, iOff, 16); System.arraycopy(this.META_BUFFER_TMP, 0, this.kvbuffer, jOff, 16); }
從這兩個函數可以猜出排序函數的行為。代碼裡出現的kvmeta就是上文中提到的index數組,他是 kvbuffer的一種int視角,比較的對象就是它的兩個元素,如果有亂序情況,交換的也是這兩個元素的位 置。
mergeParts
private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException;
合並前後的示意圖還是很形象的。最終在shuffle的時候,只要根據index查找對應的數據就可以了。
業務場景
我一直沒有想過MapTask是否會對輸出自動排序,直到有一天我正真需要自己動手修改業務代碼。
我在的組做的是數據處理,在我們的業務場景中,有兩種數據結構,event和session,用戶在電商網站上操作時,會在後台產生一系列的event,比如你查詢了一件商品,後台就有一個查詢event產生。event用guid和timestamp唯一標示,可能還含有其他的屬性(比如ip等),guid可以簡單的理解成用 戶的一種標示,event說白了是某個用戶在某一時刻產生的某種動作。session的意思某個用戶在一段連 續時間內產生的動作集合,比event的抽象層次更高,它用sessionId和timestamp來標示,也有諸如這 個session一共包含了多少個event這種統計信息。sessionId跟guid一樣,某個用戶在一定時間內是唯 一的,session的timestamp取的是這段時間這個用戶的第一個event的timestamp。
好了,我們需要寫一個MapReduce的job,輸入是event,輸出是session。在map階段,從event 裡面提取出key,然後同一個用戶產生的event,應該一起在reduce階段統計。既然有時序的問題,是 不是在統計之前應該先排個序?可我翻遍了代碼,都沒有找到對key排序的邏輯,是前輩代碼的巨大bug ?
當然不是,在我們將guid與timestamp作為key輸出時,MapTask已經按照這兩個字段做了排序。 注意,這種有序,指的只是當前MapTask局部輸出的有序。從Mapper的輸出,到真正Reducer的輸入,還有很重要的一個過程要走。
Shuffle
從語義上說,Shuffle應該是Map和Reduce中間的過程,從源碼的代碼結構上看,shuffle過程是 在reduceTask中得。前段時間在考公司的hadoop測試的時候,有這種變態的問題,說下面屬於reduce 過程的操作有。。至今不知道正確答案是什麼。
ReduceTask有三個Phase,即copyPhase,sortPhase和reducePhase,主流的做法應該是將前兩個phase歸為Shuffle階段,reducephase作為狹義的reduce過程。
ShuffleConsumerPlugin
Shuffle過程通過調用抽象類ShuffleConsumerPlugin來完成,它有個實現類,就叫做“Shuffle”。下面是Shuffle類最主要的run方法的實現:
@Override public RawKeyValueIterator run() throws IOException, InterruptedException { // Scale the maximum events we fetch per RPC call to mitigate OOM issues // on the ApplicationMaster when a thundering herd of reducers fetch events // TODO: This should not be necessary after HADOOP-8942 int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks()); int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer); // Start the map-completion events fetcher thread final EventFetchereventFetcher = new EventFetcher (reduceId, umbilical, scheduler, this, maxEventsToFetch); eventFetcher.start(); // Start the map-output fetcher threads boolean isLocal = localMapFiles != null; final int numFetchers = isLocal ? 1 : jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5); Fetcher [] fetchers = new Fetcher[numFetchers]; if (isLocal) { fetchers[0] = new LocalFetcher (jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret(), localMapFiles); fetchers[0].start(); } else { for (int i=0; i < numFetchers; ++i) { fetchers[i] = new Fetcher (jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret()); fetchers[i].start(); } } // Wait for shuffle to complete successfully while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) { reporter.progress(); synchronized (this) { if (throwable != null) { throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } } } // Stop the event-fetcher thread eventFetcher.shutDown(); // Stop the map-output fetcher threads for (Fetcher fetcher : fetchers) { fetcher.shutDown(); } // stop the scheduler scheduler.close(); copyPhase.complete(); // copy is already complete taskStatus.setPhase(TaskStatus.Phase.SORT); reduceTask.statusUpdate(umbilical); // Finish the on-going merges... RawKeyValueIterator kvIter = null; try { kvIter = merger.close(); } catch (Throwable e) { throw new ShuffleError("Error while doing final merge " , e); } // Sanity check synchronized (this) { if (throwable != null) { throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } } return kvIter; }
Shuffle的時候,會先判斷是不是local run的,如果不是的話,會默認啟動5個Fetcher線程拉取 map的輸出,Fetcher會先找到一個主機,確定這台機器上它要拉取的map task的輸出,然後使用http協議獲取response的stream,交給MapOutput類型的對象去完成具體的下載任務。
當文件拉取完成,就會進入sort階段。注意到我們拉取到數據都是局部有序的,因此,排序的過程, 實際上也就是一個Merge的過程。Copy phase結束之後,Shuffle會調用
kvIter = merger.close();
方法來得到排序完成的map的key value輸出。
MapOutput
MapOutput有兩個實現類,即OnDiskMapOutput和InMemoryMapOutput,具體哪一個被實例化,是看當前要shuffle的數據適不適合放到內存中。
OnDiskMapOutput的行為如下所示:
final int BYTES_TO_READ = 64 * 1024; byte[] buf = new byte[BYTES_TO_READ]; while (bytesLeft > 0) { int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); if (n < 0) { throw new IOException("read past end of stream reading " + getMapId()); } disk.write(buf, 0, n); bytesLeft -= n; metrics.inputBytes(n); reporter.progress(); }
public static void readFully(InputStream in, byte buf[], int off, int len) throws IOException { int toRead = len; while (toRead > 0) { int ret = in.read(buf, off, toRead); if (ret < 0) { throw new IOException( "Premature EOF from inputStream"); } toRead -= ret; off += ret; } }
代碼比較簡單,前者有個buffer,一邊讀一邊寫文件,後者將數據緩存在一個byte數組裡,跟類名 看上去的行為完全一致。
當MapOutput拷貝方法shuffle返回時,Fetcher會調用Scheduler的copySucceed方法做一些 收尾工作,比如將已經拷貝過的host從待拷貝列表中刪除。比較重要的一點是,它會調用Mapoutput的commit方法。兩種Mapoutput的實現在這裡的差異不大,都會調用MergeManagerImpl的closeXXXXFile 方法。
MapOutput負責的是將數據從集群中得其他機器上拉取過來,拉取到的數據怎麼Merge到一起, 就是MergeManagerImpl考慮的事情了。
MergeManagerImpl
MergeManagerImpl幾乎handle了所有與merge相關的實現。他有兩個(其實是三個)內部類, InMemeryMerger和OnDiskMerger,分別對應了前面的兩種不同的MapOutput。
我們先看一下這兩個Merger共同的父類MergeThread,比較容易理解它做得事情。
abstract class MergeThreadextends Thread { private LinkedList > pendingToBeMerged; public synchronized void close() throws InterruptedException { closed = true; waitForMerge(); interrupt(); } public void startMerge(Set
inputs) { if (!closed) { numPending.incrementAndGet(); List toMergeInputs = new ArrayList (); Iterator iter=inputs.iterator(); for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) { toMergeInputs.add(iter.next()); iter.remove(); } LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() + " segments, while ignoring " + inputs.size() + " segments"); synchronized(pendingToBeMerged) { pendingToBeMerged.addLast(toMergeInputs); pendingToBeMerged.notifyAll(); } } } public synchronized void waitForMerge() throws InterruptedException { while (numPending.get() > 0) { wait(); } } public void run() { while (true) { List inputs = null; try { // Wait for notification to start the merge... synchronized (pendingToBeMerged) { while(pendingToBeMerged.size() <= 0) { pendingToBeMerged.wait(); } // Pickup the inputs to merge. inputs = pendingToBeMerged.removeFirst(); } // Merge merge(inputs); } catch (InterruptedException ie) { numPending.set(0); return; } catch(Throwable t) { numPending.set(0); reporter.reportException(t); return; } finally { synchronized (this) { numPending.decrementAndGet(); notifyAll(); } } } } public abstract void merge(List inputs) throws IOException; }
容易看出,MergeThread有一個LinkedList,用於存放MapOutput得到的輸出,startMerge方 法會將這些輸出添加到List中,run方法會不斷的從中取出Mapoutput的輸出,並調用merge方法, Close的時候會等待所有的merge過程結束。startMerge方法正是在MergeManagerImpl的 closeXXXXMergedFile調用的。
這樣整個過程就清晰一些了,Shuffle時調用Fetcher來下載Map的輸出,Fetcher根據數據量的大小,判斷是實例化InMemoryMapOutput還是OnDiskMapOutput,實例化出的MapOutput拉取完 畢後,Fetcher通過一個Shuffle的scheduler調用Mapoutput的commit方法,commit方法中調用到 MergeManagerImpl的closeXXXXMergedFile方法,這個方法又調用到MergeThread實現類中得 startMerge方法,下載到得數據最終就被傳遞到了MergeThread的實現類了。
剩下的問題,就是怎麼Merge了。
Merge
整個Merge的過程比較復雜,牽扯到得代碼也比較多,我按照我的理解,在邏輯的層面簡單敘述一下這個過程。
從整體上講,Merge的過程是先Merge掉InMemory的,InMemory的結果也會加入到onDisk的待 Merge隊列中,最後補上一記finalMerge,合並起InMemory剩余的與onDisk剩余的。每種Merger的Merge操作最終都是交給一個叫Merger的工具類的靜態方法實現的。
除了參數的不同,實際merge的過程是類似的。Merge就是將小文件合並成大文件,對於初始有序 的數據,為了減少比較次數,每次應該合並數據最少的兩組,也就是霍夫曼樹的思想。從源碼看,貌似 是自己用ArrayList實現了一個。
InMemory的Merge行為是,先將InMemoryMapOutput中的buffer結構化成一組segment, segment含有需要merge的數據,最重要的,它含有這些數據的長度信息,這個信息會再霍夫曼樹式的 merge用到。接下來它會new出一個path對象用來存放merge的結果,一個Writer來寫入,然後就會調用Merger工具類的相應merge方法進行實際的merge。在實際寫入文件的時候,會判斷有沒有指定 Combiner,也就是會不會對相同key的輸出進行進一步的合並。InMemoryMerger的最終結果會寫入到 文件,並將這個文件的信息注冊到onDiskMerger中,以便後續的合並。
onDiskMerger的行為與InMemoryMerger的行為基本一致,只是在調用Merger的時候給定了不 同的參數。
finalMerge的行為是,先判斷有沒有inMemory的output,有的話構造出segment,合並,最終 的結果是一個文件,添加到onDisk得output隊裡中,然後合並onDisk的ouput,比較特別的,finalMerge 是有返回值的,最終合並的結果輸出是RawKeyValueIterator類型,代表這一個reduce所接收到的所 有輸入。
MergeManagerImpl的close方法
在shuffle的run方法中,在copyPhase結束之後,調用了MergeManagerImpl的close方法,該 方法的實現如下:
public RawKeyValueIterator close() throws Throwable { // Wait for on-going merges to complete if (memToMemMerger != null) { memToMemMerger.close(); } inMemoryMerger.close(); onDiskMerger.close(); List> memory = new ArrayList >(inMemoryMergedMapOutputs); inMemoryMergedMapOutputs.clear(); memory.addAll(inMemoryMapOutputs); inMemoryMapOutputs.clear(); List disk = new ArrayList (onDiskMapOutputs); onDiskMapOutputs.clear(); return finalMerge(jobConf, rfs, memory, disk); }
Reduce的輸入
Shuffle過程結束之後,reduce階段獲得了RawKeyValueIterator類型的輸入,ReduceTask的 run方法會調用runNewReducer方法,該方法的簽名如下:
privatevoid runNewReducer(JobConf job, final TaskUmbilicalProtocol umbilical, final TaskReporter reporter, RawKeyValueIterator rIter, RawComparator comparator, Class keyClass, Class valueClass ) throws IOException,InterruptedException, ClassNotFoundException;
public class Reducer{ /** * The Context
passed on to the {@link Reducer} implementations. */ public abstract class Context implements ReduceContext{ } /** * Called once at the start of the task. */ protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * This method is called once for each key. Most applications will define * their reduce class by overriding this method. The default implementation * is an identity function. */ @SuppressWarnings("unchecked") protected void reduce(KEYIN key, Iterable values,Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } } /** * Called once at the end of the task. */ protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * Advanced application writers can use the * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to * control how the reduce task works. */ public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); // If a back up store is used, reset it Iterator iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator )iter).resetBackupStore(); } } } finally { cleanup(context); } } }
Reducer的輸入輸出的信息同樣是封裝在Context中。Reducer與Mapper看上去很像,但是有很 多關鍵的不同。比如reduce方法的參數,還有run方法的實現。
注意到,reduce方法的第二個參數,不再是一個VALUE類型,而是一個迭代器,意指key相同的 value會一次性的扔給這個reduce方法,那麼到底怎樣算key相同呢?我們看到reduce方法是在run方法中調用的,第一個參數與Mapper相同,也是context的currentKey,第二個不一樣,是從context 獲得的values,在ReduceContextImpl中,這個getValues方法,直接返回一個迭代器。
從語義上說,Reducer的reduce方法應該每次處理Key相同的那一組輸入,那麼到底什麼樣的一組 key,算是相同的key呢?這個關鍵的問題由構造ReduceContext的一個不起眼的參數,GroupingComparator 來解決。
GroupingComparator
我們知道,哪些Mapper的輸出交給一個Reduce線程是由Partitioner決定的,但是這些輸入並不 是一次性處理的,舉個例子,我們在做大小寫敏感wordcount的時候,假設使用的partition策略是根據單詞首字母大小來指定reducer,有2個reducer的話,"an"和"car"都會交給同一個reduce線程, 但是統計每個單詞個數的時候,他倆是不能混起來的,也就是一個reduce線程實際上將整個輸入分成了好多組,在每一組上運行了一次reduce的過程。這個組怎麼分,就是GroupingComparator做得事情。針對word count這個實例,我們應該將完全相等的兩個單詞作為一組,運行一次reduce的方法。
我們看一下GroupingComparator接口的定義:
public interface RawComparatorextends Comparator { /** * Compare two objects in binary. ? * b1[s1:l1] is the first object, and b2[s2:l2] is the second object. * ? * @param b1 The first byte array. ? * @param s1 The position index in b1. The object under comparison's starting index. ? * @param l1 The length of the object in b1. ? * @param b2 The second byte array. ? * @param s2 The position index in b2. The object under comparison's starting index. ? * @param l2 The length of the object under comparison in b2. ? * @return An integer result of the comparison. */ ?public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); ?}
可見,它是從字節碼的角度來判定是否相等的,具體比較哪一部分,可以根據我們自己的實現來控 制。
經過了Shuffle過程,所有的輸入都已經按照Key排序好了。所以,在判定兩個Key要不要交給同一 個Reduce方法時,只要判定相鄰的兩個key就可以了。這個比較的過程,實際上在我們在reduce方法 中,對value的迭代器不斷的取next的時候,就悄悄發生了。
業務場景
接著前面的業務場景,還是event和session的問題,我已經講過一段時間內同一個guid的event應該劃分成一個session,也就是event的key是guid和timestamp時,guid一樣的event要按照timestamp排好序的順序交給一個reduce方法來處理,因此,我們自己實現的GroupingComparator應該只比較 event key的guid,按照guid來聚合。Reduce的輸出
在構造ReduceContext的時候,傳入了兩個跟輸出相關的參數,一個是RecordWriter類型,一個 是OutputCommitter類型。但是,當查看這兩個對象構造的過程時,會發現他們的幕後boss居然是 OutputFormat!這貨看起來是不是非常眼熟?沒錯,我們在之前講Map的輸出時提到過一次,沒有展開 講。它跟InputFormat的功能其實很呼應。
OutputFormat
按照慣例,我們還是來看看它的接口。
public abstract class OutputFormat{ /** * Get the {@link RecordWriter} for the given task. * * @param context the information about the current task. * @return a {@link RecordWriter} to write the output for the job. * @throws IOException */ public abstract RecordWriter getRecordWriter(TaskAttemptContext context ) throws IOException, InterruptedException; /** * Check for validity of the output-specification for the job. * * This is to validate the output specification for the job when it is * a job is submitted. Typically checks that it does not already exist, * throwing an exception when it already exists, so that output is not * overwritten.
* * @param context information about the job * @throws IOException when output should not be attempted */ public abstract void checkOutputSpecs(JobContext context ) throws IOException, InterruptedException; /** * Get the output committer for this output format. This is responsible * for ensuring the output is committed correctly. * @param context the task context * @return an output committer * @throws IOException * @throws InterruptedException */ public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context ) throws IOException, InterruptedException; }
從源碼的描述來看,OutputFormat主要做兩件事情,一是驗證job的輸出配置是否合理(比如查看目標路徑是否存在),二是提供一個RecordWriter的實現,來寫入最終的輸出。三個抽象方法,分別用 於返回RecordWriter,返回OutputCommitter,以及驗證輸出的配置。
你可能會想不通一個輸出為什麼要搞這麼復雜,反正一個reducer產生一個文件,指定一下目錄,直接往裡寫不就行了嗎?怎麼還要recordWriter,還要committer的。我們回想一下hadoop設計的初 衷,在不可靠的機器上,得到可靠的輸出。也就是,hadoop的設計者認為一個task它很可能是會運行 失敗的,我們常常需要嘗試多次,因此,除了寫入操作之外,我們應該先寫在臨時目錄,確定成功了, 再提交到正式的輸出目錄裡,這個工作其實就是committer做得,而recordWriter只要專注於寫入操作 就可以了。
我們當然可以從頭開始寫一個OutputFormat,但更一般的做法是,繼承自一個典型的實現 FileOutputFormat。
FileOutputFormat
下面是它對checkOutputSpecs的實現:
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException { // Ensure that the output directory is set and not already there Path outDir = getOutputPath(job); if (outDir == null && job.getNumReduceTasks() != 0) { throw new InvalidJobConfException("Output directory not set in JobConf."); } if (outDir != null) { FileSystem fs = outDir.getFileSystem(job); // normalize the output directory outDir = fs.makeQualified(outDir); setOutputPath(job, outDir); // get delegation token for the outDir's file system TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] {outDir}, job); // check its existence if (fs.exists(outDir)) { throw new FileAlreadyExistsException("Output directory " + outDir + " already exists"); } } }
最開始接觸hadoop跑測試的時候,經常遇到FileAlreadyExistsException這個錯誤,原因就是 沒有刪掉上一次跑的結果。直到現在,才知道原來是從這裡拋出的啊。hadoop之所以這樣設定,是為 了防止因為粗心覆蓋掉之前生成的數據,我覺得這是合理的。
FileOutputFormat還提供了一些好用的方法,比如下面這個:
public synchronized static String getUniqueFile(TaskAttemptContext context, String name, String extension) { TaskID taskId = context.getTaskAttemptID().getTaskID(); int partition = taskId.getId(); StringBuilder result = new StringBuilder(); result.append(name); result.append('-'); result.append( TaskID.getRepresentingCharacter(taskId.getTaskType())); result.append('-'); result.append(NUMBER_FORMAT.format(partition)); result.append(extension); return result.toString(); }
OutputCommitter
從源碼的注釋,我們知道OutputCommitter負責下面的工作:
? 在job啟動時setup job。例如,在job的啟動期間建立臨時的輸出目錄。
? 在job結束是clean up job。比如,job結束之後刪掉臨時輸出目錄。
? 建立task的臨時輸出
? 檢測一個task需不需要提交自己的輸出
? 提交task的輸出
? 丟棄task的輸出
這麼列出來,感覺比較空洞,我講一下我的理解。正如前文提到的,OutputCommitter的主要職責是建立起task執行的臨時目錄,然後驗證這個task需不需要將自己的輸出的結果提交,提交到哪裡。對於產生的臨時目錄和寫入的臨時文件,也要負責清理干淨。
OutputCommitter有很多需要實現的方法,我列一下:
public abstract class OutputCommitter { public abstract void setupJob(JobContext jobContext) throws IOException; public void cleanupJob(JobContext jobContext) throws IOException { } public void commitJob(JobContext jobContext) throws IOException { cleanupJob(jobContext); } public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { cleanupJob(jobContext); } public abstract void setupTask(TaskAttemptContext taskContext) throws IOException; public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException; public abstract void commitTask(TaskAttemptContext taskContext) throws IOException; public abstract void abortTask(TaskAttemptContext taskContext) throws IOException; public boolean isRecoverySupported() { return false; } public void recoverTask(TaskAttemptContext taskContext) throwsIOException {}
方法名比較准確的反應方法需要實現的功能。下面我們看一下與FileOutputFormat對應的Committer。
FileOutputCommitter
前面已經提到了,OutputCommitter最重要的就是目錄的建立刪除以及拷貝,那麼要理解一個 Committer的行為,只要專注它是怎麼操作目錄的就可以了。
在FileOutputCommitter裡,有三個四種目錄,四種目錄分別包括
· 最終的Job輸出目錄
· 臨時的Job目錄
· task的提交目錄
· task的臨時目錄
task每次在task臨時目錄中工作,如果確定成功並且需要被提交,就會提交到task的提交目錄中。 task的提交目錄實際上跟臨時Job的目錄是一個目錄,當一個job的所有task都順利執行之後,會從臨時 job目錄提交到最終的輸出目錄。
之所以有這麼多跳,其實還是基於task很可能會執行失敗的假設,這種方式,在task失敗的時候, 可以直接清掉它的目錄重來,效率上肯定要差一些。因此我的同事寫過一個DirectFileOutputCommitter,當task執行成功時,直接提交到最終的工作目錄。這種方式雖然在一定程度上提高了效率,可有個風險, 當這個job失敗需要重新執行的時候,就得事先清一下最終的輸出目錄。
在實踐的時候,我們常常通過在一個目錄下生成"_SUCCESS"文件來標記這個目錄已經完成,一個很好的生成時機就是Committer的commitJob方法。
RecordWriter
這個類的介紹非常普通,它做的事情也很簡單,就是將一對KeyValue的pair寫入到輸出文件中。他的接口很簡單:
public abstract class RecordWriterwrite方法用來寫入,close方法用來釋放資源。{ /** * Writes a key/value pair. * * @param key the key to write. * @param value the value to write. * @throws IOException */ public abstract void write(K key,V value ) throws IOException, InterruptedException; /** * Close this RecordWriter
to future operations. * * @param context the context of the task * @throws IOException */ public abstract void close(TaskAttemptContext context ) throws IOException, InterruptedException; }
public RecordWritergetRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t"); if (!isCompressed) { Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter (fileOut, keyValueSeparator); } else { Class codecClass = getOutputCompressorClass(job, GzipCodec.class); // create the named codec CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job); // build the filename including the extension Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter (new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); } }
結語
終於將這篇博客寫完了,本來想著是入職之前完成的,結果拖了快兩個月。最初開始的時候也沒想 過會寫這麼多,看起源碼來,一環一環的,就總想再搞得明白一些。裡面摻雜了一些我工作中遇到的問 題,不是很多,不過我覺得還是有一定的參考意義的。希望這篇博客能幫助到對hadoop感興趣的你。