程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> Spork: Pig on Spark實現分析

Spork: Pig on Spark實現分析

編輯:C++入門知識

介紹

Spork是Pig on Spark的highly experimental版本,依賴的版本也比較久,如之前文章裡所說,目前我把Spork維護在自己的github上:flare-spork。 本文分析的是Spork的實現方式和具體內容。

Spark Launcher

在hadoop executionengine包路徑下,寫了一個Spark啟動器,同MapReduceLauncher類似,會在launchPig的時候,把傳入的物理執行計劃進行翻譯。 MR啟動器翻譯的是MR的操作,以及進一步的MR JobControl。而Spark啟動器將物理執行計劃的部分物理操作直接翻譯成了RDD的操作。 有一個缺點是翻譯成RDD算子之後,缺少優化過程,也就是直接物理操作的映射翻譯,具體執行邏輯會完全交給Spark DAGScheduler去切分,由TaskScheduler去調度任務。 比如對Pig來說,直到見到Dump/Store,才會觸發整個翻譯和launch。那麼在這一次物理執行計劃中,對應到Spark可能是多次任務。
在目前的實現方式下,翻譯物理操作交給多個Convertor的實現類來完成,
public interface POConverter {
	
    RDD convert(List> rdd, T physicalOperator) throws IOException;
    
}
抽象類POConvertor提供了convert方法,輸入參數中的List是本次物理操作的前驅們產生的RDDs,可以認為是會依賴的父RDDs。 這樣一次轉化結果就是產生nextRDD,而nextRDD是否在spark上真正觸發計算,目前來看是不去控制的,也就是上面提到的,一次Pig物理執行計劃可能會有Spark執行多次任務。
在使用的時候,以-x spark的方式就可以啟動以Spark為backend engine的Pig環境。
下面具體看目前做了哪些PO操作的轉化工作,具體怎麼轉化的。

Load/Store

走的都是NewHadoopRDD路線。

Load方面是通過POLoad獲得文件路徑,pigContext獲得必要配置信息,然後交由SparkContext調用newAPIHadoopFile來獲得NewHadoopRDD,最後把Tuple2的RDD map成只剩value的RDD

Store方面是先把最近的前驅rdd轉會成Key為空Text的Tuple2,然後映射為PairRDDFunctions,借助pigContext生成POStore操作,最後調用RDD的saveAsNewAPIHadoopFile存到HDFS上。


Foreach、Filter、Limit

ForEach裡實現一個Iterator[T] => Iterator[T]的方法,把foreach轉化為rdd.mapPartitions()方法。

Iterator[T]=> Iterator[T]方法的實現,會依賴原本的POForEach來獲得nextTuple和進行一些別的操作,來實現一個新的Iterator。

對於hadoop backend的executionengine裡的抽象類PhysicalOperator來說,

setInput()和attachInput()方法是放入帶處理的tuple數據,

getNextTuple()的時候觸發processTuple(),處理對象就是內部的Input Tuple。

所以ForEach操作實現Iterator的時候,在readNext()方法裡摻入了以上設置Input數據的操作,在返回前調用getNextTuple()返回處理後的結果。

POFilter也是通過setInput()和attachInput()以及getNextTuple()來返回處理結果。

所以在實現為RDD操作的時候,把以上步驟包裝成一個FilterFunction,傳入rdd.filter(Function)處理。

POLimit同POFilter是完全一樣的。


Distinct

現在RDD已經直接具備distinct(numPartitions: Int)方法了。


這裡的distinct實現同rdd裡的distinct邏輯是完全一樣的。

第一步:把類型為Tuple的rdd映射成為Tuple2,其中value部分是null的;

第二步:進行rdd.reduceByKey(merge_function, parallelism)操作,merge_function對兩個value部分的Object不做任何處理,也就是按key reduce且不對value部分處理;

第三步:對第二步的結果進行rdd.map(function, ClassTag)處理,function為得到Tuple2裡的._1,即key值:Tuple。



Union

Union是一次求並過程,直接new UnionRDD返回。

由於UnionRDD處理的是Seq,所以使用JavaConversions.asScalaBuffer(List>)進行一下轉換再傳入。



Sort

Sort過程:

第一步:把Tuple類型的RDD轉成Tuple2類型,Object為空

第二步:根據第一步結果,new OrderedRDDFunctions>

,其sortByKey方法產出一個排過序的RDD>。OrderedRDDFunctions裡的Key類型必須是可排序的,比較器復用的是POSort的mComparator。sortByKey結果返回的是ShuffleRDD,其Partitioner是RangePartitioner,排序之後,每個Partition裡存放的都是一個范圍內的排過序的值。

第三步:調用rdd.mapPartition(function, xx, xx),function作用為把Iterator>吐成Iterator,即再次取回Key值,此時已有序。


Split

POSplit的處理是直接返回第一個祖先RDD。


LocalRearrange

LocalRearrange -> Global Rearrange -> Package是一同出現的。

\

Local rearrange直接依賴

  physicalOperator.setInputs(null);
  physicalOperator.attachInput(t);
  result = physicalOperator.getNextTuple();

三步得到result。返回的Tuple格式為(index, key, value)。

依賴POLocalRearrange本身內部對input tuple的處理。


GlobalRearrange

待處理的Tuple格式是(index, key, value)。最後結果為(key, { values })

如果父RDD只有一個:

先進行按key進行一次groupBy,得到結果是Tuple2>

然後做一次map操作,得到(key, { values })形態的RDD,即Tuple

如果父RDD有多個:

讓通過rdd的map操作先將Tuple從(index, key, value)轉成(key, value)形態,然後把這個rdd集合new成CoGroupRDD,包含一次(Seq) JavaConversions.asScalaBuffer(rddPairs)轉化。最後調用CoGroupRDD的map方法,把Tuple2>>轉化成Tuple,即(key, { values })形態。實際上,CoGroupRDD的map方法內部做的事情,是針對每個Key裡的Iterator集合,進行了Iterator之間的合並操作。

Package

Package需要把global rearrange處理後的key, Seq進行group。具體的待處理Tuple結構是這樣的:(key, Seq:{(index,key, value without key)})

tuple.get(0)是keyTuple,tuple.get(1)是Iterator,最後返回(key, {values}),即Tuple


全文完 :)

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved