在java多線程編程中,我們經常使用線程池提交任務,並且通過Future來獲取任務執行的結果,以此達到異步或者並行執行的效果。在jdk1.7以前,FutureTask是Future唯一的實現類,1.7後加入了ForkJoinTask類。本文主要總結一下我對FutureTask的理解。
Future接口定義了5個方法,分別是
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
分別介紹一下這五個接口的用途:
寫個demo便於理解
1 public class FutureDemo { 2 public static void main(String[] args) { 3 ExecutorService executorService = Executors.newCachedThreadPool(); 4 Future future = executorService.submit(new Callable<Object>() { 5 @Override 6 public Object call() throws Exception { 7 Long start = System.currentTimeMillis(); 8 while (true) { 9 Long current = System.currentTimeMillis(); 10 if ((current - start) > 1000) { 11 return 1; 12 } 13 } 14 } 15 }); 16 17 try { 18 Integer result = (Integer)future.get(); 19 System.out.println(result); 20 }catch (Exception e){ 21 e.printStackTrace(); 22 } 23 } 24 }
這裡我們模擬了1s鐘的CPU空轉,當執行future.get()的時候,主線程阻塞了大約一秒後,把結果打印出來:1。
當然我們也可以使用V get(long timeout, TimeUnit unit),這個方法提供了一個超時時間的設置,如果超過當前時間任務線程還未返回,那麼就會停止阻塞狀態,並且拋出一個timeout異常。如下
1 try { 2 Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS); 3 System.out.println(result); 4 } catch (Exception e) { 5 e.printStackTrace(); 6 }
這裡我們設置的超時時間是500毫秒,由於一開始我們模擬了1s的CPU計算時間,這裡便會拋出超時異常,打印出堆棧信息
當然,如果我們把超時時間設置的長一些,還是可以得到預期的結果的。
剛我們測試了最常用的兩個方法,接下來我們來探一探FutureTask的內部實現機制。首先我們看一下FutureTask的繼承結構:
FutureTask實現了RunnableFuture接口,而RunnableFuture繼承了Runnable和Future,也就是說FutureTask既可以當做一個Runnable,也可以當做一個Future。
FutureTask內部定義了7個狀態,代表了FutureTask當前所處狀態。如下
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
當一個任務剛提交的時候,狀態為NEW,由FutureTask的構造器可知:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
任務執行正常結束前,state會被設置成COMPLETING,代表任務即將完成,接下來很快就會被設置為NARMAL或者EXCEPTIONAL,這取決於調用Runnable中的call()方法是否拋出了異常。如果沒有異常,則state設為NARMAL,反之為EXCEPTIONAL。
如果任務提交後,在任務執行結束之前調用cancel(boolean mayInterruptIfRunning) 取消任務,那麼有可能進入到後3個狀態。如果傳入的參數是false,state會被置為CANCELLED,反之如果傳入true,state先被置為INTERRUPTING,後被置為INTERRUPTED。
總結下,FutureTask的狀態流轉過程,可以出現以下三種狀態:
1. 正常執行完畢。 NEW -> COMPLETING -> NORMAL
2. 執行中出現異常。NEW -> COMPLETING -> EXCEPTIONAL
3. 任務執行過程中被取消,並且不響應中斷。NEW -> CANCELLED
4. 任務執行過程中被取消,並且響應中斷。 NEW -> INTERRUPTING -> INTERRUPTED
那麼以上狀態為什麼會這麼流轉呢?接下來我們一起扒一扒FutureTask的源碼。我們從futureTask的方法看起。
1 public void run()
1 public void run() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, 4 null, Thread.currentThread())) 5 return; 6 try { 7 Callable<V> c = callable; 8 if (c != null && state == NEW) { 9 V result; 10 boolean ran; 11 try { 12 result = c.call(); 13 ran = true; 14 } catch (Throwable ex) { 15 result = null; 16 ran = false; 17 setException(ex); 18 } 19 if (ran) 20 set(result); 21 } 22 } finally { 23 // runner must be non-null until state is settled to 24 // prevent concurrent calls to run() 25 runner = null; 26 // state must be re-read after nulling runner to prevent 27 // leaked interrupts 28 int s = state; 29 if (s >= INTERRUPTING) 30 handlePossibleCancellationInterrupt(s); 31 } 32 }
翻譯一下,這個方法經歷了以下幾步
1. 校驗Task狀態和當前線程引用runner,如果state不為NEW或者runner引用為null,直接返回。
2. 調用runner的call()方法執行主邏輯,並且嘗試獲得返回值result。
3. 如果拋出異常,調用setException(Throwable t)方法
4. 如果沒有異常,調用set(V v)方法
5. 一些掃尾工作
那麼setException(Throwable t)和set(V v)做了什麼呢?我們看一下源碼
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
set(V v) 方法首先做一個CAS操作,將state字段由NEW->COMPLETING,這裡的CAS操作讀者可以自行百度原理。如果成功,那麼把執行結果v賦給成員變量outcome,再把state的值設置為NORMAL,最後做一些清理工作,喚醒所有等待線程並把callable對象置為null。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
同理,setException(Throwable t) 方法大同小異,只不過state字段流轉為NEW->COMPLETING->EXCEPTION。同時把異常對象賦予v。
這裡我們就清楚了,當一個任務被提交後,狀態流轉中1、2是怎麼來的了。同時我們可以確定,outcome變量,存著是執行結果或者拋出的異常對象。
2 public V get() throws InterruptionException,ExecutionException
get() 和 get(long timeout, TimeUnit unit)方法是獲取執行結果的兩個方法,我們這裡就看get()方法即可。首先貼源碼
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
首先檢查state值,如果小於COMPLETING,則阻塞,阻塞時可能會拋出異常,這裡我們不糾結這個,往下看。如果沒有拋出異常,獲取執行後返回的state值,最後調用report(s)方法。接著我們看report方法,如果s為NORMAL,返回執行結果outcome,否則拋出異常。結合之前的run()方法,我們這裡可以得出,如果主邏輯正常執行完畢,則返回執行結果,如果拋出異常,那麼這裡會封裝該異常為ExecutionException並且拋出。如果任務執行過程中被取消了,則可能拋出CancellationException()。
3 public boolean cancel(boolean mayInterruptIfRunning)
這個方法個人認為是最具爭議的方法。這裡我們先貼個demo
1 public class FutureDemo { 2 public static void main(String[] args) { 3 ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 4 // 預創建線程 5 executorService.prestartCoreThread(); 6 7 Future future = executorService.submit(new Callable<Object>() { 8 @Override 9 public Object call() { 10 System.out.println("start to run callable"); 11 Long start = System.currentTimeMillis(); 12 while (true) { 13 Long current = System.currentTimeMillis(); 14 if ((current - start) > 1000) { 15 System.out.println("當前任務執行已經超過1s"); 16 return 1; 17 } 18 } 19 } 20 }); 21 22 System.out.println(future.cancel(false)); 23 24 try { 25 Thread.currentThread().sleep(3000); 26 executorService.shutdown(); 27 } catch (Exception e) { 28 //NO OP 29 } 30 } 31 }
我們多次測試後發現,出現了2種打印結果,如圖
結果1
結果2
咦,兩個結果和預期的都好像不太一樣?第一種是任務壓根沒取消,第二種則是任務壓根沒提交成功,似乎和方法簽名cancel不太一致?
我們先看一下方法簽名上的作者注釋
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
這裡我們可以看到,"嘗試"取消任務的執行,如果當前任務已經結束或者已經取消,則當前取消操作會失敗。如果任務還沒開始就被取消,那麼任務則不會被執行。
這裡我們就知道了,如果任務還沒開始執行時cancel(false)就被調用,那麼這個任務是不會被執行的,這就解釋了出現上圖結果2的情況。那如果任務已經開始執行,並且
調用cancel(false),是不會終止任務的。我們還是從源碼去分析cancel()究竟做了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
執行邏輯如下
1. 如果當前futureTask狀態不為NEW,直接返回false,表示取消操作失敗。
2. 如果傳入true,代表可能會引發線程中斷。一個CAS操作,把狀態由NEW->INTERRUPTING,如果執行失敗則直接返回false。設置當前工作線程中斷標識為true,然後把futureTask狀態設置為INTERRUPTED。
3. 如果傳入false,把futureTask狀態設置為CANCELLED。
4. 做一些清理工作
可見,cancel()方法僅僅是改變了futureTask的狀態位!如果傳入的是false,當前任務是不會被終止的,而是會繼續執行,直到異常或者執行完畢。如果傳入的是true,會調用當前線程的interrupt()方法,把中斷標志位設為true。所以cancel()方法其實個人理解是有歧義的,它並不能真正取消一個任務的執行。事實上,除非線程自己停止自己的任務,或者退出JVM,是沒有其他方法完全終止一個線程的任務的。cancel(true)方法也只是希望當前線程可以響應中斷而已,當線程被阻塞,拋出InterruptedException。同時,由之前的future.get()方法可知,如果一個futureTask被cancel()了,調用get()方法會拋出CancellationException。
理解FutureTask,我們使用Future類才能更加得心應手。這裡也只是作者自己的理解,如有不對之處,還望讀者批評指正。
作者:mayday芋頭
出處:http://www.cnblogs.com/maypattis/ 本博客中未標明轉載的文章歸作者mayday芋頭和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利