在執行並發任務時,我們可以把任務傳遞給一個線程池,來替代為每個並發執行的任務都啟動一個新的線程,只要池裡有空閒的線程,任務就會分配一個線程執行。在線程池的內部,任務被插入一個阻塞隊列(BlockingQueue),線程池裡的線程會去取這個隊列裡的任務。
利用線程池有三個好處:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
創建一個線程池需要的幾個參數:
1、ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
2、LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
3、SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
4、PriorityBlockingQueue:一個具有優先級得無限阻塞隊列。
此外,我們還可以通過調用Ececutors中的某個靜態工廠方法來創建一個線程池(它們的內部實現原理都是相同的,僅僅是使用了不同的工作隊列或線程池大小):
newFixedThreadPool:創建一個定長的線程池,每當提交一個任務就創建一個線程,直到達到池的最大長度,這時線程池會保持長度不在變化
newCachedThreadPool:創建一個可緩存的線程池,如果當前的線程池的長度超過了處理的需要時,它可以靈活的回收空閒的線程,當需求增加時,它可以靈活的添加新的線程,並不會對池的長度做任何限制
newSingleThreadPool:創建一個單線程化的executor,它只會創建唯一的工作者線程來執行任務
newScheduledThreadPool:創建一個定長的線程池,而且支持定時的以及周期性的任務執行,類似於Timer
可以使用execute向線程池提交任務:
public class Test2 { public static void main(String[] args) { BlockingQueue<Runnable> workQueue=new LinkedBlockingDeque<Runnable>(); ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, workQueue); poolExecutor.execute(new Task1()); poolExecutor.execute(new Task2());
poolExecutor.shutdown(); } } class Task1 implements Runnable { public void run() { System.out.println("執行任務1"); } } class Task2 implements Runnable { public void run() { System.out.println("執行任務2"); } }
也可以使用submit方法來提交任務,它會返回一個future,我們可以通過這個future來判斷任務是否執行成功,通過future的get方法獲取返回值,get方法會阻塞直到任務完成。
public class Test3 { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newCachedThreadPool(); List<Future<String>> resultList = new ArrayList<Future<String>>(); // 創建10個任務並執行 for (int i = 0; i < 10; i++) { // 使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中 Future<String> future = executorService.submit(new TaskWithResult(i)); resultList.add(future); } for (Future<String> future : resultList) { while (!future.isDone());// Future返回如果沒有完成,則一直循環等待,直到Future返回完成 { System.out.println(future.get()); // 打印各個線程(任務)執行的結果 } } executorService.shutdown(); } } class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } public String call() throws Exception { return "執行結果"+id; } }
可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池,但是它們的實現原理不同,shutdown的原理是只是將線程池的狀態設置成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的線程。shutdownNow的原理是遍歷線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將線程池的狀態設置成STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表。
只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow。
整個ThreadExecutor的任務處理經過下面4個步驟,如下圖所示:
1、如果當前的線程數<corePoolSize,提交的Runnable任務,會直接作為new Thread的參數,立即執行,當提交的任務數超過了corePoolSize,就進入第二部操作
2、將當前的任務提交到BlockingQueue阻塞隊列中,如果Block Queue是個有界隊列,當隊列滿了之後就進入第三步
3、如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,執行對應的runnable任務
4、如果第三步也無法處理,就會用RejectedExecutionHandler來做拒絕處理
Timer工具管理任務的定時以及周期性執行。示例代碼如下:
public class TimerTest { final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1=new TimerTask() { @Override public void run() { System.out.println("任務1執行時間:"+sdf.format(new Date())); try { Thread.sleep(3000);//模擬任務1執行時間為3秒 } catch (InterruptedException e) { // TODO 自動生成的 catch 塊 e.printStackTrace(); } } }; System.out.println("當前時間:"+sdf.format(new Date())); Timer timer=new Timer(); timer.schedule(timerTask1, new Date(),4000); //間隔4秒周期性執行 } }
執行結果:
可以看到上述任務1以4秒為間隔周期性執行。但是Timer存在一些缺陷,主要是下面兩個方面的問題:
缺陷1:Timer只創建唯一的線程的來執行所有的Timer任務,如果一個time任務的執行很耗時,會導致其他的TimeTask的時效准確性出問題。看下面的例子:
public class TimerTest { final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1=new TimerTask() { @Override public void run() { System.out.println("任務1執行時間:"+sdf.format(new Date())); try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO 自動生成的 catch 塊 e.printStackTrace(); } } }; TimerTask timerTask2=new TimerTask() { @Override public void run() { System.out.println("任務2執行時間:"+sdf.format(new Date())); } }; System.out.println("當前時間:"+sdf.format(new Date())); Timer timer=new Timer(); timer.schedule(timerTask1, new Date(),1000); //間隔1秒周期性執行 timer.schedule(timerTask2, new Date(),4000); //間隔4秒周期性執行 } }
執行結果:
缺陷2:如果TimeTask拋出未檢查的異常,Timer將產生無法預料的行為。Timer線程並不捕獲線程,所有TimerTask拋出的未檢查的異常會終止timer線程。看下面的代碼:
public class TimerTest2 { final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1=new TimerTask() { @Override public void run() { System.out.println("任務1執行時間:"+sdf.format(new Date())); throw new RuntimeException(); } }; TimerTask timerTask2=new TimerTask() { @Override public void run() { System.out.println("任務2執行時間:"+sdf.format(new Date())); } }; System.out.println("當前時間:"+sdf.format(new Date())); Timer timer=new Timer(); timer.schedule(timerTask1, new Date(),1000); //周期1秒執行任務1 timer.schedule(timerTask2, new Date() ,3000); //周期3秒執行任務2 } }
執行結果為:
針對上述的兩個問題,我們可以使用ScheduledThreadPoolExecutor來作為Timer的替代。
針對問題1,有下面代碼:
public class ScheduledThreadPoolExecutorTest { final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1 = new TimerTask() { @Override public void run() { System.out.println("任務1執行時間:" + sdf.format(new Date())); try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO 自動生成的 catch 塊 e.printStackTrace(); } } }; TimerTask timerTask2 = new TimerTask() { @Override public void run() { System.out.println("任務2執行時間:" + sdf.format(new Date())); } }; System.out.println("當前時間:" + sdf.format(new Date())); ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2); poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000,TimeUnit.MILLISECONDS); poolExecutor.scheduleAtFixedRate(timerTask2, 0, 4000,TimeUnit.MILLISECONDS); } }
執行的結果為:
針對問題2,有下面代碼:
public class ScheduledThreadPoolExecutorTest2 { final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1 = new TimerTask() { @Override public void run() { System.out.println("任務1執行時間:" + sdf.format(new Date())); throw new RuntimeException(); } }; TimerTask timerTask2 = new TimerTask() { @Override public void run() { System.out.println("任務2執行時間:" + sdf.format(new Date())); } }; System.out.println("當前時間:" + sdf.format(new Date())); ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2); poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000, TimeUnit.MILLISECONDS); poolExecutor.scheduleAtFixedRate(timerTask2, 0, 2000, TimeUnit.MILLISECONDS); } }
執行結果為:
1、Java並發編程實踐