程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 用JAVA實現緩沖多線程無阻塞讀取遠程文件

用JAVA實現緩沖多線程無阻塞讀取遠程文件

編輯:關於JAVA

我平時比較喜歡從網上聽歌,有些鏈接下載速度太慢了。如果用 HttpURLConnection類的方法打開連接,然後用InputStream類獲得輸入流,再用 BufferedInputStream構造出帶緩沖區的輸入流,如果網速太慢的話,無論緩沖 區設置多大,聽起來都是斷斷續續的,達不到真正緩沖的目的。於是嘗試編寫代 碼實現用緩沖方式讀取遠程文件,以下貼出的代碼是我寫的MP3解碼器的一部分 。我是不怎麼贊同使用多線程下載的,加之有的鏈接下載速度本身就比較快,所 以在下載速度足夠的情況下,就讓下載線程退出,直到只剩下一個下載線程。當 然,多線程中令人頭痛的死鎖問題、HttpURLConnection的超時阻塞問題都會使 代碼看起來異常復雜。

簡要介紹一下實現多線程環形緩沖的方法。將緩沖區buf[]分為16塊,每塊 32K,下載線程負責向緩沖區寫數據,每次寫一塊;讀線程(BuffRandAcceURL類 )每次讀小於32K的任意字節。同步描述:寫/寫互斥等待空閒塊;寫/寫並發填 寫buf[];讀/寫並發使用buf[]。

經過我很長一段時間使用,我認為比較滿意地實現了我的目標,同其它MP3播 放器對比,我的這種方法能夠比較流暢、穩定地下載並播放。我把實現多線程下 載緩沖的方法寫出來,不足之處懇請批評指正。

一、HttpReader類功能:HTTP協議從指定URL讀取數據

/** *//**
* author by http://www.bt285.cn http://www.5a520.cn
*/
package instream;   
  
import java.io.IOException;   
import java.io.InputStream;   
import java.net.HttpURLConnection;   
import java.net.URL;   
  
public final class HttpReader {   
    public static final int MAX_RETRY = 10;   
    private static long content_length;   
    private URL url;   
    private HttpURLConnection httpConnection;   
    private InputStream in_stream;   
    private long cur_pos;           //用於決定seek方法中

是否執行文件定位   
    private int connect_timeout;   
    private int read_timeout;   
       
    public HttpReader(URL u) {   
        this(u, 5000, 5000);   
    }   
       
    public HttpReader(URL u, int connect_timeout, int read_timeout) 

{   
        this.connect_timeout = connect_timeout;   
        this.read_timeout = read_timeout;   
        url = u;   
        if (content_length == 0) {   
            int retry = 0;   
            while (retry < HttpReader.MAX_RETRY)   
                try {   
                    this.seek(0);   
                    content_length = 

httpConnection.getContentLength();   
                    break;   
                } catch (Exception e) {   
                    retry++;   
                }   
        }   
    }   
       
    public static long getContentLength() {   
        return content_length;   
    }   
       
    public int read(byte[] b, int off, int len) throws IOException 

{   
        int r = in_stream.read(b, off, len);   
        cur_pos += r;   
        return r;   
    }   
       
    public int getData(byte[] b, int off, int len) throws 

IOException {   
        int r, rema = len;   
        while (rema > 0) {   
            if ((r = in_stream.read(b, off, rema)) == -1) {

   
                return -1;   
            }   
            rema -= r;   
            off += r;   
            cur_pos += r;   
        }   
        return len;   
    }   
       
    public void close() {   
        if (httpConnection != null) {   
            httpConnection.disconnect();   
            httpConnection = null;   
        }   
        if (in_stream != null) {   
            try {   
                in_stream.close();   
            } catch (IOException e) {}   
            in_stream = null;   
        }   
        url = null;   
    }   
       
    /**//*  
     * 拋出異常通知再試  
     * 響應碼503可能是由某種暫時的原因引起的,例如同一IP頻繁的連接

請求可能遭服務器拒絕  
     */  
    public void seek(long start_pos) throws IOException {   
        if (start_pos == cur_pos && in_stream != null)

   
            return;   
        if (httpConnection != null) {   
            httpConnection.disconnect();   
            httpConnection = null;   
        }   
        if (in_stream != null) {   
            in_stream.close();   
            in_stream = null;   
        }   
        httpConnection = (HttpURLConnection) 

url.openConnection();   
        httpConnection.setConnectTimeout(connect_timeout);   
        httpConnection.setReadTimeout(read_timeout);   
        String sProperty = "bytes=" + start_pos + "-";   
        httpConnection.setRequestProperty("Range", sProperty);

   
        //httpConnection.setRequestProperty("Connection", 

"Keep-Alive");   
        int responseCode = httpConnection.getResponseCode(); 

  
        if (responseCode < 200 || responseCode >= 300) {

   
            try {   
                Thread.sleep(500);   
            } catch (InterruptedException e) {   
                e.printStackTrace();   
            }   
            throw new IOException("HTTP 

responseCode="+responseCode);   
        }   
  
        in_stream = httpConnection.getInputStream();   
        cur_pos = start_pos;   
    }   
  
}

二、IWriterCallBack接口功能:實現讀/寫通信。

package instream;   
  
public interface IWriterCallBack {   
    public boolean tryWriting(Writer w) throws 

InterruptedException;   
    public void updateBuffer(int i, int len);   
    public void updateWriterCount();   
    public void terminateWriters();   
}

三、Writer類:下載線程,負責向buf[]寫數據。

/** *//**
* http://www.bt285.cn http://www.5a520.cn 
*/
package instream;   
import java.io.IOException;   
import java.net.URL;   
  
public final class Writer implements Runnable {   
    private static boolean isalive = true;   
    private byte[] buf;   
    private IWriterCallBack icb;   
    protected int index;            //buf[]內"塊"索引號

   
    protected long start_pos;       //index對應的文件位置(相

對於文件首的偏移量)   
    protected int await_count;      //用於判斷:下載速度足夠就

退出一個"寫"線程   
    private HttpReader hr;   
       
    public Writer(IWriterCallBack call_back, URL u, byte[] b, int 

i) {   
        hr = new HttpReader(u);   
        if(HttpReader.getContentLength() == 0)  //實例化

HttpReader對象都不成功   
            return;   
        icb = call_back;   
        buf = b;   
        Thread t = new Thread(this,"dt_"+i);   
        t.setPriority(Thread.NORM_PRIORITY + 1);   
        t.start();   
    }   
       
    public void run() {   
        int write_bytes=0, write_pos=0, rema = 0, retry = 0; 

  
        boolean cont = true;   
        while (cont) {   
            try {   
                // 1.等待空閒塊   
                if(retry == 0) {   
                    if (icb.tryWriting(this) == 

false)   
                        break;   
                    write_bytes = 0;   
                    rema = 

BuffRandAcceURL.UNIT_LENGTH;   
                    write_pos = index << 

BuffRandAcceURL.UNIT_LENGTH_BITS;   
                }   
                   
                // 2.定位   
                hr.seek(start_pos);   
  
                // 3.下載"一塊"   
                int w;   
                while (rema > 0 && isalive) 

{   
                    w = (rema < 2048) ? rema : 

2048; //每次讀幾K合適?   
                    if ((w = hr.read(buf, 

write_pos, w)) == -1) {   
                        cont = false;   
                        break;   
                    }   
                    rema -= w;   
                    write_pos += w;   
                    start_pos += w;   
                    write_bytes += w;   
                }   
                   
                //4.通知"讀"線程   
                retry = 0;   
                icb.updateBuffer(index, write_bytes); 

  
            } catch (InterruptedException e) {   
                isalive = false;   
                icb.terminateWriters();   
                break;   
            } catch (IOException e) {   
                if(++retry == HttpReader.MAX_RETRY) { 

  
                    isalive = false;   
                    icb.terminateWriters();   
                    break;   
                }   
            }   
        }   
        icb.updateWriterCount();   
        try {   
            hr.close();   
        } catch (Exception e) {}   
        hr = null;   
        buf = null;   
        icb = null;   
    }   
  
}

四、IRandomAccess接口:隨機讀取文件接口,BuffRandAcceURL類和 BuffRandAcceFile類實現接口方法。BuffRandAcceFile類實現讀取本地磁盤文件 ,這兒就不給出其源碼了。

package instream;   
  
public interface IRandomAccess {   
    public int read() throws Exception;   
    public int read(byte b[]) throws Exception;   
    public int read(byte b[], int off, int len) throws Exception; 

  
    public int dump(int src_off, byte b[], int dst_off, int len) 

throws Exception;   
    public void seek(long pos) throws Exception;   
    public long length();   
    public long getFilePointer();   
    public void close();   
}

五、BuffRandAcceURL類功能:創建下載線程;read方法從buf[]讀數據。

關鍵是如何簡單有效防止死鎖?以下只是我的一次嘗試,請指正。

/** *//**
* http://www.5a520.cn  http://www.bt285.cn
*/ 
package instream;   
  
import java.net.URL;   
import java.net.URLDecoder;   
import decode.Header;   
import tag.MP3Tag;   
import tag.TagThread;   
  
public final class BuffRandAcceURL implements IRandomAccess, 

IWriterCallBack {   
    public static final int UNIT_LENGTH_BITS = 15;        

          //32K   
    public static final int UNIT_LENGTH = 1 << 

UNIT_LENGTH_BITS;   
    public static final int BUF_LENGTH = UNIT_LENGTH << 4; 

           //16塊   
    public static final int UNIT_COUNT = BUF_LENGTH >> 

UNIT_LENGTH_BITS;   
    public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);   
    private static final int MAX_WRITER = 8;   
    private static long file_pointer;   
    private static int read_pos;   
    private static int fill_bytes;   
    private static byte[] buf;      //同時也作讀寫同步

鎖:buf.wait()/buf.notify()   
    private static int[] buf_bytes;   
    private static int buf_index;   
    private static int alloc_pos;   
    private static URL url = null;   
    private static boolean isalive = true;   
    private static int writer_count;   
    private static int await_count;   
    private long file_length;   
    private long frame_bytes;   
       
    public BuffRandAcceURL(String sURL) throws Exception {   
        this(sURL,MAX_WRITER);   
    }   
       
    public BuffRandAcceURL(String sURL, int download_threads) 

throws Exception {   
        buf = new byte[BUF_LENGTH];   
        buf_bytes = new int[UNIT_COUNT];   
        url = new URL(sURL);   
           
        //創建線程以異步方式解析ID3   
        new TagThread(url);   
           
        //打印當前文件名   
        try {   
            String s = URLDecoder.decode(sURL, "GBK");   
            System.out.println("start>> " + 

s.substring(s.lastIndexOf("/") + 1));   
            s = null;   
        } catch (Exception e) {   
            System.out.println("start>> " + sURL); 

  
        }   
           
        //創建"寫"線程   
        for(int i = 0; i < download_threads; i++)   
            new Writer(this, url, buf, i+1);   
        frame_bytes = file_length = 

HttpReader.getContentLength();   
        if(file_length == 0) {   
            Header.strLastErr = "連接URL出錯,重試 " + 

HttpReader.MAX_RETRY + " 次後放棄。";   
            throw new Exception("retry " + 

HttpReader.MAX_RETRY);   
        }   
        writer_count = download_threads;   
           
        //緩沖   
        try_cache();   
           
        //跳過ID3 v2   
        MP3Tag mP3Tag = new MP3Tag();   
        int v2_size = mP3Tag.checkID3V2(buf,0);   
        if (v2_size > 0) {   
            frame_bytes -= v2_size;   
            //seek(v2_size):   
            fill_bytes -= v2_size;   
            file_pointer = v2_size;   
            read_pos = v2_size;   
            read_pos &= BUF_LENGTH_MASK;   
            int units = v2_size >> UNIT_LENGTH_BITS;

   
            for(int i = 0; i < units; i++) {   
                buf_bytes[i] = 0;   
                this.notifyWriter();   
            }   
            buf_bytes[units] -= v2_size;   
            this.notifyWriter();   
        }   
        mP3Tag = null;   
    }   
       
    private void try_cache() throws InterruptedException {   
        int cache_size = BUF_LENGTH;   
        if(cache_size > (int)file_length - alloc_pos)   
            cache_size = (int)file_length - alloc_pos;   
        cache_size -= UNIT_LENGTH;   
           
        //等待填寫當前正在讀的那"一塊"緩沖區   
        /**//*if(fill_bytes >= cache_size && 

writer_count > 0) {  
            synchronized (buf) {  
                buf.wait();  
            }  
            return;  
        }*/  
           
        //等待填滿緩沖區   
        while (fill_bytes < cache_size) {   
            if (writer_count == 0 || isalive == false)   
                return;   
            if(BUF_LENGTH > (int)file_length - 

alloc_pos)   
                cache_size = (int)file_length - 

alloc_pos - UNIT_LENGTH;   
            System.out.printf("\r[緩沖%1$6.2f%%] ",(float)

fill_bytes / cache_size * 100);   
            synchronized (buf) {   
                buf.wait();   
            }   
        }   
        System.out.printf("\r");   
    }   
       
    private int try_reading(int i, int len) throws Exception {   
        int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1);   
        int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + 

buf_bytes[n]);   
        while (r < len) {   
            if (writer_count == 0 || isalive == false)   
                return r;   
            try_cache();   
            r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + 

buf_bytes[n]);   
        }   
           
        return len;   
    }   
       
    /**//*  
     * 各個"寫"線程互斥等待空閒塊  
     */  
    public synchronized boolean tryWriting(Writer w) throws 

InterruptedException {   
        await_count++;   
        while (buf_bytes[buf_index] != 0 && isalive) {

   
            this.wait();   
        }   
           
        //下載速度足夠就結束一個"寫"線程   
        if(writer_count > 1 && w.await_count >= 

await_count &&   
                w.await_count >= writer_count)   
            return false;   
           
        if(alloc_pos >= file_length)   
            return false;   
        w.await_count = await_count;   
        await_count--;   
        w.start_pos = alloc_pos;   
        w.index = buf_index;   
        alloc_pos += UNIT_LENGTH;   
        buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : 

buf_index + 1;   
        return isalive;   
    }   
       
    public void updateBuffer(int i, int len) {   
        synchronized (buf) {   
            buf_bytes[i] = len;   
            fill_bytes += len;   
            buf.notify();   
        }   
    }   
       
    public void updateWriterCount() {   
        synchronized (buf) {   
            writer_count--;   
            buf.notify();   
        }   
    }   
       
    public synchronized void notifyWriter() {   
        this.notifyAll();   
    }   
       
    public void terminateWriters() {   
        synchronized (buf) {   
            if (isalive) {   
                isalive = false;   
                Header.strLastErr = "讀取文件超時。重試 

" + HttpReader.MAX_RETRY   
                        + " 次後放棄,請您稍後

再試。";   
            }   
            buf.notify();   
        }   
           
        notifyWriter();        
    }   
       
    public int read() throws Exception {   
        int iret = -1;   
        int i = read_pos >> UNIT_LENGTH_BITS;   
        // 1."等待"有1字節可讀   
        while (buf_bytes[i] < 1) {   
            try_cache();   
            if (writer_count == 0)   
                return -1;   
        }   
        if(isalive == false)   
            return -1;   
  
        // 2.讀取   
        iret = buf[read_pos] & 0xff;   
        fill_bytes--;   
        file_pointer++;   
        read_pos++;   
        read_pos &= BUF_LENGTH_MASK;   
        if (--buf_bytes[i] == 0)   
            notifyWriter();     // 3.通知   
  
        return iret;   
    }
       
    public int read(byte b[]) throws Exception {
        return read(b, 0, b.length);
    }
    public int read(byte[] b, int off, int len) throws Exception {

        if(len > UNIT_LENGTH)
            len = UNIT_LENGTH;
        int i = read_pos >> UNIT_LENGTH_BITS;   
           
        // 1."等待"有足夠內容可讀   
        if(try_reading(i, len) < len || isalive == false) 

            return -1;
  
        // 2.讀取
        int tail_len = BUF_LENGTH - read_pos; // write_pos != 

BUF_LENGTH
        if (tail_len < len) {
            System.arraycopy(buf, read_pos, b, off, 

tail_len);
            System.arraycopy(buf, 0, b, off + tail_len, len 

- tail_len);
        } else
            System.arraycopy(buf, read_pos, b, off, len); 

        fill_bytes -= len;
        file_pointer += len;
        read_pos += len;
        read_pos &= BUF_LENGTH_MASK;
        buf_bytes[i] -= len;
        if (buf_bytes[i] < 0) {
            int ni = read_pos >> UNIT_LENGTH_BITS;

            buf_bytes[ni] += buf_bytes[i];
            buf_bytes[i] = 0;
            notifyWriter();
        } else if (buf_bytes[i] == 0)
            notifyWriter();
        return len;
    }
       
    /**//*
     * 從src_off位置復制,不移動文件"指針"
     */
    public int dump(int src_off, byte b[], int dst_off, int len) 

throws Exception {
        int rpos = read_pos + src_off;   
        if(try_reading(rpos >> UNIT_LENGTH_BITS, len) 

< len || isalive == false)
            return -1;   
        int tail_len = BUF_LENGTH - rpos;
        if (tail_len < len) {
            System.arraycopy(buf, rpos, b, dst_off, 

tail_len);
            System.arraycopy(buf, 0, b, dst_off + tail_len, 

len - tail_len);
        } else
            System.arraycopy(buf, rpos, b, dst_off, len); 

  
        // 不發信號
  
        return len;   
    }
       
    public long length() {
        return file_length;   
    }   
       
    public long getFilePointer() {
        return file_pointer;   
    }   
  
    public void close() {
        //...
    }   
       
    //   
    public void seek(long pos) throws Exception {
        //...
    }   
       
}
  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved