我們要編寫一個Socket應用,監聽指定端口,實現數據包的接收和發送邏輯,這在早期系統間進行數據交互是經常使用的,這類接口通常需要考慮兩個問題:一個是避免線程阻塞,保證接收的數據盡快處理;二是:接口的穩定性和可靠性問題,數據包很復雜,接口服務的系統也很多,一旦守候線程出現異常就會導致Socket停止,這是非常危險的,那我們有什麼辦法避免嗎?
Java1.5版本以後在Thread類中增加了setUncaughtExceptionHandler方法,實現了線程異常的捕捉和處理。可能大家會有一個疑問:如果Socket應用出現了不可預測的異常是否可以自動重啟呢?其實使用線程異常處理器很容易解決,我們來看一個異常處理器應用實例,代碼如下:
class TcpServer implements Runnable { // 創建後即運行 public TcpServer() { Thread t = new Thread(this); t.setUncaughtExceptionHandler(new TcpServerExceptionHandler()); t.start(); } @Override public void run() { for (int i = 0; i < 3; i++) { try { Thread.sleep(1000); System.out.println("系統正常運行:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } // 拋出異常 throw new RuntimeException(); } // 異常處理器 private static class TcpServerExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { // 記錄線程異常信息 System.out.println("線程" + t.getName() + " 出現異常,自行重啟,請分析原因。"); e.printStackTrace(); // 重啟線程,保證業務不中斷 new TcpServer(); } } }
這段代碼的邏輯比較簡單,在TcpServer類創建時即啟動一個線程,提供TCP服務,例如接收和發送文件,具體邏輯在run方法中實現。同時,設置了該線程出現運行期異常(也就是Uncaught Exception)時,由TcpServerExceptionHandler異常處理器來處理異常。那麼TcpServerExceptionHandler做什麼事呢?兩件事:
有了這兩點,TcpServer就可以穩定的運行了,即使出現異常也能自動重啟,客戶端代碼比較簡單,只需要new TcpServer()即可,運行結果如下:
從運行結果上可以看出,當Thread-0出現異常時,系統自動重啟了Thread-1線程,繼續提供服務,大大提高了系統的性能。
這段程序只是一個示例程序,若要在實際環境中應用,則需要注意以下三個方面:
volatile關鍵字比較少用,原因無外乎兩點,一是在Java1.5之前該關鍵字在不同的操作系統上有不同的表現,所帶來的問題就是移植性較差;而且比較難設計,而且誤用較多,這也導致它的"名譽" 受損。
我們知道,每個線程都運行在棧內存中,每個線程都有自己的工作內存(Working Memory,比如寄存器Register、高速緩沖存儲器Cache等),線程的計算一般是通過工作內存進行交互的,其示意圖如下圖所示:
從示意圖上我們可以看到,線程在初始化時從主內存中加載所需的變量值到工作內存中,然後在線程運行時,如果是讀取,則直接從工作內存中讀取,若是寫入則先寫到工作內存中,之後刷新到主內存中,這是JVM的一個簡答的內存模型,但是這樣的結構在多線程的情況下有可能會出現問題,比如:A線程修改變量的值,也刷新到了主內存,但B、C線程在此時間內讀取的還是本線程的工作內存,也就是說它們讀取的不是最"新鮮"的值,此時就出現了不同線程持有的公共資源不同步的情況。
對於此類問題有很多解決辦法,比如使用synchronized同步代碼塊,或者使用Lock鎖來解決該問題,不過,Java可以使用volatile更簡單地解決此類問題,比如在一個變量前加上volatile關鍵字,可以確保每個線程對本地變量的訪問和修改都是直接與內存交互的,而不是與本線程的工作內存交互的,保證每個線程都能獲得最"新鮮"的變量值,其示意圖如下:
明白了volatile變量的原理,那我們思考一下:volatile變量是否能夠保證數據的同步性呢?兩個線程同時修改一個volatile是否會產生髒數據呢?我們看看下面代碼:
class UnsafeThread implements Runnable { // 共享資源 private volatile int count = 0; @Override public void run() { // 增加CPU的繁忙程度,不必關心其邏輯含義 for (int i = 0; i < 1000; i++) { Math.hypot(Math.pow(92456789, i), Math.cos(i)); } count++; } public int getCount() { return count; } }
上面的代碼定義了一個多線程類,run方法的主要邏輯是共享資源count的自加運算,而且我們還為count變量加上了volatile關鍵字,確保是從內存中讀取和寫入的,如果有多個線程運行,也就是多個線程執行count變量的自加操作,count變量會產生髒數據嗎?想想看,我們已經為count加上了volatile關鍵字呀!模擬多線程的代碼如下:
public static void main(String[] args) throws InterruptedException { // 理想值,並作為最大循環次數 int value = 1000; // 循環次數,防止造成無限循環或者死循環 int loops = 0; // 主線程組,用於估計活動線程數 ThreadGroup tg = Thread.currentThread().getThreadGroup(); while (loops++ < value) { // 共享資源清零 UnsafeThread ut = new UnsafeThread(); for (int i = 0; i < value; i++) { new Thread(ut).start(); } // 先等15毫秒,等待活動線程為1 do { Thread.sleep(15); } while (tg.activeCount() != 1); // 檢查實際值與理論值是否一致 if (ut.getCount() != value) { // 出現線程不安全的情況 System.out.println("循環到:" + loops + " 遍,出現線程不安全的情況"); System.out.println("此時,count= " + ut.getCount()); System.exit(0); } } }
想讓volatite變量"出點丑",還是需要花點功夫的。此段程序的運行邏輯如下:
運行結果如下:
循環到:40 遍,出現線程不安全的情況
此時,count= 999
這只是一種可能的結果,每次執行都有可能產生不同的結果。這也說明我們的count變量沒有實現數據同步,在多個線程修改的情況下,count的實際值與理論值產生了偏差,直接說明了volatile關鍵字並不能保證線程的安全。
在解釋原因之前,我們先說一下自加操作。count++表示的是先取出count的值然後再加1,也就是count=count+1,所以,在某個緊鄰時間片段內會發生如下神奇的事情:
(1)、第一個時間片段
A線程獲得執行機會,因為有關鍵字volatile修飾,所以它從主內存中獲得count的最新值為998,接下來的事情又分為兩種類型:
(2)、第二個片段
這兩個時間片段執行完畢後,原本期望的結果為1000,單運行後的值為999,這表示出現了線程不安全的情況。這也是我們要說明的:volatile關鍵字並不能保證線程安全,它只能保證當前線程需要該變量的值時能夠獲得最新的值,而不能保證線程修改的安全性。
順便說一下,在上面的代碼中,UnsafeThread類的消耗CPU計算是必須的,其目的是加重線程的負荷,以便出現單個線程搶占整個CPU資源的情景,否則很難模擬出volatile線程不安全的情況,大家可以自行模擬測試。
多線程應用有兩種實現方式,一種是實現Runnable接口,另一種是繼承Thread類,這兩個方法都有缺點:run方法沒有返回值,不能拋出異常(這兩個缺點歸根到底是Runnable接口的缺陷,Thread類也實現了Runnable接口),如果需要知道一個線程的運行結果就需要用戶自行設計,線程類本身也不能提供返回值和異常。但是從Java1.5開始引入了一個新的接口Callable,它類似於Runnable接口,實現它就可以實現多線程任務,Callable的接口定義如下:
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
實現Callable接口的類,只是表明它是一個可調用的任務,並不表示它具有多線程運算能力,還是需要執行器來執行的,我們先編寫一個任務類,代碼如下:
//稅款計算器 class TaxCalculator implements Callable<Integer> { // 本金 private int seedMoney; // 接收主線程提供的參數 public TaxCalculator(int _seedMoney) { seedMoney = _seedMoney; } @Override public Integer call() throws Exception { // 復雜計算,運行一次需要2秒 TimeUnit.MILLISECONDS.sleep(2000); return seedMoney / 10; } }
這裡模擬了一個復雜運算:稅款計算器,該運算可能要花費10秒鐘的時間,此時不能讓用戶一直等著吧,需要給用戶輸出點什麼,讓用戶知道系統還在運行,這也是系統友好性的體現:用戶輸入即有輸出,若耗時較長,則顯示運算進度。如果我們直接計算,就只有一個main線程,是不可能有友好提示的,如果稅金不計算完畢,也不會執行後續動作,所以此時最好的辦法就是重啟一個線程來運算,讓main線程做進度提示,代碼如下:
public static void main(String[] args) throws InterruptedException, ExecutionException { // 生成一個單線程的異步執行器 ExecutorService es = Executors.newSingleThreadExecutor(); // 線程執行後的期望值 Future<Integer> future = es.submit(new TaxCalculator(100)); while (!future.isDone()) { // 還沒有運算完成,等待200毫秒 TimeUnit.MICROSECONDS.sleep(200); // 輸出進度符號 System.out.print("*"); } System.out.println("\n計算完成,稅金是:" + future.get() + " 元 "); es.shutdown(); }
在這段代碼中,Executors是一個靜態工具類,提供了異步執行器的創建能力,如單線程異步執行器newSingleThreadExecutor、固定線程數量的執行器newFixedThreadPool等,一般它是異步計算的入口類。future關注的是線程執行後的結果,比如沒有運行完畢,執行結果是多少等。此段代碼的運行結果如下所示:
**********************************************......
計算完成,稅金是:10 元
執行時,"*"會依次遞增,表示系統正在運算,為用戶提供了運算進度,此類異步計算的好處是:
在Java1.5之前,實現多線程比較麻煩,需要自己啟動線程,並關注同步資源,防止出現線程死鎖等問題,在1.5版本之後引入了並行計算框架,大大簡化了多線程開發。我們知道一個線程有五個狀態:新建狀態(NEW)、可運行狀態(Runnable,也叫作運行狀態)、阻塞狀態(Blocked)、等待狀態(Waiting)、結束狀態(Terminated),線程的狀態只能由新建轉變為了運行狀態後才能被阻塞或等待,最後終結,不可能產生本末倒置的情況,比如把一個結束狀態的線程轉變為新建狀態,則會出現異常,例如如下代碼會拋出異常:
public static void main(String[] args) throws InterruptedException { // 創建一個線程,新建狀態 Thread t = new Thread(new Runnable() { @Override public void run() { System.out.println("線程正在運行"); } }); // 運行狀態 t.start(); // 是否是運行狀態,若不是則等待10毫秒 while (!t.getState().equals(Thread.State.TERMINATED)) { TimeUnit.MICROSECONDS.sleep(10); } // 直接由結束轉變為雲心態 t.start(); }
此段程序運行時會報java.lang.IllegalThreadStateException異常,原因就是不能從結束狀態直接轉變為運行狀態,我們知道一個線程的運行時間分為3部分:T1為線程啟動時間,T2為線程的運行時間,T3為線程銷毀時間,如果一個線程不能被重復使用,每次創建一個線程都需要經過啟動、運行、銷毀時間,這勢必增大系統的響應時間,有沒有更好的辦法降低線程的運行時間呢?
T2是無法避免的,只有通過優化代碼來實現降低運行時間。T1和T2都可以通過線程池(Thread Pool)來縮減時間,比如在容器(或系統)啟動時,創建足夠多的線程,當容器(或系統)需要時直接從線程池中獲得線程,運算出結果,再把線程返回到線程池中___ExecutorService就是實現了線程池的執行器,我們來看一個示例代碼:
public static void main(String[] args) throws InterruptedException { // 2個線程的線程池 ExecutorService es = Executors.newFixedThreadPool(2); // 多次執行線程體 for (int i = 0; i < 4; i++) { es.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); } // 關閉執行器 es.shutdown(); }
此段代碼首先創建了一個包含兩個線程的線程池,然後在線程池中多次運行線程體,輸出運行時的線程名稱,結果如下:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
本次代碼執行了4遍線程體,按照我們之前闡述的" 一個線程不可能從結束狀態轉變為可運行狀態 ",那為什麼此處的2個線程可以反復使用呢?這就是我們要搞清楚的重點。
線程池涉及以下幾個名詞:
我們首先從線程池的創建說起,Executors.newFixedThreadPool(2)表示創建一個具有兩個線程的線程池,源代碼如下:
public class Executors { //生成一個最大為nThreads的線程池執行器 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } }
這裡使用了LinkedBlockingQueue作為隊列任務管理器,所有等待處理的任務都會放在該對列中,需要注意的是,此隊列是一個阻塞式的單端隊列。線程池建立好了,那就需要線程在其中運行了,線程池中的線程是在submit第一次提交任務時建立的,代碼如下:
public Future<?> submit(Runnable task) { //檢查任務是否為null if (task == null) throw new NullPointerException(); //把Runnable任務包裝成具有返回值的任務對象,不過此時並沒有執行,只是包裝 RunnableFuture<Object> ftask = newTaskFor(task, null); //執行此任務 execute(ftask); //返回任務預期執行結果 return ftask; }
此處的代碼關鍵是execute方法,它實現了三個職責。
其中此處的關鍵是工作線程的創建,它也是通過new Thread方式創建的一個線程,只是它創建的並不是我們的任務線程(雖然我們的任務實現了Runnable接口,但它只是起了一個標志性的作用),而是經過包裝的Worker線程,代碼如下:
private final class Worker implements Runnable { // 運行一次任務 private void runTask(Runnable task) { /* 這裡的task才是我們自定義實現Runnable接口的任務 */ task.run(); /* 該方法其它代碼略 */ } // 工作線程也是線程,必須實現run方法 public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } // 任務隊列中獲得任務 Runnable getTask() { /* 其它代碼略 */ for (;;) { return r = workQueue.take(); } } }
此處為示意代碼,刪除了大量的判斷條件和鎖資源。execute方法是通過Worker類啟動的一個工作線程,執行的是我們的第一個任務,然後改線程通過getTask方法從任務隊列中獲取任務,之後再繼續執行,但問題是任務隊列是一個BlockingQuene,是阻塞式的,也就是說如果該隊列的元素為0,則保持等待狀態,直到有任務進入為止,我們來看LinkedBlockingQuene的take方法,代碼如下:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { try { // 如果隊列中的元素為0,則等待 while (count.get() == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to a non-interrupted thread throw ie; } // 等待狀態結束,彈出頭元素 x = extract(); c = count.getAndDecrement(); // 如果隊列數量還多於一個,喚醒其它線程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); // 返回頭元素 return x; }
分析到這裡,我們就明白了線程池的創建過程:創建一個阻塞隊列以容納任務,在第一次執行任務時創建做夠多的線程(不超過許可線程數),並處理任務,之後每個工作線程自行從任務對列中獲得任務,直到任務隊列中的任務數量為0為止,此時,線程將處於等待狀態,一旦有任務再加入到隊列中,即召喚醒工作線程進行處理,實現線程的可復用性。
使用線程池減少的是線程的創建和銷毀時間,這對於多線程應用來說非常有幫助,比如我們常用的Servlet容器,每次請求處理的都是一個線程,如果不采用線程池技術,每次請求都會重新創建一個新的線程,這會導致系統的性能符合加大,響應效率下降,降低了系統的友好性。