程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 基於Hadoop的Map reduce編程(一)

基於Hadoop的Map reduce編程(一)

編輯:關於JAVA

翻譯的一篇國外的關於hadoop mapreduce的文章,文章比較長,先翻譯第一部分吧

翻譯者:pconlin900

博客:http://pconline900.javaeye.com

Hadoop是apache的一個開源的map-reduce框架,MapReduce是一個並行計算模型,用來處理海量數據。模型思想來源於google的Jeffrey Dean 和 Sanjay Ghemawat,包括map() reduce()兩個主要的功能。

這是一個很簡單的類似於Hadoop的MapReduce應用例子,應用了mapreduce的基本思想,可以幫助理解hadoop的處理思想和技術,但注意,它沒有使用hadoop框架。

例子的功能是創建一些字符串,然後統計這些字符串裡面每個字符出現的次數,最後匯總得到總的字符出現次數。

Listing 1. 主程序

public class Main
{
  public static void main(String[] args)
  {
    MyMapReduce my = new MyMapReduce();
    my.init();
  }

Listing 2. MyMapReduce.java

import java.util.*;
public class MyMapReduce
{
List buckets = new ArrayList();
List intermediateresults = new ArrayList();
List values = new ArrayList();
public void init()
{
for(int i = 1; i<=30; i++)
{
values.add("http://pconline900.javaeye.com" + new Integer(i).toString());
}
  
System.out.println("**STEP 1 START**-> Running Conversion into Buckets**");
System.out.println();
List b = step1ConvertIntoBuckets(values,5);
    System.out.println("************STEP 1 COMPLETE*************");
    System.out.println();
    System.out.println();
  System.out.println("**STEP 2 START**->Running **Map Function** concurrently for all    Buckets");
System.out.println();
List res = step2RunMapFunctionForAllBuckets(b);
System.out.println("************STEP 2 COMPLETE*************");
    System.out.println();
    System.out.println();
System.out.println("**STEP 3 START**->Running **Reduce Function** for collating Intermediate Results and Printing Results");
System.out.println();
step3RunReduceFunctionForAllBuckets(res);
System.out.println("************STEP 3 COMPLETE*************");
         System.out.println("************pconline900 翻譯*************");
         System.out.println("***********博客:http://pconline900.javaeye.com*************");
}
public List step1ConvertIntoBuckets(List list,int numberofbuckets)
{
int n = list.size();
int m = n / numberofbuckets;
int rem = n% numberofbuckets;
int count = 0;
System.out.println("BUCKETS");
for(int j =1; j<= numberofbuckets; j++)
{
List temp = new ArrayList();
for(int i=1; i<= m; i++)
{
temp.add((String)values.get(count));
count++;
}
buckets.add(temp);
temp = new ArrayList();
}
if(rem != 0)
{
List temp = new ArrayList();
for(int i =1; i<=rem;i++)
{
temp.add((String)values.get(count));
count++;
}
buckets.add(temp);
}
    System.out.println();
System.out.println(buckets);
System.out.println();
return buckets;
}
public List step2RunMapFunctionForAllBuckets(List list)
{
for(int i=0; i< list.size(); i++)
{
List elementList = (ArrayList)list.get(i);
new StartThread(elementList).start();
}
    try
    {
Thread.currentThread().sleep(1000);
}catch(Exception e)
{
}
return intermediateresults;
}
public void step3RunReduceFunctionForAllBuckets(List list)
{
int sum =0;
for(int i=0; i< list.size(); i++)
{
//you can do some processing here, like finding max of all results etc
int t = Integer.parseInt((String)list.get(i));
sum += t;
}
System.out.println();
System.out.println("Total Count is "+ sum);
System.out.println();
}
class StartThread extends Thread
{
private List tempList = new ArrayList();
public StartThread(List list)
{
tempList = list;
}
public void run()
{
for(int i=0; i< tempList.size();i++)
{
String str = (String)tempList.get(i);
synchronized(this)
           {
intermediateresults.add(new Integer(str.length()).toString());
}
}
}
}
}

init()方法創建了一些測試數據,作為測試數據。實際應用中會是海量數據處理。

step1ConvertIntoBuckets()方法將測試數據拆分到5個 bucket中,每個bucket是一個ArrayList(包含6個String數據)。bucket可以保存在內存,磁盤,或者集群中的其他節點;

step2RunMapFunctionForAllBuckets()方法創建了5個線程(每個bucket一個),每個線程StartThread處理每個bucket並把處理結果放在intermediateresults這個arraylist中。

如果bucket分配給不同的節點處理,必須有一個master主控節點監控各個節點的計算,匯總各個節點的處理結果,若有節點失敗,master必須能夠分配計算任務給其他節點計算。

step3RunReduceFunctionForAllBuckets()方法加載intermediateresults中間處理結果,並進行匯總處理,最後得到最終的計算結果。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved