程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> Java線程池分析基礎教程

Java線程池分析基礎教程

編輯:關於JAVA
 

一、概述

在執行一個異步任務或並發任務時,往往是通過直接new Thread()方法來創建新的線程,這樣做弊端較多,更好的解決方案是合理地利用線程池,線程池的優勢很明顯,如下:

  1. 降低系統資源消耗,通過重用已存在的線程,降低線程創建和銷毀造成的消耗;
  2. 提高系統響應速度,當有任務到達時,無需等待新線程的創建便能立即執行;
  3. 方便線程並發數的管控,線程若是無限制的創建,不僅會額外消耗大量系統資源,更是占用過多資源而阻塞系統或oom等狀況,從而降低系統的穩定性。線程池能有效管控線程,統一分配、調優,提供資源使用率;
  4. 更強大的功能,線程池提供了定時、定期以及可控線程數等功能的線程池,使用方便簡單。

二、線程池用法

Java API針對不同需求,利用Executors類提供了4種不同的線程池:newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool, newSingleThreadExecutor,接下來講講線程池的用法。

2.1 newCachedThreadPool

創建一個可緩存的無界線程池,該方法無參數。當線程池中的線程空閒時間超過60s則會自動回收該線程,當任務超過線程池的線程數則創建新線程。線程池的大小上限為Integer.MAX_VALUE,可看做是無限大。

public void cachedThreadPoolDemo(){
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
        final int index = i;

        cachedThreadPool.execute(new Runnable() {

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+", index="+index);
            }
        });

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:

pool-1-thread-1, index=0
pool-1-thread-1, index=1
pool-1-thread-1, index=2
pool-1-thread-1, index=3
pool-1-thread-1, index=4

從運行結果可以看出,整個過程都在同一個線程pool-1-thread-1中運行,後面線程復用前面的線程。

2.2 newFixedThreadPool

創建一個固定大小的線程池,該方法可指定線程池的固定大小,對於超出的線程會在LinkedBlockingQueue隊列中等待。

public void fixedThreadPoolDemo(){
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 6; i++) {
        final int index = i;

        fixedThreadPool.execute(new Runnable() {

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+", index="+index);
            }
        });

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:

pool-1-thread-1, index=0
pool-1-thread-2, index=1
pool-1-thread-3, index=2
pool-1-thread-1, index=3
pool-1-thread-2, index=4
pool-1-thread-3, index=5

從運行結果可以看出,線程池大小為3,每休眠1s後將任務提交給線程池的各個線程輪番交錯地執行。線程池的大小設置,可參數Runtime.getRuntime().availableProcessors()。

2.3 newSingleThreadExecutor

創建一個只有線程的線程池,該方法無參數,所有任務都保存隊列LinkedBlockingQueue中,等待唯一的單線程來執行任務,並保證所有任務按照指定順序(FIFO或優先級)執行。

public void singleThreadExecutorDemo(){
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 3; i++) {
        final int index = i;

        singleThreadExecutor.execute(new Runnable() {

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+", index="+index);
            }
        });

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:

pool-1-thread-1, index=0
pool-1-thread-1, index=1
pool-1-thread-1, index=2

從運行結果可以看出,所有任務都是在單一線程運行的。

2.4 newScheduledThreadPool

創建一個可定時執行或周期執行任務的線程池,該方法可指定線程池的核心線程個數。

public void scheduledThreadPoolDemo(){
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
    //定時執行一次的任務,延遲1s後執行
    scheduledThreadPool.schedule(new Runnable() {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+", delay 1s");
        }
    }, 1, TimeUnit.SECONDS);

    //周期性地執行任務,延遲2s後,每3s一次地周期性執行任務
    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+", every 3s");
        }
    }, 2, 3, TimeUnit.SECONDS);
}

運行結果:

pool-1-thread-1, delay 1s
pool-1-thread-1, every 3s
pool-1-thread-2, every 3s
pool-1-thread-2, every 3s
...
  • schedule(Runnable command, long delay, TimeUnit unit),延遲一定時間後執行Runnable任務;
  • schedule(Callable callable, long delay, TimeUnit unit),延遲一定時間後執行Callable任務;
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit),延遲一定時間後,以間隔period時間的頻率周期性地執行任務;
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit),與scheduleAtFixedRate()方法很類似,但是不同的是scheduleWithFixedDelay()方法的周期時間間隔是以上一個任務執行結束到下一個任務開始執行的間隔,而scheduleAtFixedRate()方法的周期時間間隔是以上一個任務開始執行到下一個任務開始執行的間隔,也就是這一些任務系列的觸發時間都是可預知的。

ScheduledExecutorService功能強大,對於定時執行的任務,建議多采用該方法。

2.5 方法對比

上述4個方法的參數對比,如下:

工廠方法 corePoolSize maximumPoolSize keepAliveTime workQueue newCachedThreadPool 0 Integer.MAX_VALUE 60s SynchronousQueue newFixedThreadPool nThreads nThreads 0 LinkedBlockingQueue newSingleThreadExecutor 1 1 0 LinkedBlockingQueue newScheduledThreadPool corePoolSize Integer.MAX_VALUE 0 DelayedWorkQueue

其他參數都相同,其中線程工廠的默認類為DefaultThreadFactory,線程飽和的默認策略為ThreadPoolExecutor.AbortPolicy。

三、線程池原理

Executors類提供4個靜態工廠方法:newCachedThreadPool()、newFixedThreadPool(int)、newSingleThreadExecutor和newScheduledThreadPool(int)。這些方法最終都是通過ThreadPoolExecutor類來完成的,這裡強烈建議大家直接使用Executors類提供的便捷的工廠方法,能完成絕大多數的用戶場景,當需要更細節地調整配置,需要先了解每一項參數的意義。

3.1 ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

創建線程池,在構造一個新的線程池時,必須滿足下面的條件:

  1. corePoolSize(線程池基本大小)必須大於或等於0;
  2. maximumPoolSize(線程池最大大小)必須大於或等於1;
  3. maximumPoolSize必須大於或等於corePoolSize;
  4. keepAliveTime(線程存活保持時間)必須大於或等於0;
  5. workQueue(任務隊列)不能為空;
  6. threadFactory(線程工廠)不能為空,默認為DefaultThreadFactory類
  7. handler(線程飽和策略)不能為空,默認策略為ThreadPoolExecutor.AbortPolicy。

3.2 參數詳解

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

參數說明:

  1. corePoolSize(線程池基本大小):當向線程池提交一個任務時,若線程池已創建的線程數小於corePoolSize,即便此時存在空閒線程,也會通過創建一個新線程來執行該任務,直到已創建的線程數大於或等於corePoolSize時,才會根據是否存在空閒線程,來決定是否需要創建新的線程。除了利用提交新任務來創建和啟動線程(按需構造),也可以通過 prestartCoreThread() 或 prestartAllCoreThreads() 方法來提前啟動線程池中的基本線程。

  2. maximumPoolSize(線程池最大大小):線程池所允許的最大線程個數。當隊列滿了,且已創建的線程數小於maximumPoolSize,則線程池會創建新的線程來執行任務。另外,對於無界隊列,可忽略該參數。

  3. keepAliveTime(線程存活保持時間):默認情況下,當線程池的線程個數多於corePoolSize時,線程的空閒時間超過keepAliveTime則會終止。但只要keepAliveTime大於0,allowCoreThreadTimeOut(boolean) 方法也可將此超時策略應用於核心線程。另外,也可以使用setKeepAliveTime()動態地更改參數。

  4. unit(存活時間的單位):時間單位,分為7類,從細到粗順序:NANOSECONDS(納秒),MICROSECONDS(微妙),MILLISECONDS(毫秒),SECONDS(秒),MINUTES(分),HOURS(小時),DAYS(天);

  5. workQueue(任務隊列):用於傳輸和保存等待執行任務的阻塞隊列。可以使用此隊列與線程池進行交互:
    • 如果運行的線程數少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
    • 如果運行的線程數等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
    • 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。
  6. threadFactory(線程工廠):用於創建新線程。由同一個threadFactory創建的線程,屬於同一個ThreadGroup,創建的線程優先級都為Thread.NORM_PRIORITY,以及是非守護進程狀態。threadFactory創建的線程也是采用new Thread()方式,threadFactory創建的線程名都具有統一的風格:pool-m-thread-n(m為線程池的編號,n為線程池內的線程編號);

  7. handler(線程飽和策略):當線程池和隊列都滿了,則表明該線程池已達飽和狀態。
    • ThreadPoolExecutor.AbortPolicy:處理程序遭到拒絕,則直接拋出運行時異常 RejectedExecutionException。(默認策略)
    • ThreadPoolExecutor.CallerRunsPolicy:調用者所在線程來運行該任務,此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
    • ThreadPoolExecutor.DiscardPolicy:無法執行的任務將被刪除。
    • ThreadPoolExecutor.DiscardOldestPolicy:如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然後重新嘗試執行任務(如果再次失敗,則重復此過程)。

排隊有三種通用策略:

  • 直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
  • 無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
  • 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

3.3 工作隊列BlockingQueue

BlockingQueue的插入/移除/檢查這些方法,對於不能立即滿足但可能在將來某一時刻可以滿足的操作,共有4種不同的處理方式:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操作),第三種是在操作可以成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。如下表格:

操作 拋出異常 特殊值 阻塞 超時 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 檢查 element() peek() 不可用 不可用

實現BlockingQueue接口的常見類如下:

  • ArrayBlockingQueue:基於數組的有界阻塞隊列。隊列按FIFO原則對元素進行排序,隊列頭部是在隊列中存活時間最長的元素,隊尾則是存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。 這是一個典型的“有界緩存區”,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致類似阻塞。ArrayBlockingQueue構造方法可通過設置fairness參數來選擇是否采用公平策略,公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”,可根據情況來決策。

  • LinkedBlockingQueue:基於鏈表的無界阻塞隊列。與ArrayBlockingQueue一樣采用FIFO原則對元素進行排序。基於鏈表的隊列吞吐量通常要高於基於數組的隊列。

  • SynchronousQueue:同步的阻塞隊列。其中每個插入操作必須等待另一個線程的對應移除操作,等待過程一直處於阻塞狀態,同理,每一個移除操作必須等到另一個線程的對應插入操作。SynchronousQueue沒有任何容量。不能在同步隊列上進行 peek,因為僅在試圖要移除元素時,該元素才存在;除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)插入元素;也不能迭代隊列,因為其中沒有元素可用於迭代。Executors.newCachedThreadPool使用了該隊列。

  • PriorityBlockingQueue:基於優先級的無界阻塞隊列。優先級隊列的元素按照其自然順序進行排序,或者根據構造隊列時提供的 Comparator 進行排序,具體取決於所使用的構造方法。優先級隊列不允許使用 null 元素。依靠自然順序的優先級隊列還不允許插入不可比較的對象(這樣做可能導致 ClassCastException)。雖然此隊列邏輯上是無界的,但是資源被耗盡時試圖執行 add 操作也將失敗(導致 OutOfMemoryError)。

3.4 線程池關閉

調用線程池的shutdown()或shutdownNow()方法來關閉線程池

  • shutdown原理:將線程池狀態設置成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的線程。
  • shutdownNow原理:將線程池的狀態設置成STOP狀態,然後中斷所有任務(包括正在執行的)的線程,並返回等待執行任務的列表。

中斷采用interrupt方法,所以無法響應中斷的任務可能永遠無法終止。但調用上述的兩個關閉之一,isShutdown()方法返回值為true,當所有任務都已關閉,表示線程池關閉完成,則isTerminated()方法返回值為true。當需要立刻中斷所有的線程,不一定需要執行完任務,可直接調用shutdownNow()方法。

3.5 線程池流程

thread-pool

  1. 判斷核心線程池是否已滿,即已創建線程數是否小於corePoolSize?沒滿則創建一個新的工作線程來執行任務。已滿則進入下個流程。
  2. 判斷工作隊列是否已滿?沒滿則將新提交的任務添加在工作隊列,等待執行。已滿則進入下個流程。
  3. 判斷整個線程池是否已滿,即已創建線程數是否小於maximumPoolSize?沒滿則創建一個新的工作線程來執行任務,已滿則交給飽和策略來處理這個任務。

四、優化

4.1 合理地配置線程池

需要針對具體情況而具體處理,不同的任務類別應采用不同規模的線程池,任務類別可劃分為CPU密集型任務、IO密集型任務和混合型任務。

  • 對於CPU密集型任務:線程池中線程個數應盡量少,不應大於CPU核心數;
  • 對於IO密集型任務:由於IO操作速度遠低於CPU速度,那麼在運行這類任務時,CPU絕大多數時間處於空閒狀態,那麼線程池可以配置盡量多些的線程,以提高CPU利用率;
  • 對於混合型任務:可以拆分為CPU密集型任務和IO密集型任務,當這兩類任務執行時間相差無幾時,通過拆分再執行的吞吐率高於串行執行的吞吐率,但若這兩類任務執行時間有數據級的差距,那麼沒有拆分的意義。

4.2 線程池監控

利用線程池提供的參數進行監控,參數如下:

  • taskCount:線程池需要執行的任務數量。
  • completedTaskCount:線程池在運行過程中已完成的任務數量,小於或等於taskCount。
  • largestPoolSize:線程池曾經創建過的最大線程數量,通過這個數據可以知道線程池是否滿過。如等於線程池的最大大小,則表示線程池曾經滿了。
  • getPoolSize:線程池的線程數量。如果線程池不銷毀的話,池裡的線程不會自動銷毀,所以這個大小只增不減。
  • getActiveCount:獲取活動的線程數。

通過擴展線程池進行監控:繼承線程池並重寫線程池的beforeExecute(),afterExecute()和terminated()方法,可以在任務執行前、後和線程池關閉前自定義行為。如監控任務的平均執行時間,最大執行時間和最小執行時間等。

 
  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved