在 MapReduce 分布式計算時有這樣一種場景:mapper 輸入來自多個不同的數據源,共同點是每行記錄第一列是作為 key 的 id 列,reducer 需要根據數據源的不同,進行相應的處理。由於數據到 reducer 階段已經無法區分來自什麼文件,所以一般采取的方法是 mapper 為數據記錄打一個 TAG。為了便於使用,我習慣於把這個 TAG 打到數據的第二列(第一列為 id 列,作為 reduce/join 的 key),所以有這樣的 mapper 函數:
def mapper1(line): l = line.split('\t', 1) return "%s\t%s\t%s" % (l[0], 'TAG', l[1])
這樣給定輸入:
s = "3001 VALUE"
mapper1(s) 的結果就是:
s = "3001 TAG VALUE"
這是一個潛意識就想到的很直白的函數,但是我今天忽然腦子轉筋,陷入了“這是最快的嗎”思維怪圈裡。於是我就想,還有什麼其它方法呢?哦,格式化的表達式可以用 string 的 + 運算來表示:
def mapper2(line): l = line.split('\t', 1) return l[0] + '\t' + 'TAG' + '\t' + l[1]
上面是故意將 '\t' 分開寫,因為一般 TAG 是以變量方式傳入的。還有,都說 join 比 + 快,那麼也可以這樣:
def mapper3(line): l = line.split('\t', 1) l.insert(1, 'TAG') return '\t'.join(l)
split 可能要消耗額外的空間,那就換 find:
def mapper4(line): pos = line.find('\t') return "%s\t%s\t%s" % (line[0:pos], 'TAG', line[pos+1:])
變態一點兒,第一個數是整數嘛,換成整型輸出:
def mapper5(line): pos = line.find('\t') pid = long(line[0:pos]) return "%d\t%s\t%s" % (pid, 'TAG', line[pos+1:])
再換個思路,split 可以換成 partition:
def mapper6(line): (h,s,t) = line.partition('\t') return "%s\t%s\t%s" % (h, 'TAG', t)
或者干脆 ticky 一點兒,用 replace 替換第一個找到的制表符:
def mapper7(line): return line.replace('\t', '\t'+'TAG'+'\t', 1)
哇,看一下,原來可選的方法還真不少,而且我相信這肯定沒有列舉到所有的方法。看到這裡,就這幾個有限的算法,你猜一下哪個最快?最快的比最慢的快多少?
先把計時方法貼一下:
for i in range(1,8): f = 'mapper%d(s)' % i su = "from __main__ import mapper%d,s" % i print f, ':', timeit.Timer(f, setup=su).timeit()
下面是答案:
mapper1(s) : 1.32489800453 mapper2(s) : 1.2933549881 mapper3(s) : 1.65229916573 mapper4(s) : 1.22059297562 mapper5(s) : 2.60358095169 mapper6(s) : 0.956777095795 mapper7(s) : 0.726199865341
最後勝出的是 mapper7 (tricky 的 replace 方法),最慢的是 mapper5 (蛋疼的 id 轉數字方法),最慢的耗時是最慢的約 3.6 倍。最早想到的 mapper1 方法在 7 種方法中排名——第 5!耗時是最快方法的 1.8 倍。考慮到 mapper 足夠簡單,這個將近一倍的開銷還是有一點點意義的。