上節,我們提到,在異步任務程序中,一種常見的場景是,主線程提交多個異步任務,然後希望有任務完成就處理結果,並且按任務完成順序逐個處理,對於這種場景,Java並發包提供了一個方便的方法,使用CompletionService,這是一個接口,它的實現類是ExecutorCompletionService,本節我們就來探討它們。
基本用法
接口和類定義
與77節介紹的ExecutorService一樣,CompletionService也可以提交異步任務,它的不同是,它可以按任務完成順序獲取結果,其具體定義為:
public interface CompletionService<V> { Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
其submit方法與ExecutorService是一樣的,多了take和poll方法,它們都是獲取下一個完成任務的結果,take()會阻塞等待,poll()會立即返回,如果沒有已完成的任務,返回null,帶時間參數的poll方法會最多等待限定的時間。
CompletionService的主要實現類是ExecutorCompletionService,它依賴於一個Executor完成實際的任務提交,而自己主要負責結果的排隊和處理,它的構造方法有兩個:
public ExecutorCompletionService(Executor executor) public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
至少需要一個Executor參數,可以提供一個BlockingQueue參數,用作完成任務的隊列,沒有提供的話,ExecutorCompletionService內部會創建一個LinkedBlockingQueue。
基本示例
我們在77節的invokeAll的示例中,演示了並發下載並分析URL的標題,那個例子中,是要等到所有任務都完成才處理結果的,這裡,我們修改一下,一有任務完成就輸出其結果,代碼如下:
public class CompletionServiceDemo { static class UrlTitleParser implements Callable<String> { private String url; public UrlTitleParser(String url) { this.url = url; } @Override public String call() throws Exception { Document doc = Jsoup.connect(url).get(); Elements elements = doc.select("head title"); if (elements.size() > 0) { return url + ": " + elements.get(0).text(); } return null; } } public static void parse(List<String> urls) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10); try { CompletionService<String> completionService = new ExecutorCompletionService<>( executor); for (String url : urls) { completionService.submit(new UrlTitleParser(url)); } for (int i = 0; i < urls.size(); i++) { Future<String> result = completionService.take(); try { System.out.println(result.get()); } catch (ExecutionException e) { e.printStackTrace(); } } } finally { executor.shutdown(); } } public static void main(String[] args) throws InterruptedException { List<String> urls = Arrays.asList(new String[] { "http://www.cnblogs.com/swiftma/p/5396551.html", "http://www.cnblogs.com/swiftma/p/5399315.html", "http://www.cnblogs.com/swiftma/p/5405417.html", "http://www.cnblogs.com/swiftma/p/5409424.html" }); parse(urls); } }
在parse方法中,首先創建了一個ExecutorService,然後才是CompletionService,通過後者提交任務、按完成順序逐個處理結果,這樣,是不是很方便?
基本原理
ExecutorCompletionService是怎麼讓結果有序處理的呢?其實,也很簡單,如前所述,它有一個額外的隊列,每個任務完成之後,都會將代表結果的Future入隊。
那問題是,任務完成後,怎麼知道入隊呢?我們具體來看下。
在77節我們介紹過FutureTask,任務完成後,不管是正常完成、異常結束、還是被取消,都會調用finishCompletion方法,而該方法會調用一個done方法,該方法代碼為:
protected void done() { }
它的實現為空,但它是一個protected方法,子類可以重寫該方法。
在ExecutorCompletionService中,提交的任務類型不是一般的FutureTask,而是一個子類QueueingFuture,如下所示:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
該子類重寫了done方法,在任務完成時將結果加入到完成隊列中,其代碼為:
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
ExecutorCompletionService的take/poll方法就是從該隊列獲取結果,如下所示:
public Future<V> take() throws InterruptedException { return completionQueue.take(); }
實現invokeAny
我們在77節提到,AbstractExecutorService的invokeAny的實現,就利用了ExecutorCompletionService,它的基本思路是,提交任務後,通過take方法獲取結果,獲取到第一個有效結果後,取消所有其他任務,不過,它的具體實現有一些優化,比較復雜。我們看一個模擬的示例,從多個搜索引擎查詢一個關鍵詞,但只要任意一個的結果就可以,模擬代碼如下:
public class InvokeAnyDemo { static class SearchTask implements Callable<String> { private String engine; private String keyword; public SearchTask(String engine, String keyword) { this.engine = engine; this.keyword = keyword; } @Override public String call() throws Exception { // 模擬從給定引擎搜索結果 Thread.sleep(engine.hashCode() % 1000); return "<result for> " + keyword; } } public static String search(List<String> engines, String keyword) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletionService<String> cs = new ExecutorCompletionService<>(executor); List<Future<String>> futures = new ArrayList<Future<String>>( engines.size()); String result = null; try { for (String engine : engines) { futures.add(cs.submit(new SearchTask(engine, keyword))); } for (int i = 0; i < engines.size(); i++) { try { result = cs.take().get(); if (result != null) { break; } } catch (ExecutionException ignore) { // 出現異常,結果無效,繼續 } } } finally { // 取消所有任務,對於已完成的任務,取消沒有什麼效果 for (Future<String> f : futures) f.cancel(true); executor.shutdown(); } return result; } public static void main(String[] args) throws InterruptedException { List<String> engines = Arrays.asList(new String[] { "www.baidu.com", "www.sogou.com", "www.so.com", "www.google.com" }); System.out.println(search(engines, "老馬說編程")); } }
SearchTask模擬從指定搜索引擎查詢結果,search利用CompletionService/ExecutorService執行並發查詢,在得到第一個有效結果後,取消其他任務。
小結
本節比較簡單,主要就是介紹了CompletionService的用法和原理,它通過一個額外的結果隊列,方便了對於多個異步任務結果的處理。
下一節,我們來探討一種常見的需求 - 定時任務。
(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic)
----------------
未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。