Java多線程之異步Future機制的道理和完成。本站提示廣大學習愛好者:(Java多線程之異步Future機制的道理和完成)文章只能為提供參考,不一定能成為您想要的結果。以下是Java多線程之異步Future機制的道理和完成正文
項目中常常有些義務須要異步(提交到線程池中)去履行,而主線程常常須要曉得異步履行發生的成果,這時候我們要怎樣做呢?用runnable是沒法完成的,我們須要用callable看上面的代碼:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AddTask implements Callable<Integer> { private int a,b; public AddTask(int a, int b) { this.a = a; this.b = b; } @Override public Integer call throws Exception { Integer result = a + b; return result; } public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor; //JDK今朝為止前往的都是FutureTask的實例 Future<Integer> future = executor.submit(new AddTask(1, 2)); Integer result = future.get;// 只要當future的狀況是已完成時(future.isDone = true),get辦法才會前往 } }
固然可以完成獲得異步履行成果的需求,然則我們發明這個Future其實很欠好用,由於它沒有供給告訴的機制,也就是說我們不曉得future甚麼時刻完成(假如我們須要輪詢isDone()來斷定的話感到就沒有效這個的需要了)。看下java.util.concurrent.future.Future 的接口辦法:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled; boolean isDone; V get throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
因而可知JDK的Future機制其實其實不好用,假如能給這個future加個監聽器,讓它在完成時告訴監聽器的話就比擬好用了,就像上面這個IFuture:
package future; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * The result of an asynchronous operation. * * @author lixiaohui * @param <V> 履行成果的類型參數 */ public interface IFuture<V> extends Future<V> { boolean isSuccess; // 能否勝利 V getNow; //立刻前往成果(不論Future能否處於完成狀況) Throwable cause; //若履行掉敗時的緣由 boolean isCancellable; //能否可以撤消 IFuture<V> await throws InterruptedException; //期待future的完成 boolean await(long timeoutMillis) throws InterruptedException; // 超時期待future的完成 boolean await(long timeout, TimeUnit timeunit) throws InterruptedException; IFuture<V> awaitUninterruptibly; //期待future的完成,不呼應中止 boolean awaitUninterruptibly(long timeoutMillis);//超時期待future的完成,不呼應中止 boolean awaitUninterruptibly(long timeout, TimeUnit timeunit); IFuture<V> addListener(IFutureListener<V> l); //當future完成時,會告訴這些加出去的監聽器 IFuture<V> removeListener(IFutureListener<V> l); }
接上去就一路來完成這個IFuture,在這之前要解釋下Object.wait,Object.notifyAll辦法,由於全部Future完成的原���的焦點就是這兩個辦法.看看JDK外面的說明:
public class Object { /** * Causes the current thread to wait until another thread invokes the * {@link java.lang.Object#notify} method or the * {@link java.lang.Object#notifyAll} method for this object. * In other words, this method behaves exactly as if it simply * performs the call {@code wait(0)}. * 挪用該辦法後,以後線程會釋放對象監督器鎖,並讓出CPU應用權。直到其余線程挪用notify/notifyAll */ public final void wait throws InterruptedException { wait(0); } /** * Wakes up all threads that are waiting on this object's monitor. A * thread waits on an object's monitor by calling one of the * {@code wait} methods. * <p> * The awakened threads will not be able to proceed until the current * thread relinquishes the lock on this object. The awakened threads * will compete in the usual manner with any other threads that might * be actively competing to synchronize on this object; for example, * the awakened threads enjoy no reliable privilege or disadvantage in * being the next thread to lock this object. */ public final native void notifyAll; }
曉得這個後,我們要本身完成Future也就有了思緒,當線程挪用了IFuture.await等一系列的辦法時,假如Future還未完成,那末就挪用future.wait 辦法使線程進入WAITING狀況。而當其余線程設置Future為完成狀況(留意這裡的完成狀況包含正常停止和異常停止)時,就須要挪用future.notifyAll辦法來叫醒之前由於挪用過wait辦法而處於WAITING狀況的那些線程。完全的完成以下(代碼應當沒有很難懂得的處所,我是參考netty的Future機制的。有興致的可以去看看netty的源碼):
package future; import java.util.Collection; import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * <pre> * 正常停止時, 若履行的成果不為null, 則result為履行成果; 若履行成果為null, 則result = {@link AbstractFuture#SUCCESS_SIGNAL} * 異常停止時, result為 {@link CauseHolder} 的實例;若是被撤消而招致的異常停止, 則result為 {@link CancellationException} 的實例, 不然為其它異常的實例 * 以下情形會使異步操作由未完成狀況轉至已完成狀況, 也就是在以下情形產生時挪用notifyAll辦法: * <ul> * <li>異步操作被撤消時(cancel辦法)</li> * <li>異步操作正常停止時(setSuccess辦法)</li> * <li>異步操作異常停止時(setFailure辦法)</li> * </ul> * </pre> * * @author lixiaohui * * @param <V> * 異步履行成果的類型 */ public class AbstractFuture<V> implements IFuture<V> { protected volatile Object result; // 須要包管其可見性 /** * 監聽器集 */ protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>; /** * 當義務正常履行成果為null時, 即客戶端挪用{@link AbstractFuture#setSuccess(null)}時, * result援用該對象 */ private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal; @Override public boolean cancel(boolean mayInterruptIfRunning) { if (isDone) { // 已完成了不克不及撤消 return false; } synchronized (this) { if (isDone) { // double check return false; } result = new CauseHolder(new CancellationException); notifyAll; // isDone = true, 告訴期待在該對象的wait的線程 } notifyListeners; // 告訴監聽器該異步操作已完成 return true; } @Override public boolean isCancellable { return result == null; } @Override public boolean isCancelled { return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; } @Override public boolean isDone { return result != null; } @Override public V get throws InterruptedException, ExecutionException { await; // 期待履行成果 Throwable cause = cause; if (cause == null) { // 沒有產生異常,異步操作正常停止 return getNow; } if (cause instanceof CancellationException) { // 異步操作被撤消了 throw (CancellationException) cause; } throw new ExecutionException(cause); // 其他異常 } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) {// 超時期待履行成果 Throwable cause = cause; if (cause == null) {// 沒有產生異常,異步操作正常停止 return getNow; } if (cause instanceof CancellationException) {// 異步操作被撤消了 throw (CancellationException) cause; } throw new ExecutionException(cause);// 其他異常 } // 時光到了異步操作還沒有停止, 拋出超時異常 throw new TimeoutException; } @Override public boolean isSuccess { return result == null ? false : !(result instanceof CauseHolder); } @SuppressWarnings("unchecked") @Override public V getNow { return (V) (result == SUCCESS_SIGNAL ? null : result); } @Override public Throwable cause { if (result != null && result instanceof CauseHolder) { return ((CauseHolder) result).cause; } return null; } @Override public IFuture<V> addListener(IFutureListener<V> listener) { if (listener == null) { throw new NullPointerException("listener"); } if (isDone) { // 若已完成直接告訴該監聽器 notifyListener(listener); return this; } synchronized (this) { if (!isDone) { listeners.add(listener); return this; } } notifyListener(listener); return this; } @Override public IFuture<V> removeListener(IFutureListener<V> listener) { if (listener == null) { throw new NullPointerException("listener"); } if (!isDone) { listeners.remove(listener); } return this; } @Override public IFuture<V> await throws InterruptedException { return await0(true); } private IFuture<V> await0(boolean interruptable) throws InterruptedException { if (!isDone) { // 若已完造詣直接前往了 // 若許可終端且被中止了則拋出中止異常 if (interruptable && Thread.interrupted) { throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted."); } boolean interrupted = false; synchronized (this) { while (!isDone) { try { wait; // 釋放鎖進入waiting狀況,期待其它線程挪用本對象的notify/notifyAll辦法 } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } } } if (interrupted) { // 為何這裡要設中止標記位?由於從wait辦法前往後, 中止標記是被clear了的, // 這裡從新設置以便讓其它代碼曉得這裡被中止了。 Thread.currentThread.interrupt; } } return this; } @Override public boolean await(long timeoutMillis) throws InterruptedException { return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true); } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { if (isDone) { return true; } if (timeoutNanos <= 0) { return isDone; } if (interruptable && Thread.interrupted) { throw new InterruptedException(toString); } long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime; long waitTime = timeoutNanos; boolean interrupted = false; try { synchronized (this) { if (isDone) { return true; } if (waitTime <= 0) { return isDone; } for (;;) { try { wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } if (isDone) { return true; } else { waitTime = timeoutNanos - (System.nanoTime - startTime); if (waitTime <= 0) { return isDone; } } } } } finally { if (interrupted) { Thread.currentThread.interrupt; } } } @Override public IFuture<V> awaitUninterruptibly { try { return await0(false); } catch (InterruptedException e) { // 這裡若拋異常了就沒法處置了 throw new java.lang.InternalError; } } @Override public boolean awaitUninterruptibly(long timeoutMillis) { try { return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false); } catch (InterruptedException e) { throw new java.lang.InternalError; } } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { try { return await0(unit.toNanos(timeout), false); } catch (InterruptedException e) { throw new java.lang.InternalError; } } protected IFuture<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners; return this; } throw new IllegalStateException("complete already: " + this); } private boolean setFailure0(Throwable cause) { if (isDone) { return false; } synchronized (this) { if (isDone) { return false; } result = new CauseHolder(cause); notifyAll; } return true; } protected IFuture<V> setSuccess(Object result) { if (setSuccess0(result)) { // 設置勝利後告訴監聽器 notifyListeners; return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(Object result) { if (isDone) { return false; } synchronized (this) { if (isDone) { return false; } if (result == null) { // 異步操作正常履行終了的成果是null this.result = SUCCESS_SIGNAL; } else { this.result = result; } notifyAll; } return true; } private void notifyListeners { for (IFutureListener<V> l : listeners) { notifyListener(l); } } private void notifyListener(IFutureListener<V> l) { try { l.operationCompleted(this); } catch (Exception e) { e.printStackTrace; } } private static class SuccessSignal { } private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) { this.cause = cause; } } }
那末要怎樣應用這個呢,有了下面的骨架完成,我們便可以定制各類各樣的異步成果了。上面模仿一下一個延時的義務:
package future.test; import future.IFuture; import future.IFutureListener; /** * 延時加法 * @author lixiaohui * */ public class DelayAdder { public static void main(String[] args) { new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer> { @Override public void operationCompleted(IFuture<Integer> future) throws Exception { System.out.println(future.getNow); } }); } /** * 延遲加 * @param delay 延不時長 milliseconds * @param a 加數 * @param b 加數 * @return 異步成果 */ public DelayAdditionFuture add(long delay, int a, int b) { DelayAdditionFuture future = new DelayAdditionFuture; new Thread(new DelayAdditionTask(delay, a, b, future)).start; return future; } private class DelayAdditionTask implements Runnable { private long delay; private int a, b; private DelayAdditionFuture future; public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) { super; this.delay = delay; this.a = a; this.b = b; this.future = future; } @Override public void run { try { Thread.sleep(delay); Integer i = a + b; // TODO 這裡設置future為完成狀況(正常履行終了) future.setSuccess(i); } catch (InterruptedException e) { // TODO 這裡設置future為完成狀況(異常履行終了) future.setFailure(e.getCause); } } } } package future.test; import future.AbstractFuture; import future.IFuture; //只是把兩個辦法對外裸露 public class DelayAdditionFuture extends AbstractFuture<Integer> { @Override public IFuture<Integer> setSuccess(Object result) { return super.setSuccess(result); } @Override public IFuture<Integer> setFailure(Throwable cause) { return super.setFailure(cause); } }
可以看到客戶端不消自動去訊問future能否完成,而是future完成時主動回調operationcompleted辦法,客戶端只需在回調裡完成邏輯便可。
以上就是本文的全體內容,願望對年夜家的進修有所贊助,也願望年夜家多多支撐。