程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> Hadoop學習2,hadoop學習

Hadoop學習2,hadoop學習

編輯:JAVA綜合教程

Hadoop學習2,hadoop學習


搭建偽分布式完成之後:偽分布式安裝詳細介紹:http://www.powerxing.com/install-hadoop/

           練習1 編寫Java程序實現以下函數

           1.向HDFS中上傳文件

           2.從HDFS下載文件到本地

           3.顯示文件目錄

           4.移動文件

           5.新建文件夾

           6.移除文件夾

    

package cn.itcast.hadoop.hdfs;


import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import org.apache.commons.compress.utils.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.junit.Before; import org.junit.Test; public class temp { static FileSystem fs = null; /* * initiation */ @Before public void init() throws IOException{ Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://zpfbuaa:9000/"); fs = FileSystem.get(configuration); } /* * upload files */ @Test public void upload() throws IOException{ init(); Path dstPath = new Path("hdfs://zpfbuaa:9000/aa/my.jar"); FSDataOutputStream os = fs.create(dstPath); FileInputStream is = new FileInputStream("/home/hadoop/download/my.jar"); IOUtils.copy(is, os); } /* * upload files to HDFS */ @Test public void upload2() throws IOException{ fs.copyFromLocalFile(new Path("/home/hadoop/download/my.jar"), new Path("hdfs://zpfbuaa:9000/aaa/bbb/ccc/my3.jar")); } /* * download files to local */ public void download(){ } /* * list the information of files */ @Test public void listfile() throws FileNotFoundException, IllegalArgumentException, IOException{ RemoteIterator<LocatedFileStatus> filesIterator = fs.listFiles(new Path("/"), true); while(filesIterator.hasNext()){ LocatedFileStatus fileStatus = filesIterator.next(); Path path = fileStatus.getPath(); String filename = path.getName(); System.out.println(filename); } System.out.println("---------------------------------------------"); FileStatus[] listStatus = fs.listStatus(new Path("/")); for(FileStatus status : listStatus){ String name = status.getPath().getName(); System.out.println(name + (status.isDirectory()?" is a dir":" is a file")); } } /* * make a new file */ @Test public void makdir() throws IllegalArgumentException, IOException{ fs.mkdirs(new Path("/aaa/bbb/ccc")); } /* * delete a old file */ public void rm() throws IllegalArgumentException, IOException{ fs.delete(new Path("/aaa/bbb"), true); } public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://zpfbuaa:9000/"); fs = FileSystem.get(configuration); FSDataInputStream is = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz")); FileOutputStream os = new FileOutputStream("/home/hadoop/download/my.jar"); IOUtils.copy(is,os); } }

 

    練習2 編寫Java程序實現客戶端和服務器端的socket信息交互以及函數調用

  LoginServiceImpl.class 服務器實例類

  

package cn.itcast.hadoop.rpc;

public class LoginServiceImpl implements LoginServiceInterface{

    @Override
    public String Login(String username, String password) {
        
        return username + " logged in successfully!";
    }
    
    
}
  LoginServiceInterface 接口類(服務器端以及本地端均實現的)

  

package cn.itcast.hadoop.rpc;

public interface LoginServiceInterface {
    
    public static final long versionID = 1L;
    
    public String Login(String username,String password);
}
     starter.class 創建服務器端類  
package cn.itcast.hadoop.rpc;

import java.io.IOException;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RPC.Builder;

public class starter {
    
    public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
        
        Builder builder = new RPC.Builder(new Configuration());
        
        builder.setBindAddress("zpfbuaa").setPort(10000).setProtocol(LoginServiceInterface.class).setInstance(new LoginServiceImpl());
        
        Server server = builder.build();
        
        
    }
    
}

LoginController登錄類

  

package cn.itcast.hadoop.rpc;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class LoginController {

    public static void main(String[] args) throws IOException {
        
        LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L, new InetSocketAddress("zpfbuaa", 10000), new Configuration());
        
        
        String result = proxy.Login("zpfbuaa", "123456789");
        
        System.out.println(result);
    }
}

LoginServiceInterface 接口類

  

package cn.itcast.hadoop.rpc;

public interface LoginServiceInterface {
    
    public static final long versionID = 1L;
    
    public String Login(String username,String password);
}

  需要注意的是:

  1.為了進行遠程調用的模仿,將LoginServiceImpl.class以及LoginServiceInterface.class接口類和 starter.class類放在虛擬機上。本地放LoginController類以及LoginServiceInterface接口類。

  2.首先需要將服務器端的服務啟動,上述例子會監聽虛擬機的10000端口。

  3.容易忽略的地方:版本號versionID. 對於不同的版本擁有不同的版本號。在上述例子中簡單的均定義版本號為Long類型 並且為final類型 賦值為1L。

  4.jar包的導入以及版本的控制。

  5.本地以及服務器端的函數都要實現一樣的接口類,但是為了防止調用時版本的不對應,所以在Build實例的時候需要將版本號也就是versionID聲明清楚,這樣調用的時候可以通過版本號的不同將函數進行區別開。

Hadoop自身的遠程調用實現機制RPC

  主要步驟如下:

  1.將本地socket以及接口類封裝為一個proxy,生成動態本地代理實例

  2.該實例調用相對應的函數並且傳入相應的參數。

  3.本地socket得到動態代理調用的函數以及傳入的參數

  4.使用網絡傳輸協議實現本地socket與遠程服務器的socket進行連接,實現信息傳遞

  5.服務器端socket得到調用的函數以及傳入的參數,生成動態服務器端的代理實例

  6.該服務器端實例調用服務器端的函數,並且傳入得到的參數。

  7.函數調用結果返回給服務器端socket。

  8.服務器端socket將返回結果通過網絡傳輸協議傳遞給本地socket。

  9.本地socket將返回結果傳遞給本地動態代理proxy。

  RPC的優點:

    1.實現了controller和implement的分離

    2.利用RPC機制可以實現信息的有效傳遞。

    3.保障數據的可靠性(DataNode需要定時向NameNode傳遞自身保存的blocks信息,以便NameNode進行blocks的維護)。

 

遠程調用的底層實現機制

實現RPC機制:

查看FileSystem fs = FileSystem.get(new Configuration());

一步一步查看fs的生成過程!

clip_image003

加入斷點後,逐步進行查看!

clip_image004

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved