大多數服務端應用程序都需要同時處理任務的能力,這樣可以提高工作性能並增加硬件資源的利用。在早期的Java版本(1.4或更早的)中,開發者需要完成並發(concurrent)應用程序——包括線程池邏輯—他們自己使用的是低層次語言結構和Java Thread API。但是結果卻總是不理想。Java Thread API的特性會導致不知情的編程者開發一些難以調試的編程錯誤的代碼。
在Java5.0中,Sun公司采用了Java concurrency功能(JSR-166)來解決這些問題,並且提供了一套標准的APIs來創建並開發應用程序。本文探究了一些Java concurrency package提供的特性並使用這些功能來演示編寫並發應用程序的技術。
Concurrent Programming的挑戰
自從它發布以來,Java就提供了Thread類和低層次語言結構,例如synchronized 和volatile用來開發獨立平台的並發應用程序。但是,用這些特性來構建並發應用程序並不是簡單的事情。開發者要面對以下的挑戰:
不正確的編程會導致一些困境,就是兩個或兩個以上的線程都等待永遠被鎖住的對方。
在Java語言中沒有機制用於寫wait-free, lock-free算法。開發者必須使用本地代碼。
開發者必須編寫他們復雜的線程池邏輯,這樣會很棘手並且容易出錯。
Java Concurrency UtilitIEs的概述
JSR-166(Java concurrency utilitIEs),是Java5.0的一部分,通過著重在寬度並提供跨域大范圍並發編程風格的重要功能,大大簡化了在Java中並發應用程序的開發。
Java concurrency utilitIEs提供了多種功能,開發者可以應用於更快更有預見性的並發應用程序的開發。這些功能讓開發者從通過寫自定義代碼來重新發明wheel中解放出來。一些JSR-166的最顯著的特點是:
標准的接口和構架來定義自定義線程子系統。
是一種機制用於規范調用,時序安排,執行和異步任務的控制,這是根據在Executor構架中的一套執行政策。
是一種機制通過類用於線程協調,例如semaphore, mutexe, barrIEr, latche和 Exchangers。
非阻礙FIFO列隊執行(ConcurrentLinkedQueue類)用於可升級的,有效的線程安全操作。
阻礙列隊執行類來涵蓋最常見的使用情況,對於生成者/消費者方法,信息,並行任務和相關的並發設計。
是一種構架用於鎖定和等待不同於內置同步和監測器的條件。
隨時准備使用的類用於在單個變量上的鎖定自由,線程安全的編程。
開發一個Concurrent Java Application
本節是演示如何使用Java concurrency utility API來開發一個多線程的在線訂單程序的電子商務應用程序。在應用程序生效並授權命令之後,把它們放在訂單處理列隊(Java.util.concurrent.BlockingQueue)。訂單處理器線程池不斷的對訂單進行測驗,而且當這些訂單可以使用時進行處理。
解耦應用程序的訂單處理代碼提供了增加和減少訂單處理率的靈活性,通過改變線程池的大小。在一個並發BlockingQueue中放入訂單對象確保一個處理器處理一個訂單,而且它要照顧自動同步。
在以下小節中的代碼段是截取於伴隨本文中的應用程序源代碼。
擴展ThreadPoolExecutor
Executor接口只規定了一個方法並從任務如何運行中解耦任務提交。ExecutorService子接口規定了額外的方法用於提交並追蹤異步任務,以及關閉線程池。ThreadPoolExecutor類是ExecutorService接口的一個具體的執行,應該足以用於大多數訂單處理應用程序的需求。
ThreadPoolExecutor也提供有用的連接方法(e.g., beforeExecute),可以覆蓋定制目的。在Listing 1中的CustomThreadPoolExecutor類擴展了ThreadPoolExecutor類並覆蓋了beforeExecute, afterExecute和 terminated 方法。
@Override
public void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Logger.log("After calling afterExecute() method for a thread "
+ r);
}
@Override
public void terminated() {
super.terminated();
Logger.log("Threadpool terminated");
}
該覆蓋方法只需登陸復寫信息,但是在現實的情況中,它們能做更有用的事情。例如,terminated方法可以發送一個在這個池中的所有線程都死鎖的警告。要正確構建多重覆蓋,通常你應該從在子類中的各個方法中調用主類的覆蓋方法。
Java concurrency API還提供了ScheduledExecutorService接口,可以擴展ExecutorServiceInterface,而且在一個特定延遲或是定期的執行之後能夠安排任務進行運行。ScheduledExecutorThreadPool是這個接口的具體執行。
確定異步任務執行
Executor 執行提交異步任務,這些任務執行實際的業務邏輯。向executor提交一個任務,ExecutorService接口提供重載的submit方法,可以接受Runnable 或是Callable 對象類型。
Runnable任務類型在以下情況下市非常有用的:
任務完成時不需要返回任何結果。
如果run()方法遇到一個例外,沒有必要拋出一個特定程序檢查例外。
移植現有的legacy 類來實施Runnable接口是必需的。
Callable任務類型提供了更多靈活性並提供了下列的優點:
任務可以返回用戶定義對象作為結果。
任務可以拋出用戶定義的檢查例外。
你需要分別為Runnable 和Callable任務類型執行run() 和call()方法。
在Listing 2中的OrderProcessorCallable類執行Callable接口並指定一個 Integer 作為結果對象。Constructor把任務對象名稱和BlockingQueue當做檢索命令來處理。call()方法繼續為訂單值對象調查BlockingQueue,並且處理任何所它所發現的事情。如果沒有訂單處理,call()方法會休息一段時間再繼續工作。
Call方法的無限循環在這個應用程序方案中是非常有用的,因為因為沒有必要一次又一次為每個訂單對象去創建並提交新的任務到ThreadPoolExecutor中。
public Integer call() throws OrderProcessingException {
while (running) {
// check if currend Thread is interrupted
checkInterruptStatus();
// poll for OrderVO from blocking queue and do
// order processing here
...
}
// return result
return processedCount;
}
請注意異步任務執行不斷的為應用程序的生命周期運行。在大多數程序中,異步任務執行會進行必要的操作並立即返回。
處理線程中斷
Call方法執行使用checkInterruptStatus方法在執行線程中斷上進行經常性檢查。這是必需的因為為了迫使任務取消,ThreadPoolExecutor會向線程發送一個intereupt。否則檢查中斷狀態會導致特定線程再也無法返回。以下的checkInterruptStatus方法檢查運行線程的中斷狀態,如果線程被中斷會拋出OrderProcessingException。
private void checkInterruptStatus() throws
OrderProcessingException {
if (Thread.interrupted()) {
throw new OrderProcessingException("Thread was interrupted");
}
}
作為任務的實施,它是拋出一個異常並在運行線程被中斷的時候終止任務執行的一個非常好的練習。但是,基於訂單處理的應用程序需求,在這種情況下忽略線程中斷是非常恰當的。