public interface POConverter抽象類POConvertor提供了convert方法,輸入參數中的List{ RDD convert(List > rdd, T physicalOperator) throws IOException; }
走的都是NewHadoopRDD路線。
Load方面是通過POLoad獲得文件路徑,pigContext獲得必要配置信息,然後交由SparkContext調用newAPIHadoopFile來獲得NewHadoopRDD,最後把Tuple2
Store方面是先把最近的前驅rdd轉會成Key為空Text的Tuple2
ForEach裡實現一個Iterator[T] => Iterator[T]的方法,把foreach轉化為rdd.mapPartitions()方法。
Iterator[T]=> Iterator[T]方法的實現,會依賴原本的POForEach來獲得nextTuple和進行一些別的操作,來實現一個新的Iterator。
對於hadoop backend的executionengine裡的抽象類PhysicalOperator來說,
setInput()和attachInput()方法是放入帶處理的tuple數據,
getNextTuple()的時候觸發processTuple(),處理對象就是內部的Input Tuple。
所以ForEach操作實現Iterator的時候,在readNext()方法裡摻入了以上設置Input數據的操作,在返回前調用getNextTuple()返回處理後的結果。
POFilter也是通過setInput()和attachInput()以及getNextTuple()來返回處理結果。
所以在實現為RDD操作的時候,把以上步驟包裝成一個FilterFunction,傳入rdd.filter(Function)處理。
POLimit同POFilter是完全一樣的。
現在RDD已經直接具備distinct(numPartitions: Int)方法了。
這裡的distinct實現同rdd裡的distinct邏輯是完全一樣的。
第一步:把類型為Tuple的rdd映射成為Tuple2
第二步:進行rdd.reduceByKey(merge_function, parallelism)操作,merge_function對兩個value部分的Object不做任何處理,也就是按key reduce且不對value部分處理;
第三步:對第二步的結果進行rdd.map(function, ClassTag)處理,function為得到Tuple2
Union是一次求並過程,直接new UnionRDD
由於UnionRDD處理的是Seq
Sort過程:
第一步:把Tuple類型的RDD轉成Tuple2
第二步:根據第一步結果,new OrderedRDDFunctions
,其sortByKey方法產出一個排過序的RDD
第三步:調用rdd.mapPartition(function, xx, xx),function作用為把Iterator
POSplit的處理是直接返回第一個祖先RDD。
LocalRearrange -> Global Rearrange -> Package是一同出現的。
Local rearrange直接依賴
physicalOperator.setInputs(null); physicalOperator.attachInput(t); result = physicalOperator.getNextTuple();
三步得到result。返回的Tuple格式為(index, key, value)。
依賴POLocalRearrange本身內部對input tuple的處理。
待處理的Tuple格式是(index, key, value)。最後結果為(key, { values })
如果父RDD只有一個:
先進行按key進行一次groupBy,得到結果是Tuple2
然後做一次map操作,得到(key, { values })形態的RDD,即Tuple
如果父RDD有多個:
讓通過rdd的map操作先將Tuple從(index, key, value)轉成(key, value)形態,然後把這個rdd集合new成CoGroupRDD,包含一次(Seq) JavaConversions.asScalaBuffer(rddPairs)轉化。最後調用CoGroupRDD的map方法,把Tuple2
Package需要把global rearrange處理後的key, Seq
tuple.get(0)是keyTuple,tuple.get(1)是Iterator