1、線程池的處理流程(execute方法) 當向線程池提交一個任務後,其經歷的流程如下: 1)、如果當前線程數小於核心線程數(corePoolSize),則創建新線程來執行該任務; 2)、如果當前線程數不小於,即等於或大於核心線程數(corePoolSize),則將任務添加到阻塞隊列(BlockingQueue)中; 3)、如果阻塞隊列中的任務已滿,且此時線程數小於最大線程數(maximumPoolSize)時,則創建新線程來執行該任務; 4)、執行對應的任務策略,一般是拒絕任務,拋出異常。 2、任務策略: 1)、拋出異常 ThreadPoolExecutor.AbortPolicy() 2)、丟棄當前的任務 ThreadPoolExecutor.DiscardPolicy() 3)、丟棄老的任務 ThreadPoolExecutor.DiscardOldestPolicy() 4)、重試添加當前的任務 ThreadPoolExecutor.CallerRunsPolicy() 3、線程池源碼分析 1)、若干變量 //將工作線程數和線程池狀態放在一個int類型變量中存儲而設置的一個原子類型的變量 //故在ctl中,低29位是用於表示工作線程數,高位用於表示線程池狀態,如RUNNING、SHUTDOWN等。 //故一個線程池中最多有工作線程的個數為(2^29) - 1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //低29位 private static final int COUNT_BITS = Integer.SIZE - 3; //線程池中最大的工作線程數 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //線程池狀態,用高3位表示 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl //獲取當前線程池的狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取當前線程池中的工作線程數 private static int workerCountOf(int c) { return c & CAPACITY; } //組合當前線程池狀態和工作線程數為一個int類型的變量 private static int ctlOf(int rs, int wc) { return rs | wc; } 2)、execute()方法 public void execute(Runnable command) { //當提交的任務為null時,則拋出空指針異常 if (command == null) throw new NullPointerException(); //獲取當前線程池用於記錄狀態和工作線程數的變量 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //檢測當前線程池中的工作線程數小於核心線程數時,則直接創建新線程,執行任務 if (addWorker(command, true)) return; //當創建新線程失敗時,需要重新獲取用於記錄狀態和工作線程數的變量 c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //當前線程池是運行狀態,且將任務添加到阻塞隊列中成功時 //再次獲取用於記錄狀態和工作線程數的變量 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //當前線程池不是運行狀態,且刪除成功時,使用任務策略 reject(command); else if (workerCountOf(recheck) == 0) //當前工作線程數為0時,直接添加空任務 addWorker(null, false); } else if (!addWorker(command, false)) //阻塞隊列已滿且當前工作線程數小於最大線程數時,則直接創建線程,執行任務 //若還失敗,則直接使用任務策略 reject(command); } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //獲取當前線程池的狀態 int rs = runStateOf(c); //檢測當前線程池是否處於關閉狀態 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取當前線程池的工作線程數 int wc = workerCountOf(c); //如果超過了限制,則返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //通過CAS增加一個工作線程 if (compareAndIncrementWorkerCount(c)) break retry; //再次獲取用於標記線程池狀態和記錄工作線程數的變量,並比對當前狀態是否一直,若不是,則繼續外環循環,否則繼續內環循環 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; //新建一個工作線程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); //加鎖 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //將工作線程添加到線程集合Set中 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //工作線程開始啟動,執行提交的任務 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } //工作線程的構造方法 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //線程執行體 /** Delegates main run loop to outer runWorker */ public void run() { //調用父類的runWorker方法 runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //不斷的從任務隊列中獲取任務,並執行 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //線程是否中斷關閉 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //任務執行前的執行方法 beforeExecute(wt, task); Throwable thrown = null; try { //執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任務執行或的執行方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } //調用該方法後,該線程池不會再接受新任務,當已經存在的任務執行完畢後,線程池就會關閉 void shutdown() //調用該方法後,該線程池會嘗試關閉現有的線程,直到所有的線程都關閉,線程池就會關閉 List<Runnable> shutdownNow() 4、常用的線程池 1)、固定大小線程的線程池 newFixedThreadPool 2)、單一線程的線程池,當線程發生異常結束時,則會另外創建一個新的線程,以保持線程池自始至終只有一個線程 newSingleThreadExecutor 3)、無限制線程數的線程池,當空閒線程超過空閒時間時(默認1分鐘),線程會被回收 newCachedThreadPool 5、阻塞隊列 //往隊列中添加元素,成功返回true,失敗拋出異常 boolean add(E e) //往隊列中添加元素,成功返回true,失敗返回false boolean offer(E e) //往隊列中添加元素,在指定的時間內若是添加不了,則返回false,否則返回true boolean offer(E e, long timeout, TimeUnit unit) //有阻塞的添加元素,即肯定能將元素添加到隊列中,但是可能一直被阻塞 void put(E e) throws InterruptedException //獲取隊列中的首元素,沒有返回null E poll() //獲取隊列中的首元素,在指定的時間內若是獲取不到,則返回null E poll(long timeout, TimeUnit unit) //獲取隊列中的首元素,當隊列中沒有元素時,則一直阻塞,直到有元素時,才返回首元素 E take()