一、ThreadPoolExecutor介紹
在jdk1.8中,構造函數有4個。以
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)為例:
1、corePoolSize: 核心線程池大小
2、maximumPoolSize: 最大線程池大小
3、keepAliveTime: 當線程池中的線程數大於corePoolSize時, 多余空閒線程等待新任務的最長時間, 超過這個時間後多余線程終止
4、unit: 時間單位, 比如毫秒, 納秒等
5、workQueue: 阻塞隊列
6、threadFactory: 創建線程工廠, 可以方便的創建線程, 可以自定義; 默認為Executors.DefaultThreadFactory
7、handler: 飽和策略, 默認為AbortPolicy
定義一個完整的線程池可以這麼寫:
private ThreadPoolExecutor defaultThreadPool = new ThreadPoolExecutor(10, 100, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
線程池的主要處理流程如下:
1、提交任務到線程池,判斷核心線程池(corePoolSize)是否已滿? 沒滿,創建工作線程執行任務;滿了,進入下個流程。
2、線程池判斷工作隊列(workQueue)是否已滿?沒滿,則將新提交的任務存儲在工作隊列裡。滿了,則進入下個流程。
3、判斷整個線程池(maximumPoolSize)是否已滿?沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略(RejectedExecutionHandler)來處理這個任務。
二、飽和策略 RejectedExecutionHandler
在ThreadPoolExecutor類中定義了4種RejectedExecutionHandler類型:
1、AbortPolicy: 默認策略,直接拋出RejectedExecutionException異常。
2、CallerRunsPolicy:只用調用者所在線程來運行任務。
3、DiscardOldestPolicy:丟棄隊列裡最近的一個任務,並執行當前任務。
4、DiscardPolicy:不處理,丟棄掉。
也可以自定義飽和策略,比如將無法處理的新任務加入日志等,只需要實現RejectedExecutionHandler接口即可。
三、阻塞隊列BlockingQueue
/** * ArrayBlockingQueue: 由數組結構組成的有界阻塞隊列, 隊列遵循FIFO */ private BlockingQueue<String> abq = new ArrayBlockingQueue<String>(10); /** * LinkedBlockingQueue: 由鏈表結構組成的阻塞隊列, FIFO * LinkedBlockingDeque: 由鏈表結構組成的雙向阻塞隊列, 構造方法和LinkedBlockingQueue類似 * public LinkedBlockingQueue(int capacity) //有界阻塞隊列,隊列最大值為capacity * public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //無界阻塞隊列,隊列最大值為Integer.MAX_VALUE * 通過Executors.newFixedThreadPool(int nThreads)創建的線程池中采用的是無界LinkedBlockingQueue * 對於put和take操作, 內部采用了不同的鎖: putLock, takeLock, 而ArrayBlockingQueue內部只有一把鎖 */ private BlockingQueue<Thread> lbq = new LinkedBlockingQueue<Thread>(10); private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); private BlockingQueue<String> lbd = new LinkedBlockingDeque<String>(); /** * PriorityBlockingQueue: 支持優先級排序的有界阻塞隊列 * public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) */ private BlockingQueue<String> pbq = new PriorityBlockingQueue(100, new Comparator<String>() { public int compare(String o1, String o2) { return o1.compareTo(o2); //升序排列 } }); /** * DelayQueue: 一個使用優先級隊列實現的無界阻塞隊列, 支持延時獲取元素, 適用於緩存系統的設計以及定時任務調度。 * 內部隊列采用PriorityQueue, private final PriorityQueue<E> q = new PriorityQueue<E>(); * 隊列元素需要實現Delayed接口, class DelayQueue<E extends Delayed> extends AbstractQueue<E> */ /** * SynchronousQueue: 不存儲元素的阻塞隊列 */ /** * LinkedTransferQueue: 由鏈表結構組成的實現了TransferQueue接口的無界阻塞隊列 * transfer方法: 如果當前消費者正在等待接收元素(take()或poll(long timeout, TimeUnit unit))方法, 生產者傳入的元素可以直接傳遞給消費者, 而不放入隊列 */ BlockingQueue<String> ltq = new LinkedTransferQueue<String>();
四、通過Executors創建線程池
/** * 創建單個線程, 適用於需要保證順序執行任務的場景 * return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); * BlockingQueue采用無界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime參數無意義 */ Executors.newSingleThreadExecutor();
/** * 創建固定線程, 適用於需要限制當前線程數量, 負載比較重的服務器 * return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); * 采用無界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime參數無意義 */ Executors.newFixedThreadPool(10);
/** * 根據需要創建線程, 適用於執行很多的短期異步任務的小程序, 或者負載較輕的服務器 * return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); * 初始線程為0, 采用SynchronousQueue阻塞隊列, 如果生產任務的速度低於消費的速度, 空閒60s的線程會被終止; 如果生產任務的速度持續高於消費速度, 則會不斷創建新線程 */ Executors.newCachedThreadPool();
/** * 在給定的延遲之後運行任務, 或者定期執行任務 * super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); * 采用無界DelayQueue隊列, 因此maximumPoolSize、keepAliveTime參數無意義 */ Executors.newScheduledThreadPool(10);