在前面的文章中,我們使用線程的時候就去創建一個線程,這樣實現起來非常簡便,但是就會有一個問題:
如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。
那麼有沒有一種辦法使得線程可以復用,就是執行完一個任務,並不被銷毀,而是可以繼續執行其他的任務?
在Java中可以通過線程池來達到這樣的效果。今天我們就來詳細講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然後再講述它的實現原理,接著給出了它的使用示例,最後討論了一下如何合理配置線程池的大小。
以下是本文的目錄大綱:
一.Java中的ThreadPoolExecutor類
二.深入剖析線程池實現原理
三.使用示例
四.如何合理配置線程池的大小
若有不正之處請多多諒解,並歡迎批評指正。
一.Java中的ThreadPoolExecutor類
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。
在ThreadPoolExecutor類中提供了四個構造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
public
class
ThreadPoolExecutor
extends
AbstractExecutorService {
.....
public
ThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public
ThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public
ThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public
ThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。
下面解釋下一下構造器中各個參數的含義:
TimeUnit.DAYS;
//天
TimeUnit.HOURS;
//小時
TimeUnit.MINUTES;
//分鐘
TimeUnit.SECONDS;
//秒
TimeUnit.MILLISECONDS;
//毫秒
TimeUnit.MICROSECONDS;
//微妙
TimeUnit.NANOSECONDS;
//納秒
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
具體參數的配置與線程池的關系將在下一節講述。
從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
public
abstract
class
AbstractExecutorService
implements
ExecutorService {
protected
<T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected
<T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public
Future<?> submit(Runnable task) {};
public
<T> Future<T> submit(Runnable task, T result) { };
public
<T> Future<T> submit(Callable<T> task) { };
private
<T> T doInvokeAny(Collection<?
extends
Callable<T>> tasks,
boolean
timed,
long
nanos)
throws
InterruptedException, ExecutionException, TimeoutException {
};
public
<T> T invokeAny(Collection<?
extends
Callable<T>> tasks)
throws
InterruptedException, ExecutionException {
};
public
<T> T invokeAny(Collection<?
extends
Callable<T>> tasks,
long
timeout, TimeUnit unit)
throws
InterruptedException, ExecutionException, TimeoutException {
};
public
<T> List<Future<T>> invokeAll(Collection<?
extends
Callable<T>> tasks)
throws
InterruptedException {
};
public
<T> List<Future<T>> invokeAll(Collection<?
extends
Callable<T>> tasks,
long
timeout, TimeUnit unit)
throws
InterruptedException {
};
}
AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。
我們接著看ExecutorService接口的實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22public
interface
ExecutorService
extends
Executor {
void
shutdown();
boolean
isShutdown();
boolean
isTerminated();
boolean
awaitTermination(
long
timeout, TimeUnit unit)
throws
InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<?
extends
Callable<T>> tasks)
throws
InterruptedException;
<T> List<Future<T>> invokeAll(Collection<?
extends
Callable<T>> tasks,
long
timeout, TimeUnit unit)
throws
InterruptedException;
<T> T invokeAny(Collection<?
extends
Callable<T>> tasks)
throws
InterruptedException, ExecutionException;
<T> T invokeAny(Collection<?
extends
Callable<T>> tasks,
long
timeout, TimeUnit unit)
throws
InterruptedException, ExecutionException, TimeoutException;
}
而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:
1 2 3public
interface
Executor {
void
execute(Runnable command);
}
到這裡,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。
Executor是一個頂層接口,在它裡面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;
然後ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;
然後ThreadPoolExecutor繼承了類AbstractExecutorService。
在ThreadPoolExecutor類中有幾個非常重要的方法:
1 2 3 4execute()
submit()
shutdown()
shutdownNow()
execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。
shutdown()和shutdownNow()是用來關閉線程池的。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API。
二.深入剖析線程池實現原理
在上一節我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實現原理,將從下面幾個方面講解:
1.線程池狀態
2.任務的執行
3.線程池中的線程初始化
4.任務緩存隊列及排隊策略
5.任務拒絕策略
6.線程池的關閉
7.線程池容量的動態調整
1.線程池狀態
在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:
1 2 3 4 5
volatile
int
runState;
static
final
int
RUNNING =
0
;
static
final
int
SHUTDOWN =
1
;
static
final
int
STOP =
2
;
static
final
int
TERMINATED =
3
;
runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性;
下面的幾個static final變量表示runState可能的幾個取值。
當創建線程池後,初始時,線程池處於RUNNING狀態;
如果調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;
如果調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務;
當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束後,線程池被設置為TERMINATED狀態。
2.任務的執行
在了解將任務提交給線程池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
private
final
BlockingQueue<Runnable> workQueue;
//任務緩存隊列,用來存放等待執行的任務
private
final
ReentrantLock mainLock =
new
ReentrantLock();
//線程池的主要狀態鎖,對線程池狀態(比如線程池大小
//、runState等)的改變都要使用這個鎖
private
final
HashSet<Worker> workers =
new
HashSet<Worker>();
//用來存放工作集
private
volatile
long
keepAliveTime;
//線程存貨時間
private
volatile
boolean
allowCoreThreadTimeOut;
//是否允許為核心線程設置存活時間
private
volatile
int
corePoolSize;
//核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private
volatile
int
maximumPoolSize;
//線程池最大能容忍的線程數
private
volatile
int
poolSize;
//線程池中當前的線程數
private
volatile
RejectedExecutionHandler handler;
//任務拒絕策略
private
volatile
ThreadFactory threadFactory;
//線程工廠,用來創建線程
private
int
largestPoolSize;
//用來記錄線程池中曾經出現過的最大線程數
private
long
completedTaskCount;
//用來記錄已經執行完畢的任務個數
每個變量的作用都已經標明出來了,這裡要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。
corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:
假如有一個工廠,工廠裡面有10個工人,每個工人同時只能做一件任務。
因此只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人做;
當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;
如果說新任務數目增長的速度遠遠大於工人做任務的速度,那麼此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
然後就將任務也分配給這4個臨時工人做;
如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
當這14個工人當中有人空閒時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。
不過為了方便理解,在本文後面還是將corePoolSize翻譯成核心池大小。
largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。
下面我們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。
在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裡面最終調用的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:
1 2 3 4 5 6 7 8 9 10 11 12public
void
execute(Runnable command) {
if
(command ==
null
)
throw
new
NullPointerException();
if
(poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if
(runState == RUNNING && workQueue.offer(command)) {
if
(runState != RUNNING || poolSize ==
0
)
ensureQueuedTaskHandled(command);
}
else
if
(!addIfUnderMaximumPoolSize(command))
reject(command);
// is shutdown or saturated
}
}
上面的代碼可能看起來不是那麼容易理解,下面我們一句一句解釋:
首先,判斷提交的任務command是否為null,若是null,則拋出空指針異常;
接著是這句,這句要好好理解一下:
1if
(poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
由於是或條件運算符,所以先計算前半部分的值,如果線程池中當前線程數不小於核心池大小,那麼就會直接進入下面的if語句塊了。
如果線程池中當前線程數小於核心池大小,則接著執行後半部分,也就是執行
1addIfUnderCorePoolSize(command)
如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。
如果執行完addIfUnderCorePoolSize這個方法返回false,然後接著判斷:
1if
(runState == RUNNING && workQueue.offer(command))
如果當前線程池處於RUNNING狀態,則將任務放入任務緩存隊列;如果當前線程池不處於RUNNING狀態或者任務放入緩存隊列失敗,則執行:
1addIfUnderMaximumPoolSize(command)
如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。
回到前面:
1if
(runState == RUNNING && workQueue.offer(command))
這句的執行,如果說當前線程池處於RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:
1if
(runState != RUNNING || poolSize ==
0
)
這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是這樣就執行:
1ensureQueuedTaskHandled(command)
進行應急處理,從名字可以看出是保證 添加到任務緩存隊列中的任務得到處理。
我們接著看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15private
boolean
addIfUnderCorePoolSize(Runnable firstTask) {
Thread t =
null
;
final
ReentrantLock mainLock =
this
.mainLock;
mainLock.lock();
try
{
if
(poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
//創建線程去執行firstTask任務
}
finally
{
mainLock.unlock();
}
if
(t ==
null
)
return
false
;
t.start();
return
true
;
}
這個是addIfUnderCorePoolSize方法的具體實現,從名字可以看出它的意圖就是當低於核心吃大小時執行的方法。下面看其具體實現,首先獲取到鎖,因為這地方涉及到線程池狀態的變化,先通過if語句判斷當前線程池中的線程數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小於核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中並沒有加鎖,因此可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完之後,在其他線程中又向線程池提交了任務,就可能導致poolSize不小於corePoolSize了,所以需要在這個地方繼續判斷。然後接著判斷線程池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他線程中調用了shutdown或者shutdownNow方法。然後就是執行
1t = addThread(firstTask);
這個方法也非常關鍵,傳進去的參數為提交的任務,返回值為Thread類型。然後接著在下面判斷t是否為空,為空則表明創建線程失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),否則調用t.start()方法啟動線程。
我們來看一下addThread方法的實現:
1 2 3 4 5 6 7 8 9 10 11 12private
Thread addThread(Runnable firstTask) {
Worker w =
new
Worker(firstTask);
Thread t = threadFactory.newThread(w);
//創建一個線程,執行任務
if
(t !=
null
) {
w.thread = t;
//將創建的線程的引用賦值為w的成員變量
workers.add(w);
int
nt = ++poolSize;
//當前線程數加1
if
(nt > largestPoolSize)
largestPoolSize = nt;
}
return
t;
}
在addThread方法中,首先用提交的任務創建了一個Worker對象,然後調用線程工廠threadFactory創建了一個新的線程t,然後將線程t的引用賦值給了Worker對象的成員變量thread,接著通過workers.add(w)將Worker對象添加到工作集當中。
下面我們看一下Worker類的實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64private
final
class
Worker
implements
Runnable {
private
final
ReentrantLock runLock =
new
ReentrantLock();
private
Runnable firstTask;
volatile
long
completedTasks;
Thread thread;
Worker(Runnable firstTask) {
this
.firstTask = firstTask;
}
boolean
isActive() {
return
runLock.isLocked();
}
void
interruptIfIdle() {
final
ReentrantLock runLock =
this
.runLock;
if
(runLock.tryLock()) {
try
{
if
(thread != Thread.currentThread())
thread.interrupt();
}
finally
{
runLock.unlock();
}
}
}
void
interruptNow() {
thread.interrupt();
}
private
void
runTask(Runnable task) {
final
ReentrantLock runLock =
this
.runLock;
runLock.lock();
try
{
if
(runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
boolean
ran =
false
;
beforeExecute(thread, task);
//beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據
//自己需要重載這個方法和後面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等
try
{
task.run();
ran =
true
;
afterExecute(task,
null
);
++completedTasks;
}
catch
(RuntimeException ex) {
if
(!ran)
afterExecute(task, ex);
throw
ex;
}
}
finally
{
runLock.unlock();
}
}
public
void
run() {
try
{
Runnable task = firstTask;
firstTask =
null
;
while
(task !=
null
|| (task = getTask()) !=
null
) {
runTask(task);
task =
null
;
}
}
finally
{
workerDone(
this
);
//當任務隊列中沒有任務時,進行清理工作
}
}
}
它實際上實現了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:
1Thread t =
new
Thread(w);
相當於傳進去了一個Runnable任務,在線程t中執行這個Runnable。
既然Worker實現了Runnable接口,那麼自然最核心的方法便是run()方法了:
1 2 3 4 5 6 7 8 9 10 11 12public
void
run() {
try
{
Runnable task = firstTask;
firstTask =
null
;
while
(task !=
null
|| (task = getTask()) !=
null
) {
runTask(task);
task =
null
;
}
}
finally
{
workerDone(
this
);
}
}
從run方法的實現可以看出,它首先執行的是通過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask之後,在while循環裡面不斷通過getTask()去取新的任務來執行,那麼去哪裡取呢?自然是從任務緩存隊列裡面去取,getTask是ThreadPoolExecutor類中的方法,並不是Worker類中的方法,下面是getTask方法的實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27Runnable getTask() {
for
(;;) {
try
{
int
state = runState;
if
(state > SHUTDOWN)
return
null
;
Runnable r;
if
(state == SHUTDOWN)
// Help drain queue
r = workQueue.poll();
else
if
(poolSize > corePoolSize || allowCoreThreadTimeOut)
//如果線程數大於核心池大小或者允許為核心池線程設置空閒時間,
//則通過poll取任務,若等待一定的時間取不到任務,則返回null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if
(r !=
null
)
return
r;
if
(workerCanExit()) {
//如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
if
(runState >= SHUTDOWN)
// Wake up others
interruptIdleWorkers();
//中斷處於空閒狀態的worker
return
null
;
}
// Else retry
}
catch
(InterruptedException ie) {
// On interruption, re-check runState
}
}
}
在getTask中,先判斷當前線程池狀態,如果runState大於SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。
如果runState為SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。
如果當前線程池的線程數大於核心池大小corePoolSize或者允許為核心池中的線程設置空閒存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回null。
然後判斷取到的任務r是否為null,為null則通過調用workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16private
boolean
workerCanExit() {
final
ReentrantLock mainLock =
this
.mainLock;
mainLock.lock();
boolean
canExit;
//如果runState大於等於STOP,或者任務緩存隊列為空了
//或者 允許為核心池線程設置空閒存活時間並且線程池中的線程數目大於1
try
{
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(
1
, corePoolSize));
}
finally
{
mainLock.unlock();
}
return
canExit;
}
也就是說如果線程池處於STOP狀態、或者任務隊列已為空或者允許為核心池線程設置空閒存活時間並且線程數大於1時,允許worker退出。如果允許worker退出,則調用interruptIdleWorkers()中斷處於空閒狀態的worker,我們看一下interruptIdleWorkers()的實現:
1 2 3 4 5 6 7 8 9 10void
interruptIdleWorkers() {
final
ReentrantLock mainLock =
this
.mainLock;
mainLock.lock();
try
{
for
(Worker w : workers)
//實際上調用的是worker的interruptIfIdle()方法
w.interruptIfIdle();
}
finally
{
mainLock.unlock();
}
}
從實現可以看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:
1 2 3 4 5 6 7 8 9 10 11 12void
interruptIfIdle() {
final
ReentrantLock runLock =
this
.runLock;
if
(runLock.tryLock()) {
//注意這裡,是調用tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的
//如果成功獲取了鎖,說明當前worker處於空閒狀態
try
{
if
(thread != Thread.currentThread())
thread.interrupt();
}
finally
{
runLock.unlock();
}
}
}
這裡有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閒時,就從任務緩存隊列中取一個任務交給空閒線程執行。但是在這裡,並沒有采用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和復雜度,這裡直接讓執行完任務的線程去任務緩存隊列裡面取任務來執行。
我們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是在線程池中的線程數達到了核心池大小並且往任務隊列中添加任務失敗的情況下執行的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15private
boolean
addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t =
null
;
final
ReentrantLock mainLock =
this
.mainLock;
mainLock.lock();
try
{
if
(poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
}
finally
{
mainLock.unlock();
}
if
(t ==
null
)
return
false
;
t.start();
return
true
;
}
看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。
到這裡,大部分朋友應該對任務提交給線程池之後到被執行的整個過程有了一個基本的了解,下面總結一下:
1)首先,要清楚corePoolSize和maximumPoolSize的含義;
2)其次,要知道Worker是用來起到什麼作用的;
3)要知道任務提交給線程池之後的處理策略,這裡總結一下主要有4點:
如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
如果線程池中的線程數量大於 corePoolSize時,如果某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。
3.線程池中的線程初始化
默認情況下,創建線程池之後,線程池中是沒有線程的,需要提交任務之後才會創建線程。
在實際中如果需要線程池創建之後立即創建線程,可以通過以下兩個方法辦到:
下面是這2個方法的實現:
1 2 3 4 5 6 7 8 9 10public
boolean
prestartCoreThread() {
return
addIfUnderCorePoolSize(
null
);
//注意傳進去的參數是null
}
public
int
prestartAllCoreThreads() {
int
n =
0
;
while
(addIfUnderCorePoolSize(
null
))
//注意傳進去的參數是null
++n;
return
n;
}
注意上面傳進去的參數是null,根據第2小節的分析可知如果傳進去的參數為null,則最後執行線程會阻塞在getTask方法中的
1r = workQueue.take();
即等待任務隊列中有任務。
4.任務緩存隊列及排隊策略
在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。
workQueue的類型為BlockingQueue<Runnable>,通常可以取下面三種類型:
1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;
2)LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
5.任務拒絕策略
當線程池的任務緩存隊列已滿並且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,通常有以下四種策略:
1 2 3 4ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
6.線程池的關閉
ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:
7.線程池容量的動態調整
ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創建新的線程來執行任務。
三.使用示例
前面我們討論了關於線程池的實現原理,這一節我們來看一下它的具體使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33public
class
Test {
public
static
void
main(String[] args) {
ThreadPoolExecutor executor =
new
ThreadPoolExecutor(
5
,
10
,
200
, TimeUnit.MILLISECONDS,
new
ArrayBlockingQueue<Runnable>(
5
));
for
(
int
i=
0
;i<
15
;i++){
MyTask myTask =
new
MyTask(i);
executor.execute(myTask);
System.out.println(
"線程池中線程數目:"
+executor.getPoolSize()+
",隊列中等待執行的任務數目:"
+
executor.getQueue().size()+
",已執行玩別的任務數目:"
+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class
MyTask
implements
Runnable {
private
int
taskNum;
public
MyTask(
int
num) {
this
.taskNum = num;
}
@Override
public
void
run() {
System.out.println(
"正在執行task "
+taskNum);
try
{
Thread.currentThread().sleep(
4000
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println(
"task "
+taskNum+
"執行完畢"
);
}
}
執行結果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45正在執行task
0
線程池中線程數目:
1
,隊列中等待執行的任務數目:
0
,已執行玩別的任務數目:
0
線程池中線程數目:
2
,隊列中等待執行的任務數目:
0
,已執行玩別的任務數目:
0
正在執行task
1
線程池中線程數目:
3
,隊列中等待執行的任務數目:
0
,已執行玩別的任務數目:
0
正在執行task
2
線程池中線程數目:
4
,隊列中等待執行的任務數目:
0
,已執行玩別的任務數目:
0
正在執行task
3
線程池中線程數目:
5
,隊列中等待執行的任務數目:
0
,已執行玩別的任務數目:
0
正在執行task
4
線程池中線程數目:
5
,隊列中等待執行的任務數目:
1
,已執行玩別的任務數目:
0
線程池中線程數目:
5
,隊列中等待執行的任務數目:
2
,已執行玩別的任務數目:
0
線程池中線程數目:
5
,隊列中等待執行的任務數目:
3
,已執行玩別的任務數目:
0
線程池中線程數目:
5
,隊列中等待執行的任務數目:
4
,已執行玩別的任務數目:
0
線程池中線程數目:
5
,隊列中等待執行的任務數目:
5
,已執行玩別的任務數目:
0
線程池中線程數目:
6
,隊列中等待執行的任務數目:
5
,已執行玩別的任務數目:
0
正在執行task
10
線程池中線程數目:
7
,隊列中等待執行的任務數目:
5
,已執行玩別的任務數目:
0
正在執行task
11
線程池中線程數目:
8
,隊列中等待執行的任務數目:
5
,已執行玩別的任務數目:
0
正在執行task
12
線程池中線程數目:
9
,隊列中等待執行的任務數目:
5
,已執行玩別的任務數目:
0
正在執行task
13
線程池中線程數目:
10
,隊列中等待執行的任務數目:
5
,已執行玩別的任務數目:
0
正在執行task
14
task
3
執行完畢
task
0
執行完畢
task
2
執行完畢
task
1
執行完畢
正在執行task
8
正在執行task
7
正在執行task
6
正在執行task
5
task
4
執行完畢
task
10
執行完畢
task
11
執行完畢
task
13
執行完畢
task
12
執行完畢
正在執行task
9
task
14
執行完畢
task
8
執行完畢
task
5
執行完畢
task
7
執行完畢
task
6
執行完畢
task
9
執行完畢
從執行結果可以看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裡面,當任務緩存隊列滿了之後,便創建新的線程。如果上面程序中,將for循環中改成執行20個任務,就會拋出任務拒絕異常了。
不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池:
1 2 3Executors.newCachedThreadPool();
//創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor();
//創建容量為1的緩沖池
Executors.newFixedThreadPool(
int
);
//創建固定容量大小的緩沖池
下面是這三個靜態方法的具體實現;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16public
static
ExecutorService newFixedThreadPool(
int
nThreads) {
return
new
ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new
LinkedBlockingQueue<Runnable>());
}
public
static
ExecutorService newSingleThreadExecutor() {
return
new
FinalizableDelegatedExecutorService
(
new
ThreadPoolExecutor(
1
,
1
,
0L, TimeUnit.MILLISECONDS,
new
LinkedBlockingQueue<Runnable>()));
}
public
static
ExecutorService newCachedThreadPool() {
return
new
ThreadPoolExecutor(
0
, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new
SynchronousQueue<Runnable>());
}
從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。
newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閒超過60秒,就銷毀線程。
實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
四.如何合理配置線程池的大小
本節來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。
一般需要根據任務的類型來配置線程池大小:
如果是CPU密集型任務,就需要盡量壓搾CPU,參考值可以設為 NCPU+1
如果是IO密集型任務,參考值可以設置為2*NCPU
當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。
參考資料:
http://ifeve.com/java-threadpool/
http://blog.163.com/among_1985/blog/static/275005232012618849266/
http://developer.51cto.com/art/201203/321885.htm
http://blog.csdn.net/java2000_wl/article/details/22097059
http://blog.csdn.net/cutesource/article/details/6061229
http://blog.csdn.net/xieyuooo/article/details/8718741
《JDK API 1.6》
問啊-一鍵呼叫程序員答題神器,牛人一對一服務,開發者編程必備官方網站:www.wenaaa.com
QQ群290551701 聚集很多互聯網精英,技術總監,架構師,項目經理!開源技術研究,歡迎業內人士,大牛及新手有志於從事IT行業人員進入!