翻譯的一篇國外的關於hadoop mapreduce的文章,文章比較長,先翻譯第一部分吧
翻譯者:pconlin900
博客:http://pconline900.javaeye.com
Hadoop是apache的一個開源的map-reduce框架,MapReduce是一個並行計算模型,用來處理海量數據。模型思想來源於google的Jeffrey Dean 和 Sanjay Ghemawat,包括map() reduce()兩個主要的功能。
這是一個很簡單的類似於Hadoop的MapReduce應用例子,應用了mapreduce的基本思想,可以幫助理解hadoop的處理思想和技術,但注意,它沒有使用hadoop框架。
例子的功能是創建一些字符串,然後統計這些字符串裡面每個字符出現的次數,最後匯總得到總的字符出現次數。
Listing 1. 主程序
public class Main { public static void main(String[] args) { MyMapReduce my = new MyMapReduce(); my.init(); }
Listing 2. MyMapReduce.java
import java.util.*; public class MyMapReduce { List buckets = new ArrayList(); List intermediateresults = new ArrayList(); List values = new ArrayList(); public void init() { for(int i = 1; i<=30; i++) { values.add("http://pconline900.javaeye.com" + new Integer(i).toString()); } System.out.println("**STEP 1 START**-> Running Conversion into Buckets**"); System.out.println(); List b = step1ConvertIntoBuckets(values,5); System.out.println("************STEP 1 COMPLETE*************"); System.out.println(); System.out.println(); System.out.println("**STEP 2 START**->Running **Map Function** concurrently for all Buckets"); System.out.println(); List res = step2RunMapFunctionForAllBuckets(b); System.out.println("************STEP 2 COMPLETE*************"); System.out.println(); System.out.println(); System.out.println("**STEP 3 START**->Running **Reduce Function** for collating Intermediate Results and Printing Results"); System.out.println(); step3RunReduceFunctionForAllBuckets(res); System.out.println("************STEP 3 COMPLETE*************"); System.out.println("************pconline900 翻譯*************"); System.out.println("***********博客:http://pconline900.javaeye.com*************"); } public List step1ConvertIntoBuckets(List list,int numberofbuckets) { int n = list.size(); int m = n / numberofbuckets; int rem = n% numberofbuckets; int count = 0; System.out.println("BUCKETS"); for(int j =1; j<= numberofbuckets; j++) { List temp = new ArrayList(); for(int i=1; i<= m; i++) { temp.add((String)values.get(count)); count++; } buckets.add(temp); temp = new ArrayList(); } if(rem != 0) { List temp = new ArrayList(); for(int i =1; i<=rem;i++) { temp.add((String)values.get(count)); count++; } buckets.add(temp); } System.out.println(); System.out.println(buckets); System.out.println(); return buckets; } public List step2RunMapFunctionForAllBuckets(List list) { for(int i=0; i< list.size(); i++) { List elementList = (ArrayList)list.get(i); new StartThread(elementList).start(); } try { Thread.currentThread().sleep(1000); }catch(Exception e) { } return intermediateresults; } public void step3RunReduceFunctionForAllBuckets(List list) { int sum =0; for(int i=0; i< list.size(); i++) { //you can do some processing here, like finding max of all results etc int t = Integer.parseInt((String)list.get(i)); sum += t; } System.out.println(); System.out.println("Total Count is "+ sum); System.out.println(); } class StartThread extends Thread { private List tempList = new ArrayList(); public StartThread(List list) { tempList = list; } public void run() { for(int i=0; i< tempList.size();i++) { String str = (String)tempList.get(i); synchronized(this) { intermediateresults.add(new Integer(str.length()).toString()); } } } } }
init()方法創建了一些測試數據,作為測試數據。實際應用中會是海量數據處理。
step1ConvertIntoBuckets()方法將測試數據拆分到5個 bucket中,每個bucket是一個ArrayList(包含6個String數據)。bucket可以保存在內存,磁盤,或者集群中的其他節點;
step2RunMapFunctionForAllBuckets()方法創建了5個線程(每個bucket一個),每個線程StartThread處理每個bucket並把處理結果放在intermediateresults這個arraylist中。
如果bucket分配給不同的節點處理,必須有一個master主控節點監控各個節點的計算,匯總各個節點的處理結果,若有節點失敗,master必須能夠分配計算任務給其他節點計算。
step3RunReduceFunctionForAllBuckets()方法加載intermediateresults中間處理結果,並進行匯總處理,最後得到最終的計算結果。