程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 用java多線程斷點續傳實踐

用java多線程斷點續傳實踐

編輯:關於JAVA

annegu做了一個簡單的Http多線程的下載程序,來討論一下多線程並發下載 以及斷點續傳的問題。

這個程序的功能,就是可以分多個線程從目標地址上下載數據,每個線程負 責下載一部分,並可以支持斷點續傳和超時重連。

下載的方法是download(),它接收兩個參數,分別是要下載的頁面的url和編 碼方式。在這個負責下載的方法中,主要分了三個步驟。第一步是用來設置斷點 續傳時候的一些信息的,第二步就是主要的分多線程來下載了,最後是數據的合 並。

1、多線程下載:

/** *//** http://www.bt285.cn http://www.5a520.cn
*/
public String download(String urlStr, String charset) {
     this.charset = charset;
     long contentLength = 0;
         CountDownLatch latch = new CountDownLatch (threadNum);
     long[] startPos = new long[threadNum];
     long endPos = 0;

     try {
         // 從url中獲得下載的文件格式與名字
         this.fileName = urlStr.substring (urlStr.lastIndexOf("/") + 1);

         this.url = new URL(urlStr);
         URLConnection con = url.openConnection();
         setHeader(con);
         // 得到content的長度
         contentLength = con.getContentLength();
         // 把context分為threadNum段的話,每段的長度。
         this.threadLength = contentLength /  threadNum;

         // 第一步,分析已下載的臨時文件,設置斷點,如果是 新的下載任務,則建立目標文件。在第4點中說明。
         startPos = setThreadBreakpoint(fileDir,  fileName, contentLength, startPos);

         //第二步,分多個線程下載文件
         ExecutorService exec =  Executors.newCachedThreadPool();
         for (int i = 0; i < threadNum; i++)  {
             // 創建子線程來負責下載數據,每段數據的起 始位置為(threadLength * i + 已下載長度)
             startPos[i] += threadLength *  i;

             /**//*設置子線程的終止位置,非最後一個線程 即為(threadLength * (i + 1) - 1)
             最後一個線程的終止位置即為下載內容的長度 */
             if (i == threadNum - 1) {
                 endPos = contentLength;
             } else {
                 endPos = threadLength * (i +  1) - 1;
             }
             // 開啟子線程,並執行。
             ChildThread thread = new ChildThread (this, latch, i, startPos[i], endPos);
             childThreads[i] = thread;
             exec.execute(thread);
         }

         try {
             // 等待CountdownLatch信號為0,表示所有子線 程都結束。
                 latch.await();
             exec.shutdown();

             // 第三步,把分段下載下來的臨時文件中的內 容寫入目標文件中。在第3點中說明。
             tempFileToTargetFile(childThreads);

         } catch (InterruptedException e) {
             e.printStackTrace();
         }
}

首先來看最主要的步驟:多線程下載。

首先從url中提取目標文件的名稱,並在對應的目錄創建文件。然後取得要下 載的文件大小,根據分成的下載線程數量平均分配每個線程需要下載的數據量, 就是threadLength。然後就可以分多個線程來進行下載任務了。

在這個例子中,並沒有直接顯示的創建Thread對象,而是用Executor來管理 Thread對象,並且用CachedThreadPool來創建的線程池,當然也可以用 FixedThreadPool。CachedThreadPool在程序執行的過程中會創建與所需數量相 同的線程,當程序回收舊線程的時候就停止創建新線程。FixedThreadPool可以 預先新建參數給定個數的線程,這樣就不用在創建任務的時候再來創建線程了, 可以直接從線程池中取出已准備好的線程。下載線程的數量是通過一個全局變量 threadNum來控制的,默認為5。

好了,這5個子線程已經通過Executor來創建了,下面它們就會各自為政,互 不干涉的執行了。線程有兩種實現方式:實現Runnable接口;繼承Thread類。

ChildThread就是子線程,它作為DownloadTask的內部類,繼承了Thread,它 的構造方法需要5個參數,依次是一個對 DownloadTask的引用,一個 CountDownLatch,id(標識線程的id號),startPosition(下載內容的開始位 置),endPosition(下載內容的結束位置)。

這個CountDownLatch是做什麼用的呢?

現在我們整理一下思路,要實現分多個線程來下載數據的話,我們肯定還要 把這多個線程下載下來的數據進行合。主線程必須等待所有的子線程都執行結束 之後,才能把所有子線程的下載數據按照各自的id順序進行合並。 CountDownLatch就是來做這個工作的。

CountDownLatch用來同步主線程,強制主線程等待所有的子線程執行的下載 操作完成。在主線程中,CountDownLatch對象被設置了一個初始計數器,就是子 線程的個數5個,代碼①處。在新建了5個子線程並開始執行之後,主線程用 CountDownLatch的await()方法來阻塞主線程,直到這個計數器的值到達0,才會 進行下面的操作,代碼②處。

對每個子線程來說,在執行完下載指定區間與長度的數據之後,必須通過調 用CountDownLatch的countDown()方法來把這個計數器減1。

2、在全面開啟下載任務之後,主線程就開始阻塞,等待子線程執行完畢,所 以下面我們來看一下具體的下載線程ChildThread。

/** *//**
*author by http://www.5a520.cn http://www.feng123.com
*/
public class ChildThread extends Thread {
     public static final int STATUS_HASNOT_FINISHED =  0;
     public static final int STATUS_HAS_FINISHED = 1;
     public static final int STATUS_HTTPSTATUS_ERROR =  2;
     private DownloadTask task;
     private int id;
     private long startPosition;
     private long endPosition;
     private final CountDownLatch latch;
     private File tempFile = null;
     //線程狀態碼
     private int status =  ChildThread.STATUS_HASNOT_FINISHED;

     public ChildThread(DownloadTask task, CountDownLatch  latch, int id, long startPos, long endPos) {
         super();
         this.task = task;
         this.id = id;
         this.startPosition = startPos;
         this.endPosition = endPos;
         this.latch = latch;

         try {
             tempFile = new File(this.task.fileDir +  this.task.fileName + "_" + id);
             if(!tempFile.exists()){
                 tempFile.createNewFile();
             }
         } catch (IOException e) {
             e.printStackTrace();
         }

     }

     public void run() {
         System.out.println("Thread " + id + " run  ");
         HttpURLConnection con = null;
         InputStream inputStream = null;
         BufferedOutputStream outputStream = null;
         int count = 0;
         long threadDownloadLength = endPosition -  startPosition;

         try {
             outputStream = new BufferedOutputStream (new FileOutputStream(tempFile.getPath(), true));
         } catch (FileNotFoundException e2) {
             e2.printStackTrace();
         }

③       for(;;){
④           startPosition += count;
             try {
                 //打開URLConnection
                 con = (HttpURLConnection)  task.url.openConnection();
                 setHeader(con);
                 con.setAllowUserInteraction (true);
                 //設置連接超時時間為10000ms
⑤               con.setConnectTimeout(10000);
                 //設置讀取數據超時時間為10000ms
                 con.setReadTimeout(10000);

                 if(startPosition <  endPosition){
                     //設置下載數據的起止區間
                     con.setRequestProperty ("Range", "bytes=" + startPosition + "-"
                             +  endPosition);
                     System.out.println("Thread  " + id + " startPosition is " + startPosition);
                     System.out.println("Thread  " + id + " endPosition is " + endPosition);

                     //判斷http status是否為 HTTP/1.1 206 Partial Content或者200 OK
                     //如果不是以上兩種狀態,把 status改為STATUS_HTTPSTATUS_ERROR
⑥                   if (con.getResponseCode()  != HttpURLConnection.HTTP_OK
                             && con.getResponseCode() != HttpURLConnection.HTTP_PARTIAL)  {
                         System.out.println ("Thread " + id + ": code = "
                                 +  con.getResponseCode() + ", status = "
                                 +  con.getResponseMessage());
                         status =  ChildThread.STATUS_HTTPSTATUS_ERROR;
                         this.task.statusError = true;
                         outputStream.close ();
                         con.disconnect ();
                         System.out.println ("Thread " + id + " finished.");
                         latch.countDown ();
                         break;
                     }

                     inputStream =  con.getInputStream();

                     int len = 0;
                     byte[] b = new byte [1024];
                     while ((len =  inputStream.read(b)) != -1) {
                         outputStream.write (b, 0, len);
                         count +=  len;

                         //每讀滿5000個byte ,往磁盤上flush一下
                         if(count % 5000  == 0){
⑦                            outputStream.flush();
                         }
                     }

                     System.out.println("count  is " + count);
                     if(count >=  threadDownloadLength){
                         hasFinished =  true;
                     }
⑧                   outputStream.flush();
                     outputStream.close();
                     inputStream.close();
                     con.disconnect();
                 }

                 System.out.println("Thread " +  id + " finished.");
                 latch.countDown();
                 break;
             } catch (IOException e) {
                 try {
⑨                   outputStream.flush();
⑩                   TimeUnit.SECONDS.sleep (getSleepSeconds());
                 } catch (InterruptedException  e1) {
                     e1.printStackTrace();
                 } catch (IOException e2) {
                     e2.printStackTrace();
                 }
                 continue;
             }
         }
     }
}

在ChildThread的構造方法中,除了設置一些從主線程中帶來的id, 起始位置 之外,就是新建了一個臨時文件用來存放當前線程的下載數據。臨時文件的命名 規則是這樣的:下載的目標文件名+”_”+線程編號。

現在讓我們來看看從網絡中讀數據是怎麼讀的。我們通過URLConnection來獲 得一個http的連接。有些網站為了安全起見,會對請求的http連接進行過濾,因 此為了偽裝這個http的連接請求,我們給httpHeader穿一件偽裝服。下面的 setHeader方法展示了一些非常常用的典型的 httpHeader的偽裝方法。比較重要 的有:Uer-Agent模擬從Ubuntu的firefox浏覽器發出的請求;Referer模擬浏覽 器請求的前一個觸發頁面,例如從skycn站點來下載軟件的話,Referer設置成 skycn的首頁域名就可以了;Range就是這個連接獲取的流文件的起始區間。

private void setHeader(URLConnection con) {
     con.setRequestProperty("User-Agent", "Mozilla/5.0  (X11; U; Linux i686; en-US; rv:1.9.0.3) Gecko/2008092510  Ubuntu/8.04 (hardy) Firefox/3.0.3");
     con.setRequestProperty("Accept-Language", "en- us,en;q=0.7,zh-cn;q=0.3");
     con.setRequestProperty("Accept-Encoding", "aa");
     con.setRequestProperty("Accept-Charset", "ISO-8859-1,utf- 8;q=0.7,*;q=0.7");
     con.setRequestProperty("Keep-Alive", "300");
     con.setRequestProperty("Connection", "keep-alive");
     con.setRequestProperty("If-Modified-Since", "Fri, 02 Jan  2009 17:00:05 GMT");
     con.setRequestProperty("If-None-Match", "\"1261d8-4290- df64d224\"");
     con.setRequestProperty("Cache-Control", "max-age=0");
     con.setRequestProperty("Referer",  "http://http://www.bt285.cn");
}

另外,為了避免線程因為網絡原因而阻塞,設置了ConnectTimeout和 ReadTimeout,代碼⑤、⑥處。 setConnectTimeout設置的連接的超時時間,而 setReadTimeout設置的是讀取數據的超時時間,發生超時的話,就會拋出 socketTimeout異常,兩個方法的參數都是超時的毫秒數。

這裡對超時的發生,采用的是等候一段時間重新連接的方法。整個獲取網絡 連接並讀取下載數據的過程都包含在一個循環之中(代碼③處),如果發生了連 接或者讀取數據的超時,在拋出的異常裡面就會sleep一定的時間(代碼⑩處) ,然後continue,再次嘗試獲取連接並讀取數據,這個時間可以通過 setSleepSeconds()方法來設置。我們在迅雷等下載工具的使用中,經常可以看 到狀態欄會輸出類似“連接超時,等待*秒後重試”的話,這個就是通過 ConnectTimeout,ReadTimeout來實現的。

連接建立好之後,我們要檢查一下返回響應的狀態碼。常見的Http Response Code有以下幾種:

a) 200 OK 一切正常,對GET和POST請求的應答文檔跟在後面。

b) 206 Partial Content 客戶發送了一個帶有Range頭的GET請求,服務器完 成。

c) 404 Not Found 無法找到指定位置的資源。這也是一個常用的應答。

d) 414 Request URI Too Long URI太長。

e) 416 Requested Range Not Satisfiable 服務器不能滿足客戶在請求中指 定的Range頭。

f) 500 Internal Server Error 服務器遇到了意料不到的情況,不能完成客 戶的請求。

g) 503 Service Unavailable 服務器由於維護或者負載過重未能應答。例如 ,Servlet可能在數據庫連接池已滿的情況下返回503。

在這些狀態裡面,只有200與206才是我們需要的正確的狀態。所以在代碼⑥ 處,進行了狀態碼的判斷,如果返回不符合要求的狀態碼,則結束線程,返回主 線程並提示報錯。

假設一切正常,下面我們就要考慮從網絡中讀數據了。正如我之前在分析 mysql的數據庫驅動中看的一樣,網絡中發送數據都是以數據包的形式來發送的 ,也就是說不管是客戶端向服務器發出的請求數據,還是從服務器返回給客戶端 的響應數據,都會被拆分成若干個小型數據包在網絡中傳遞,等數據包到達了目 的地,網絡接口會依據數據包的編號來組裝它們,成為完整的比特數據。因此, 我們可以想到在這裡也是一樣的,我們用inputStream的read方法來通過網卡從 網絡中讀取數據,並不一定一次就能把所有的數據包都讀完,所以我們要不斷的 循環來從inputStream中讀取數據。Read方法有一個int型的返回值,表示每次從 inputStream中讀取的字節數,如果把這個inputStream中的數據讀完了,那麼就 返回-1。

Read方法最多可以有三個參數,byte b[]是讀取數據之後存放的目標數組, off標識了目標數組中存儲的開始位置,len是想要讀取的數據長度,這個長度必 定不能大於b[]的長度。

public synchronized int read(byte b[], int off, int  len);

我們的目標是要把目標地址的內容下載下來,現在分了5個線程來分段下載, 那麼這些分段下載的數據保存在哪裡呢?如果把它們都保存在內存中是非常糟糕 的做法,如果文件相當之大,例如是一個視頻的話,難道把這麼大的數據都放在 內存中嗎,這樣的話,萬一連接中斷,那前面下載的東西就都沒有了?我們當然 要想辦法及時的把下載的數據刷到磁盤上保存下來。當用bt下載視頻的時候,通 常都會有個臨時文件,當視頻完全下載結束之後,這個臨時文件就會被刪除,那 麼下次繼續下載的時候,就會接著上次下載的點繼續下載。所以我們的 outputStream就是往這個臨時文件來輸出了。

OutputStream的write方法和上面InputStream的read方法有類似的參數, byte b[]是輸出數據的來源,off標識了開始位置,len是數據長度。

public synchronized void write(byte b[], int off, int len)  throws IOException;

在往臨時文件的outputStream中寫數據的時候,我會加上一個計數器,每滿 5000個比特就往文件中flush一下(代碼⑦處)。

對於輸出流的flush,有些要注意的地方,在程序中有三個地方調用了 outputStream.flush()。第一個是在循環的讀取網絡數據並往 outputStream中 寫入的時候,每滿5000個byte就flush一下(代碼⑦處);第二個是循環之後( 代碼⑧處),這時候正常的讀取寫入操作已經完成,但是outputStream中還有沒 有刷入磁盤的數據,所以要flush一下才能關閉連接;第三個就是在異常中的 flush(代碼⑨處),因為如果發生了連接超時或者讀取數據超時的話,就會直 接跑到catch的exception中去,這個時候outputStream中的數據如果不 flush的 話,重新連接的時候這部分數據就會丟失了。另外,當拋出異常,重新連接的時 候,下載的起始位置也要重新設置(代碼④處),count就是用來標識已經下載 的字節數的,把count+startPosition就是新一次連接需要的下載起始位置了。

3、現在每個分段的下載線程都順利結束了,也都創建了相應的臨時文件,接 下來在主線程中會對臨時文件進行合並,並寫入目標文件,最後刪除臨時文件。 這部分很簡單,就是一個對所有下載線程進行遍歷的過程。這裡outputStream也 有兩次flush,與上面類似,不再贅述。

/** *//**author by http://www.bt285.cn  http://www.guihua.org */
private void tempFileToTargetFile(ChildThread[] childThreads)  {
     try {
         BufferedOutputStream outputStream = new  BufferedOutputStream(
                 new FileOutputStream(fileDir +  fileName));

         // 遍歷所有子線程創建的臨時文件,按順序把下載內容 寫入目標文件中
         for (int i = 0; i < threadNum; i++)  {
             if (statusError) {
                 for (int k = 0; k <  threadNum; k++) {
                     if (childThreads [k].tempFile.length() == 0)
                         childThreads [k].tempFile.delete();
                 }
                 System.out.println("本次下載任務不 成功,請重新設置線程數。");
                 break;
             }

             BufferedInputStream inputStream = new  BufferedInputStream(
                     new FileInputStream (childThreads[i].tempFile));
             System.out.println("Now is file " +  childThreads[i].id);
             int len = 0;
             int count = 0;
             byte[] b = new byte[1024];
             while ((len = inputStream.read(b)) !=  -1) {
                 count += len;
                 outputStream.write(b, 0,  len);
                 if ((count % 5000) == 0)  {
                     outputStream.flush();
                 }

                 // b = new byte[1024];
             }

             inputStream.close();
             // 刪除臨時文件
             if (childThreads[i].status ==  ChildThread.STATUS_HAS_FINISHED) {
                 childThreads[i].tempFile.delete ();
             }
         }

         outputStream.flush();
         outputStream.close();
     } catch (FileNotFoundException e) {
         e.printStackTrace();
     } catch (IOException e) {
         e.printStackTrace();
     }
}

4、最後,說說斷點續傳,前面為了實現斷點續傳,在每個下載線程中都創建 了一個臨時文件,現在我們就要利用這個臨時文件來設置斷點的位置。由於臨時 文件的命名方式都是固定的,所以我們就專門找對應下載的目標文件的臨時文件 ,臨時文件中已經下載的字節數就是我們需要的斷點位置。startPos是一個數組 ,存放了每個線程的已下載的字節數。

//第一步,分析已下載的臨時文件,設置斷點,如果是新的下載任務,則建 立目標文件。

private long[] setThreadBreakpoint(String fileDir2,  String fileName2,
         long contentLength, long[] startPos) {
     File file = new File(fileDir + fileName);
     long localFileSize = file.length();

     if (file.exists()) {
         System.out.println("file " + fileName + " has  exists!");
         // 下載的目標文件已存在,判斷目標文件是否完整
         if (localFileSize < contentLength) {
             System.out.println("Now download continue   ");

             // 遍歷目標文件的所有臨時文件,設置斷點的 位置,即每個臨時文件的長度
             File tempFileDir = new File (fileDir);
             File[] files = tempFileDir.listFiles ();
             for (int k = 0; k < files.length;  k++) {
                 String tempFileName = files [k].getName();
                 // 臨時文件的命名方式為:目標文件 名+"_"+編號
                 if (tempFileName != null  && files[k].length() > 0
                         && tempFileName.startsWith(fileName + "_")) {
                     int fileLongNum =  Integer.parseInt(tempFileName
                             .substring (tempFileName.lastIndexOf("_") + 1,
                                      tempFileName.lastIndexOf("_") + 2));
                     // 為每個線程設置已下載的 位置
                     startPos[fileLongNum] =  files[k].length();
                 }
             }
         }
     } else {
         // 如果下載的目標文件不存在,則創建新文件
         try {
             file.createNewFile();
         } catch (IOException e) {
             e.printStackTrace();
         }
     }

     return startPos;
} 

5、測試

public class DownloadStartup {
     private static final String encoding = "utf- 8";
     public static void main(String[] args) {
         DownloadTask downloadManager = new  DownloadTask();
         String urlStr =  "http://apache.freelamp.com/velocity/tools/1.4/velocity-tools- 1.4.zip";
         downloadManager.setSleepSeconds(5);
         downloadManager.download(urlStr, encoding);
     }
}

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved