我平時比較喜歡從網上聽歌,有些鏈接下載速度太慢了。如果用HttpURLConnection類的方法打開連接,然後用InputStream類獲得輸入流,再用BufferedInputStream構造出帶緩衝區的輸入流,如果網速太慢的話,無論緩衝區設置多大,聽起來都是斷斷續續的,達不到真正緩衝的目的。於是嘗試編寫代碼實現用緩衝方式讀取遠程文件,以下貼出的代碼是我寫的MP3解碼器的一部分。我是不怎麼贊同使用多線程下載的,加之有的鏈接下載速度本身就比較快,所以在下載速度足夠的情況下,就讓下載線程退出,直到只剩下一個下載線程。當然,多線程中令人頭痛的死鎖問題、HttpURLConnection的超時阻塞問題都會使代碼看起來異常複雜。
簡要介紹一下實現多線程環形緩衝的方法。將緩衝區buf[]分為16塊,每塊32K,下載線程負責向緩衝區寫數據,每次寫一塊;讀線程(BuffRandAcceURL類)每次讀小於32K的任意字節。同步描述:寫/寫互斥等待空閒塊;寫/寫並發填寫buf[];讀/寫並發使用buf[]。
經過我很長一段時間使用,我認為比較滿意地實現了我的目標,同其它MP3播放器對比,我的這種方法能夠比較流暢、穩定地下載並播放。我把實現多線程下載緩衝的方法寫出來,不足之處懇請批評指正。
一、HttpReader類功能:HTTP協議從指定URL讀取數據
/**
 * author by http://www.bt285.cn http://www.5a520.cn
 */
package instream;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

/**
 * Reads data from a URL over HTTP, starting at an arbitrary byte offset that
 * is requested with a {@code Range: bytes=N-} header.
 *
 * <p>The content length of the resource is cached in a static field the first
 * time any instance manages to connect; later instances reuse that value
 * without re-querying the server.
 *
 * <p>Instances are not synchronized; in this file each download thread owns
 * its own {@code HttpReader}.
 */
public final class HttpReader {
    /** Maximum number of connection attempts before giving up. */
    public static final int MAX_RETRY = 10;

    /** Content length reported by the first successful connection; 0 while unknown. */
    private static long content_length;

    private URL url;
    private HttpURLConnection httpConnection;
    private InputStream in_stream;
    /** Current position in the remote file; seek() uses it to decide whether to reconnect. */
    private long cur_pos;
    private final int connect_timeout;
    private final int read_timeout;

    /**
     * Creates a reader with 5000 ms connect and read timeouts.
     *
     * @param u the URL to read from
     */
    public HttpReader(URL u) {
        this(u, 5000, 5000);
    }

    /**
     * Creates a reader. If the content length is not cached yet, connects at
     * offset 0 (up to {@link #MAX_RETRY} attempts) to obtain it; failures are
     * absorbed and leave the cached length at 0, which callers test through
     * {@link #getContentLength()}.
     *
     * @param u               the URL to read from
     * @param connect_timeout connect timeout in milliseconds
     * @param read_timeout    read timeout in milliseconds
     */
    public HttpReader(URL u, int connect_timeout, int read_timeout) {
        this.connect_timeout = connect_timeout;
        this.read_timeout = read_timeout;
        url = u;
        if (content_length == 0) {
            int retry = 0;
            while (retry < HttpReader.MAX_RETRY) {
                try {
                    this.seek(0);
                    content_length = httpConnection.getContentLength();
                    break;
                } catch (Exception e) {
                    // Any failure counts as one attempt; the caller detects
                    // total failure via getContentLength() == 0.
                    retry++;
                }
            }
        }
    }

    /**
     * @return the cached content length, or 0 if no connection has succeeded yet
     */
    public static long getContentLength() {
        return content_length;
    }

    /**
     * Reads up to {@code len} bytes from the current position.
     *
     * @return the number of bytes read, or -1 at end of stream
     * @throws IOException if the underlying stream fails or times out
     */
    public int read(byte[] b, int off, int len) throws IOException {
        int r = in_stream.read(b, off, len);
        // Fix: the original added r unconditionally, so an end-of-stream
        // result (-1) moved cur_pos backwards and broke the
        // "already positioned" check in seek().
        if (r > 0) {
            cur_pos += r;
        }
        return r;
    }

    /**
     * Reads exactly {@code len} bytes, blocking until all have arrived.
     *
     * @return {@code len}, or -1 if the stream ended before {@code len} bytes were read
     * @throws IOException if the underlying stream fails or times out
     */
    public int getData(byte[] b, int off, int len) throws IOException {
        int r, rema = len;
        while (rema > 0) {
            if ((r = in_stream.read(b, off, rema)) == -1) {
                return -1;
            }
            rema -= r;
            off += r;
            cur_pos += r;
        }
        return len;
    }

    /**
     * Disconnects and releases the stream. The reader cannot be used afterwards
     * because the URL reference is cleared.
     */
    public void close() {
        // Order kept from the original: disconnect first, then close the stream.
        if (httpConnection != null) {
            httpConnection.disconnect();
            httpConnection = null;
        }
        if (in_stream != null) {
            try {
                in_stream.close();
            } catch (IOException ignored) {
                // Best-effort cleanup; nothing useful can be done here.
            }
            in_stream = null;
        }
        url = null;
    }

    /**
     * Positions the reader at {@code start_pos} by opening a new connection
     * with a {@code Range} header, unless the reader is already there.
     *
     * <p>Throws an exception to tell the caller to try again. A 503 response
     * may be caused by a temporary condition, e.g. the server rejecting
     * frequent connection requests from the same IP, so the method pauses
     * 500 ms before reporting a non-2xx response.
     *
     * <p>NOTE(review): a server that ignores {@code Range} and answers 200 for
     * a non-zero offset is accepted as-is; confirm whether that can happen
     * with the servers in use.
     *
     * @param start_pos offset from the beginning of the remote file
     * @throws IOException if connecting fails or the response code is not 2xx
     */
    public void seek(long start_pos) throws IOException {
        if (start_pos == cur_pos && in_stream != null) {
            return;
        }
        if (httpConnection != null) {
            httpConnection.disconnect();
            httpConnection = null;
        }
        if (in_stream != null) {
            in_stream.close();
            in_stream = null;
        }
        httpConnection = (HttpURLConnection) url.openConnection();
        httpConnection.setConnectTimeout(connect_timeout);
        httpConnection.setReadTimeout(read_timeout);
        String sProperty = "bytes=" + start_pos + "-";
        httpConnection.setRequestProperty("Range", sProperty);
        //httpConnection.setRequestProperty("Connection", "Keep-Alive");
        int responseCode = httpConnection.getResponseCode();
        if (responseCode < 200 || responseCode >= 300) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Fix: keep the interrupt visible to the caller instead of
                // printing a stack trace and losing it.
                Thread.currentThread().interrupt();
            }
            throw new IOException("HTTP responseCode=" + responseCode);
        }
        in_stream = httpConnection.getInputStream();
        cur_pos = start_pos;
    }
}
二、IWriterCallBack接口功能:實現讀/寫通信。
package instream;

/**
 * Callback used by download ("writer") threads to coordinate with the owner
 * of the shared buffer.
 */
public interface IWriterCallBack {

    /**
     * Called by a writer before it downloads a block.
     *
     * @param w the calling writer
     * @return {@code false} when the writer should stop its download loop
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    boolean tryWriting(Writer w) throws InterruptedException;

    /**
     * Called by a writer after it has finished with a block.
     *
     * @param i   index of the block inside the buffer
     * @param len number of bytes the writer stored in that block
     */
    void updateBuffer(int i, int len);

    /** Called once by a writer when its download loop ends. */
    void updateWriterCount();

    /**
     * Called by a writer that was interrupted or exhausted its retries, to
     * request that all writers stop.
     */
    void terminateWriters();
}
三、Writer類:下載線程,負責向buf[]寫數據。
/** *//** * http://www.bt285.cn http://www.5a520.cn */ package instream; import java.io.IOException; import java.net.URL; public final class Writer implements Runnable { private static boolean isalive = true; private byte[] buf; private IWriterCallBack icb; protected int index; //buf[]內"塊"索引號 protected long start_pos; //index對應的文件位置(相 對於文件首的偏移量) protected int await_count; //用於判斷:下載速度足夠就 退出一個"寫"線程 private HttpReader hr; public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) { hr = new HttpReader(u); if(HttpReader.getContentLength() == 0) //實例化 HttpReader對象都不成功 return; icb = call_back; buf = b; Thread t = new Thread(this,"dt_"+i); t.setPriority(Thread.NORM_PRIORITY + 1); t.start(); } public void run() { int write_bytes=0, write_pos=0, rema = 0, retry = 0; boolean cont = true; while (cont) { try { // 1.等待空閒塊 if(retry == 0) { if (icb.tryWriting(this) == false) break; write_bytes = 0; rema = BuffRandAcceURL.UNIT_LENGTH; write_pos = index << BuffRandAcceURL.UNIT_LENGTH_BITS; } // 2.定位 hr.seek(start_pos); // 3.下載"一塊" int w; while (rema > 0 && isalive) { w = (rema < 2048) ? rema : 2048; //每次讀幾K合適? if ((w = hr.read(buf, write_pos, w)) == -1) { cont = false; break; } rema -= w; write_pos += w; start_pos += w; write_bytes += w; } //4.通知"讀"線程 retry = 0; icb.updateBuffer(index, write_bytes); } catch (InterruptedException e) { isalive = false; icb.terminateWriters(); break; } catch (IOException e) { if(++retry == HttpReader.MAX_RETRY) { isalive = false; icb.terminateWriters(); break; } } } icb.updateWriterCount(); try { hr.close(); } catch (Exception e) {} hr = null; buf = null; icb = null; } }
四、IRandomAccess接口:隨機讀取文件接口,BuffRandAcceURL類和BuffRandAcceFile類實現接口方法。BuffRandAcceFile類實現讀取本地磁盤文件,這兒就不給出其源碼了。
package instream;

/**
 * Random-access read interface for a file-like source.
 */
public interface IRandomAccess {

    /** Reads a single byte. */
    int read() throws Exception;

    /** Reads into {@code b}. */
    int read(byte[] b) throws Exception;

    /** Reads up to {@code len} bytes into {@code b} starting at {@code off}. */
    int read(byte[] b, int off, int len) throws Exception;

    /**
     * Copies {@code len} bytes located {@code src_off} bytes ahead of the
     * current read position into {@code b} at {@code dst_off}.
     */
    int dump(int src_off, byte[] b, int dst_off, int len) throws Exception;

    /** Moves the read position to {@code pos}. */
    void seek(long pos) throws Exception;

    /** @return the length of the underlying file */
    long length();

    /** @return the current read position within the file */
    long getFilePointer();

    /** Releases resources held by the implementation. */
    void close();
}
五、BuffRandAcceURL類功能:創建下載線程;read方法從buf[]讀數據。
關鍵是如何簡單有效防止死鎖?以下只是我的一次嘗試,請指正。
/** *//** * http://www.5a520.cn http://www.bt285.cn */ package instream; import java.net.URL; import java.net.URLDecoder; import decode.Header; import tag.MP3Tag; import tag.TagThread; public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack { public static final int UNIT_LENGTH_BITS = 15; //32K public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS; public static final int BUF_LENGTH = UNIT_LENGTH << 4; //16塊 public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS; public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1); private static final int MAX_WRITER = 8; private static long file_pointer; private static int read_pos; private static int fill_bytes; private static byte[] buf; //同時也作讀寫同步 鎖:buf.wait()/buf.notify() private static int[] buf_bytes; private static int buf_index; private static int alloc_pos; private static URL url = null; private static boolean isalive = true; private static int writer_count; private static int await_count; private long file_length; private long frame_bytes; public BuffRandAcceURL(String sURL) throws Exception { this(sURL,MAX_WRITER); } public BuffRandAcceURL(String sURL, int download_threads) throws Exception { buf = new byte[BUF_LENGTH]; buf_bytes = new int[UNIT_COUNT]; url = new URL(sURL); //創建線程以異步方式解析ID3 new TagThread(url); //打印當前文件名 try { String s = URLDecoder.decode(sURL, "GBK"); System.out.println("start>> " + s.substring(s.lastIndexOf("/") + 1)); s = null; } catch (Exception e) { System.out.println("start>> " + sURL); } //創建"寫"線程 for(int i = 0; i < download_threads; i++) new Writer(this, url, buf, i+1); frame_bytes = file_length = HttpReader.getContentLength(); if(file_length == 0) { Header.strLastErr = "連接URL出錯,重試 " + HttpReader.MAX_RETRY + " 次後放棄。"; throw new Exception("retry " + HttpReader.MAX_RETRY); } writer_count = download_threads; //緩沖 try_cache(); //跳過ID3 v2 MP3Tag mP3Tag = new MP3Tag(); int v2_size = mP3Tag.checkID3V2(buf,0); if (v2_size > 0) { frame_bytes -= v2_size; 
//seek(v2_size): fill_bytes -= v2_size; file_pointer = v2_size; read_pos = v2_size; read_pos &= BUF_LENGTH_MASK; int units = v2_size >> UNIT_LENGTH_BITS; for(int i = 0; i < units; i++) { buf_bytes[i] = 0; this.notifyWriter(); } buf_bytes[units] -= v2_size; this.notifyWriter(); } mP3Tag = null; } private void try_cache() throws InterruptedException { int cache_size = BUF_LENGTH; if(cache_size > (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos; cache_size -= UNIT_LENGTH; //等待填寫當前正在讀的那"一塊"緩沖區 /**//*if(fill_bytes >= cache_size && writer_count > 0) { synchronized (buf) { buf.wait(); } return; }*/ //等待填滿緩沖區 while (fill_bytes < cache_size) { if (writer_count == 0 || isalive == false) return; if(BUF_LENGTH > (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos - UNIT_LENGTH; System.out.printf("\r[緩沖%1$6.2f%%] ",(float) fill_bytes / cache_size * 100); synchronized (buf) { buf.wait(); } } System.out.printf("\r"); } private int try_reading(int i, int len) throws Exception { int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1); int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]); while (r < len) { if (writer_count == 0 || isalive == false) return r; try_cache(); r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]); } return len; } /**//* * 各個"寫"線程互斥等待空閒塊 */ public synchronized boolean tryWriting(Writer w) throws InterruptedException { await_count++; while (buf_bytes[buf_index] != 0 && isalive) { this.wait(); } //下載速度足夠就結束一個"寫"線程 if(writer_count > 1 && w.await_count >= await_count && w.await_count >= writer_count) return false; if(alloc_pos >= file_length) return false; w.await_count = await_count; await_count--; w.start_pos = alloc_pos; w.index = buf_index; alloc_pos += UNIT_LENGTH; buf_index = (buf_index == UNIT_COUNT - 1) ? 
0 : buf_index + 1; return isalive; } public void updateBuffer(int i, int len) { synchronized (buf) { buf_bytes[i] = len; fill_bytes += len; buf.notify(); } } public void updateWriterCount() { synchronized (buf) { writer_count--; buf.notify(); } } public synchronized void notifyWriter() { this.notifyAll(); } public void terminateWriters() { synchronized (buf) { if (isalive) { isalive = false; Header.strLastErr = "讀取文件超時。重試 " + HttpReader.MAX_RETRY + " 次後放棄,請您稍後 再試。"; } buf.notify(); } notifyWriter(); } public int read() throws Exception { int iret = -1; int i = read_pos >> UNIT_LENGTH_BITS; // 1."等待"有1字節可讀 while (buf_bytes[i] < 1) { try_cache(); if (writer_count == 0) return -1; } if(isalive == false) return -1; // 2.讀取 iret = buf[read_pos] & 0xff; fill_bytes--; file_pointer++; read_pos++; read_pos &= BUF_LENGTH_MASK; if (--buf_bytes[i] == 0) notifyWriter(); // 3.通知 return iret; } public int read(byte b[]) throws Exception { return read(b, 0, b.length); } public int read(byte[] b, int off, int len) throws Exception { if(len > UNIT_LENGTH) len = UNIT_LENGTH; int i = read_pos >> UNIT_LENGTH_BITS; // 1."等待"有足夠內容可讀 if(try_reading(i, len) < len || isalive == false) return -1; // 2.讀取 int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH if (tail_len < len) { System.arraycopy(buf, read_pos, b, off, tail_len); System.arraycopy(buf, 0, b, off + tail_len, len - tail_len); } else System.arraycopy(buf, read_pos, b, off, len); fill_bytes -= len; file_pointer += len; read_pos += len; read_pos &= BUF_LENGTH_MASK; buf_bytes[i] -= len; if (buf_bytes[i] < 0) { int ni = read_pos >> UNIT_LENGTH_BITS; buf_bytes[ni] += buf_bytes[i]; buf_bytes[i] = 0; notifyWriter(); } else if (buf_bytes[i] == 0) notifyWriter(); return len; } /**//* * 從src_off位置復制,不移動文件"指針" */ public int dump(int src_off, byte b[], int dst_off, int len) throws Exception { int rpos = read_pos + src_off; if(try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || isalive == false) return -1; int tail_len = BUF_LENGTH 
- rpos; if (tail_len < len) { System.arraycopy(buf, rpos, b, dst_off, tail_len); System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len); } else System.arraycopy(buf, rpos, b, dst_off, len); // 不發信號 return len; } public long length() { return file_length; } public long getFilePointer() { return file_pointer; } public void close() { //... } // public void seek(long pos) throws Exception { //... } }