程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> Java並發編程總結5——ThreadPoolExecutor,java並發編程實戰

Java並發編程總結5——ThreadPoolExecutor,java並發編程實戰

編輯:JAVA綜合教程

Java並發編程總結5——ThreadPoolExecutor,java並發編程實戰


一、ThreadPoolExecutor介紹

在jdk1.8中,構造函數有4個。以

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)為例:

1、corePoolSize: 核心線程池大小

2、maximumPoolSize: 最大線程池大小

3、keepAliveTime: 當線程池中的線程數大於corePoolSize時, 多余空閒線程等待新任務的最長時間, 超過這個時間後多余線程終止

4、unit: 時間單位, 比如毫秒, 納秒等

5、workQueue: 阻塞隊列

6、threadFactory: 創建線程工廠, 可以方便的創建線程, 可以自定義; 默認為Executors.DefaultThreadFactory

7、handler: 飽和策略, 默認為AbortPolicy

 

定義一個完整的線程池可以這麼寫:

    private ThreadPoolExecutor defaultThreadPool = new ThreadPoolExecutor(10, 100, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());

 

線程池的主要處理流程如下:

1、提交任務到線程池,判斷核心線程池(corePoolSize)是否已滿? 沒滿,創建工作線程執行任務;滿了,進入下個流程。

2、線程池判斷工作隊列(workQueue)是否已滿?沒滿,則將新提交的任務存儲在工作隊列裡。滿了,則進入下個流程。

3、判斷整個線程池(maximumPoolSize)是否已滿?沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略(RejectedExecutionHandler)來處理這個任務。

 

二、飽和策略 RejectedExecutionHandler

在ThreadPoolExecutor類中定義了4種RejectedExecutionHandler類型:

1、AbortPolicy: 默認策略,直接拋出RejectedExecutionException異常。

2、CallerRunsPolicy:只用調用者所在線程來運行任務。

3、DiscardOldestPolicy:丟棄隊列裡最近的一個任務,並執行當前任務。

4、DiscardPolicy:不處理,丟棄掉。

也可以自定義飽和策略,比如將無法處理的新任務加入日志等,只需要實現RejectedExecutionHandler接口即可。

 

三、阻塞隊列BlockingQueue

    /**
     * ArrayBlockingQueue: 由數組結構組成的有界阻塞隊列, 隊列遵循FIFO
     */
    private BlockingQueue<String> abq = new ArrayBlockingQueue<String>(10);

    /**
     * LinkedBlockingQueue: 由鏈表結構組成的阻塞隊列, FIFO
     * LinkedBlockingDeque: 由鏈表結構組成的雙向阻塞隊列, 構造方法和LinkedBlockingQueue類似
     * public LinkedBlockingQueue(int capacity)     //有界阻塞隊列,隊列最大值為capacity
     * public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }        //無界阻塞隊列,隊列最大值為Integer.MAX_VALUE
     * 通過Executors.newFixedThreadPool(int nThreads)創建的線程池中采用的是無界LinkedBlockingQueue
     * 對於put和take操作, 內部采用了不同的鎖: putLock, takeLock, 而ArrayBlockingQueue內部只有一把鎖
     */
    private BlockingQueue<Thread> lbq = new LinkedBlockingQueue<Thread>(10);
    private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
    private BlockingQueue<String> lbd = new LinkedBlockingDeque<String>();

    /**
     * PriorityBlockingQueue: 支持優先級排序的有界阻塞隊列
     * public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
     */
    private BlockingQueue<String> pbq = new PriorityBlockingQueue(100, new Comparator<String>() {
        public int compare(String o1, String o2) {
            return o1.compareTo(o2);        //升序排列
        }
    });

    /**
     * DelayQueue: 一個使用優先級隊列實現的無界阻塞隊列, 支持延時獲取元素, 適用於緩存系統的設計以及定時任務調度。
     * 內部隊列采用PriorityQueue, private final PriorityQueue<E> q = new PriorityQueue<E>();
     * 隊列元素需要實現Delayed接口, class DelayQueue<E extends Delayed> extends AbstractQueue<E>
     */

    /**
     * SynchronousQueue: 不存儲元素的阻塞隊列
     */

    /**
     * LinkedTransferQueue: 由鏈表結構組成的實現了TransferQueue接口的無界阻塞隊列
     * transfer方法: 如果當前消費者正在等待接收元素(take()或poll(long timeout, TimeUnit unit))方法, 生產者傳入的元素可以直接傳遞給消費者, 而不放入隊列
     */
    BlockingQueue<String> ltq = new LinkedTransferQueue<String>();

 

四、通過Executors創建線程池

       /**
         * 創建單個線程, 適用於需要保證順序執行任務的場景
         * return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
         * BlockingQueue采用無界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime參數無意義
         */
        Executors.newSingleThreadExecutor();
/** * 創建固定線程, 適用於需要限制當前線程數量, 負載比較重的服務器 * return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); * 采用無界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime參數無意義 */ Executors.newFixedThreadPool(10);
/** * 根據需要創建線程, 適用於執行很多的短期異步任務的小程序, 或者負載較輕的服務器 * return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); * 初始線程為0, 采用SynchronousQueue阻塞隊列, 如果生產任務的速度低於消費的速度, 空閒60s的線程會被終止; 如果生產任務的速度持續高於消費速度, 則會不斷創建新線程 */ Executors.newCachedThreadPool();
/** * 在給定的延遲之後運行任務, 或者定期執行任務 * super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); * 采用無界DelayQueue隊列, 因此maximumPoolSize、keepAliveTime參數無意義 */ Executors.newScheduledThreadPool(10);

 

 

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved