Java 線程池詳解。本站提示廣大學習愛好者:(Java 線程池詳解)文章只能為提供參考,不一定能成為您想要的結果。以下是Java 線程池詳解正文
體系啟動一個線程的本錢是比擬高的,由於它觸及到與操作體系的交互,應用線程池的利益是進步機能,當體系中包括年夜量並發的線程時,會招致體系機能激烈降低,乃至招致JVM瓦解,而線程池的最年夜線程數參數可以掌握體系中並發線程數不跨越次數。
1、Executors 工場類用來發生線程池,該工場類包括以下幾個靜態工場辦法來創立對應的線程池。創立的線程池是一個ExecutorService對象,應用該對象的submit辦法或許是execute辦法履行響應的Runnable或許是Callable義務。線程池自己在不再須要的時刻挪用shutdown()辦法停滯線程池,挪用該辦法後,該線程池將不再許可義務添加出去,然則會直到已添加的一切義務履行完成後才逝世亡。
1、newCachedThreadPool(),創立一個具有緩存功效的線程池,提交到該線程池的義務(Runnable或Callable對象)創立的線程,假如履行完成,會被緩存到CachedThreadPool中,供前面須要履行的義務應用。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CacheThreadPool { static class Task implements Runnable { @Override public void run() { System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: " + Thread.currentThread().getAllStackTraces().size()); } } public static void main(String[] args) { ExecutorService cacheThreadPool = Executors.newCachedThreadPool(); //先添加三個義務到線程池 for(int i = 0 ; i < 3; i++) { cacheThreadPool.execute(new Task()); } //等三個線程履行完成後,再次添加三個義務到線程池 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } for(int i = 0 ; i < 3; i++) { cacheThreadPool.execute(new Task()); } } }
履行成果以下:
CacheThreadPool$Task@2d312eb9 pool-1-thread-1 AllStackTraces map size: 7 CacheThreadPool$Task@59522b86 pool-1-thread-3 AllStackTraces map size: 7 CacheThreadPool$Task@73dbb89f pool-1-thread-2 AllStackTraces map size: 7 CacheThreadPool$Task@5795cedc pool-1-thread-3 AllStackTraces map size: 7 CacheThreadPool$Task@256d5600 pool-1-thread-1 AllStackTraces map size: 7 CacheThreadPool$Task@7d1c5894 pool-1-thread-2 AllStackTraces map size: 7
線程池中的線程對象停止了緩存,當有新義務履行時停止了復用。然則假如有特殊多的並發時,緩存線程池照樣會創立許多個線程對象。
2、newFixedThreadPool(int nThreads) 創立一個指定線程個數,線程可復用的線程池。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FixedThreadPool { static class Task implements Runnable { @Override public void run() { System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: " + Thread.currentThread().getAllStackTraces().size()); } } public static void main(String[] args) { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); // 先添加三個義務到線程池 for (int i = 0; i < 5; i++) { fixedThreadPool.execute(new Task()); } // 等三個線程履行完成後,再次添加三個義務到線程池 try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < 3; i++) { fixedThreadPool.execute(new Task()); } } }
履行成果:
FixedThreadPool$Task@7045c12d pool-1-thread-2 AllStackTraces map size: 7 FixedThreadPool$Task@50fa0bef pool-1-thread-2 AllStackTraces map size: 7 FixedThreadPool$Task@ccb1870 pool-1-thread-2 AllStackTraces map size: 7 FixedThreadPool$Task@7392b4e3 pool-1-thread-1 AllStackTraces map size: 7 FixedThreadPool$Task@5bdeff18 pool-1-thread-2 AllStackTraces map size: 7 FixedThreadPool$Task@7d5554e1 pool-1-thread-1 AllStackTraces map size: 7 FixedThreadPool$Task@24468092 pool-1-thread-3 AllStackTraces map size: 7 FixedThreadPool$Task@fa7b978 pool-1-thread-2 AllStackTraces map size: 7
3、newSingleThreadExecutor(),創立一個只要單線程的線程池,相當於挪用newFixedThreadPool(1)
4、newSheduledThreadPool(int corePoolSize),創立指定線程數的線程池,它可以在指定延遲後履行線程。也能夠以某一周期反復履行某一線程,曉得挪用shutdown()封閉線程池。
示例以下:
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledThreadPool { static class Task implements Runnable { @Override public void run() { System.out.println("time " + System.currentTimeMillis() + " " + Thread.currentThread().getName() + " AllStackTraces map size: " + Thread.currentThread().getAllStackTraces().size()); } } public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.schedule(new Task(), 3, TimeUnit.SECONDS); scheduledExecutorService.scheduleAtFixedRate(new Task(), 3, 5, TimeUnit.SECONDS); try { Thread.sleep(30 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } scheduledExecutorService.shutdown(); } }
運轉成果以下:
time 1458921795240 pool-1-thread-1 AllStackTraces map size: 6 time 1458921795241 pool-1-thread-2 AllStackTraces map size: 6 time 1458921800240 pool-1-thread-1 AllStackTraces map size: 7 time 1458921805240 pool-1-thread-1 AllStackTraces map size: 7 time 1458921810240 pool-1-thread-1 AllStackTraces map size: 7 time 1458921815240 pool-1-thread-1 AllStackTraces map size: 7 time 1458921820240 pool-1-thread-1 AllStackTraces map size: 7
由運轉時光可看出,義務是依照5秒的周期履行的。
5、newSingleThreadScheduledExecutor() 創立一個只要一個線程的線程池,同挪用newScheduledThreadPool(1)。
2、ForkJoinPool和ForkJoinTask
ForkJoinPool是ExecutorService的完成類,支撐將一個義務劃分為多個小義務並行盤算,在把多個小義務的盤算成果歸並成總的盤算成果。它有兩個結構函數
ForkJoinPool(int parallelism)創立一個包括parallelism個並行線程的ForkJoinPool。
ForkJoinPool(),以Runtime.availableProcessors()辦法前往值作為parallelism參數來創立ForkJoinPool。
ForkJoinTask 代表一個可以並行,歸並的義務。它是完成了Future<T>接口的籠統類,它有兩個籠統子類,代表無前往值義務的RecuriveAction和有前往值的RecursiveTask。可依據詳細需求繼續這兩個籠統類完成本身的對象,然後挪用ForkJoinPool的submit 辦法履行。
RecuriveAction 示例以下,完成並行輸入0-300的數字。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class ActionForkJoinTask { static class PrintTask extends RecursiveAction { private static final int THRESHOLD = 50; private int start; private int end; public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { if (end - start < THRESHOLD) { for(int i = start; i < end; i++) { System.out.println(Thread.currentThread().getName() + " " + i); } } else { int middle = (start + end) / 2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end); left.fork(); right.fork(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); pool.submit(new PrintTask(0, 300)); try { pool.awaitTermination(2, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } pool.shutdown(); } }
在拆分小義務後,挪用義務的fork()辦法,參加到ForkJoinPool中並行履行。
RecursiveTask示例,完成並行盤算100個整數乞降。拆分為每20個數乞降後獲得成果,在最初歸並為最初的成果。
import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class TaskForkJoinTask { static class CalTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 20; private int arr[]; private int start; private int end; public CalTask(int[] arr, int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if (end - start < THRESHOLD) { for (int i = start; i < end; i++) { sum += arr[i]; } System.out.println(Thread.currentThread().getName() + " sum:" + sum); return sum; } else { int middle = (start + end) / 2; CalTask left = new CalTask(arr, start, middle); CalTask right = new CalTask(arr, middle, end); left.fork(); right.fork(); return left.join() + right.join(); } } } public static void main(String[] args) { int arr[] = new int[100]; Random random = new Random(); int total = 0; for (int i = 0; i < arr.length; i++) { int tmp = random.nextInt(20); total += (arr[i] = tmp); } System.out.println("total " + total); ForkJoinPool pool = new ForkJoinPool(4); Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length)); try { System.out.println("cal result: " + future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } pool.shutdown(); } }
履行成果以下:
total 912 ForkJoinPool-1-worker-2 sum:82 ForkJoinPool-1-worker-2 sum:123 ForkJoinPool-1-worker-2 sum:144 ForkJoinPool-1-worker-3 sum:119 ForkJoinPool-1-worker-2 sum:106 ForkJoinPool-1-worker-2 sum:128 ForkJoinPool-1-worker-2 sum:121 ForkJoinPool-1-worker-3 sum:89 cal result: 912
子義務履行完後,挪用義務的join()辦法獲得子義務履行成果,再相加取得最初的成果。