第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
第三:提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。
先看線程池處理任務的機制圖:
從上圖中可以看出,當提交一個新任務到線程池時,線程池的處理流程如下:
線程池判斷核心線程池裡的線程是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務。如果核心線程池裡的線程都在執行任務,則進入下個流程。
線程池判斷工作隊列是否已經滿。如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列裡。如果工作隊列滿了,則進入下個流程。
線程池判斷線程池的線程是否都處於工作狀態。如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。
ThreadPoolExecutor執行execute方法分下面4種情況:
如果當前運行的線程少於corePoolSize,則創建新線程來執行任務(注意,執行這一步驟需要獲取全局鎖)。
如果運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue。
如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務(注意,執行這一步驟需要獲取全局鎖)。
如果創建新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。
下面是ThreadPoolExecutor的執行流程圖:
創建線程池很簡單,Java提供了ThreadPoolExecutor類。下面的語句就能創建一個大小為10的線程池。
1 new ThreadPoolExecutor(10,10,1,TimeUnit.DAYS);
先來看看ThreadPoolExecutor類的方法簽名:
1 public ThreadPoolExecutor(int corePoolSize, //線程池基本大小 2 int maximumPoolSize, //線程池最大線程數 3 long keepAliveTime, //線程活動保持時間 4 TimeUnit unit, //線程活動時間單位 5 BlockingQueue<Runnable> workQueue, 6 7 public ThreadPoolExecutor(int corePoolSize, //線程池基本大小 8 int maximumPoolSize, //線程池最大線程數 9 long keepAliveTime, //線程活動保持時間 10 TimeUnit unit, //線程活動時間單位 11 BlockingQueue<Runnable> workQueue, //任務隊列 12 ThreadFactory threadFactory, //產生線程的工廠 13 RejectedExecutionHandler handler //飽和策略);
下面詳細的講解一下這幾個參數:
corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閒的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有基本線程。
maximumPoolSize(線程池最大數量):線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是,如果使用了無界的任務隊列這個參數就沒什麼效果。
keepAliveTime(線程活動保持時間):線程池的工作線程空閒後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高線程的利用率。
TimeUnit(線程活動保持時間的單位):可選的單位有天、小時、分鐘、毫秒、微秒和納秒。
runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。可選項有:
ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。
LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
ThreadFactory:用於設置創建線程的工廠。
RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。
ThreadPoolExecutor提供了兩個方法可以向線程池提交任務去執行:submit()和execute()。
execute()方法用於執行不需要返回結果的任務。所以無法判斷任務是否被線程池執行成功。
submit()方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值。get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後立即返回,這時候有可能任務沒有執行完。
看下面一段代碼:
1 package com.alibaba.thread; 2 3 import java.util.concurrent.*; 4 5 /** 6 * Created by zhouxuanyu on 2016/12/6. 7 */ 8 public class CallableAndFuture { 9 public static void main(String[] args) throws ExecutionException, InterruptedException { 10 ExecutorService executorService = Executors.newScheduledThreadPool(3); 11 12 Runnable runnable = new Runnable() { 13 public void run() { 14 System.out.print("runnable()"); 15 } 16 }; 17 18 executorService.execute(runnable); //execute()拿不到返回值 19 20 Callable<String> callable = new Callable<String>() { 21 public String call() throws Exception { 22 return "callable"; 23 } 24 }; 25 26 Future<String> future = executorService.submit(callable);//submit()可以拿到返回值 27 28 System.out.print(future.get()); 29 30 } 31 }
有了上面的基礎,我們可以知道,當我們需要異步執行某個任務的時候,可以將這個任務丟到線程池中,如果不需要結果返回,那麼可以使用execute();相反,則需要使用submit()。當有多個任務執行,比如十個,在交給線程池去執行時,我們固然可以為這十個任務關聯十個Future而拿到結果。但是這樣做實在比較low!
Java為我們提供了一個名叫CompletionService的接口,它是Executor和BlockQueue的結合體。你可以將Callable任務交給CompletionService去執行,然後使用類似隊列的take()和poll()方法拿到Future類型的結果。ExecutorCompletionService是CompletionService的一個實現,它將任務交給executor去執行。
先看一下ExecutorCompletionService的構造方法:
1 public ExecutorCompletionService(Executor executor, 2 BlockingQueue<Future<V>> completionQueue)
由構造函數可以看到,executor是執行任務的執行器,而completionQueue是用來保存執行結果的隊列。當提交一個任務之後,會首先把這個任務包裝為一個QueueingFuture。QueueingFuture是FutureTask的子類。然後復寫了done()方法,將執行結果放入completionQueue中。
從上面可以知道,ExecutorCompletionService實現了哪一個任務先執行完就返回,而不是按任務添加的順序返回。
下面一個例子:
1 package com.alibaba.thread; 2 3 import java.util.Random; 4 import java.util.concurrent.*; 5 6 /** 7 * Created by zhouxuanyu on 2016/12/14. 8 */ 9 public class TestCompletionService { 10 11 public static void main(String[] args){ 12 13 ExecutorService executorService = Executors.newFixedThreadPool(11); //創建一個大小為11的線程池 14 final BlockingQueue<Future<String>> blockingQueue = new LinkedBlockingQueue<Future<String>>();//創建一個結果隊列 15 16 CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService,blockingQueue); 17 18 //使用completionService向線程池中添加10個任務 19 for (int i = 0; i < 10; i++) { 20 completionService.submit(new Callable<String>() { 21 public String call() throws Exception { 22 int random = new Random().nextInt(10000); 23 Thread.sleep(random); 24 return Thread.currentThread().getName() + "---sleep---" + random; 25 } 26 }); 27 } 28 29 //按照執行完成的順便取出已經完成的任務。 30 for (int i = 0; i < 10; i++) { 31 try { 32 Future future = completionService.take(); 33 System.out.println(future.get(1000,TimeUnit.NANOSECONDS)); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } catch (ExecutionException e) { 37 e.printStackTrace(); 38 } catch (TimeoutException e) { 39 e.printStackTrace(); 40 } 41 } 42 43 //關閉線程池 44 executorService.shutdown(); 45 } 46 }