重新認識中斷
之前在正確終止與恢復線程一文中介紹了使用Thread類的interrupt方法和使用標志位實現線程的終止。由於之前只是簡單介紹了jdk默認中斷方法的問題,對線程的中斷機制沒有深入介紹。為了正確終止線程,深刻理解線程中斷的本質是很有必要的。Java沒有提供可搶占的安全的中斷機制,但是Java提供了線程協作機制(之前說的interrupt方法和標志位本質上都屬於線程之間協作的手段),但是提供了中斷機制,中斷機制允許一個線程終止另一個線程的當前工作,所以需要在程序設計的時候考慮到中斷的位置和時機。
回到之前使用volatile類型的標志位來終止線程的例子,在代碼中調用cancel方法來取消i的自增請求,如果Runner線程在下次執行,或者正要執行下一次自增請求時判斷on的時是否變為了false,如果是則終止執行。
根據運行結果,Runner的計數任務最終會被取消,然後退出。在Runner線程最終取消執行之前,會有一定的時間,如果在在這個時間內,調用此方法的任務調用了一個會阻塞的方法,比如BlockingQueue的put方法,那麼可能該任務一直違法檢測到on的值變為false,因而Runner線程不會終止。
一個例子
比如下面的代碼就說明了這一點:
package com.rhwayfun.patchwork.concurrency.r0411;
import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by rhwayfun on 16-4-11.
*/
public class BrokenShutdownThread extends Thread {
//是否繼續運行的標志
private static volatile boolean on = true;
//阻塞隊列
private final BlockingQueue queue;
public BrokenShutdownThread(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (on) {
//生產者一次可以放40個數
for (int i = 0; i < 40; i++){
queue.put(p = p.nextProbablePrime());
System.out.println(Thread.currentThread().getName() + ": put value " + p);
}
}
} catch (InterruptedException e) {}
}
public void cancel() {
on = false;
}
/**
* 消費者線程
*/
static class Consumer extends Thread{
//阻塞隊列
private final BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (on) {
//消費者一次只能消費1個數
System.out.println(Thread.currentThread().getName() + ": get value " + queue.take());
}
System.out.println("work done!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new LinkedBlockingQueue<>(5);
BrokenShutdownThread producer = new BrokenShutdownThread(queue);
//啟動計數線程
producer.start();
TimeUnit.SECONDS.sleep(1);
new Consumer(queue).start();
TimeUnit.SECONDS.sleep(1);
producer.cancel();
}
}
運行上面的程序,發現雖然控制台輸出了work done!
的信息,但是程序仍然沒有停止,仔細分析就會發現生產者的速度(40個數/次)遠大於消費者的速度(1個數/次),造成隊列被填滿,put方法被阻塞。雖然在運行一秒後調用cancel方法將volatile變量on設為了false,但是由於生產者線程的put方法被阻塞,所以無法從阻塞的put方法中恢復,自然程序就無法終止了。
重新認識中斷
每個線程都有一個boolean類型的中斷狀態。當中斷線程時,中斷狀態被設為true。通過Thread的三個方法可以進行不同的中斷操作:
public void interrupt() {...}
public static boolean interrupted() {...}
public boolean isInterrupted() {...}
執行interrupt方法能夠中斷線程,interrupted可以清除線程的中斷狀態,isInterrupted方法可以返回當前線程的中斷狀態。
當線程調用會阻塞的方法,比如wait()、sleep()等方法時,線程會檢查自己的中斷狀態,並且在發生中斷時提前返回。這些阻塞的方法響應中斷的操作是清除中斷狀態,拋出InterruptedException。拋出InterruptedException的作用是表示線程由於中斷需要提前結束。調用interrupt方法執行中斷的本質是調用interrupt方法並不會立即停止目標線程正在執行的工作,只是傳遞了請求中斷的消息。然後線程會在下一個時刻中斷自己。當收到中斷請求時拋出InterruptedException,讓線程有選擇中斷策略的自由。一般而言,調用代碼需要對拋出的InterruptedException進行額外的處理,直接屏蔽該異常是不正確的(也就是直接調用printStackTrace()方法)。屏蔽中斷異常的後果是調用棧的上層無法對中斷請求做出響應。
對上面代碼的修正
根據以上的分析只需要對代碼做如下的修改就能正確終止線程:
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (on && !Thread.currentThread().isInterrupted()) {
//生產者一次可以放40個數
for (int i = 0; i < 40; i++){
queue.put(p = p.nextProbablePrime());
System.out.println(Thread.currentThread().getName() + ": put value " + p);
}
}
} catch (InterruptedException e) {
//讓線程退出
return;
}
}
public void cancel() {
on = false;
interrupt();
}
static class Consumer extends Thread{
//阻塞隊列
private final BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (on && !Thread.currentThread().isInterrupted()) {
//消費者一次只能消費1個數System.out.println(Thread.currentThread().getName() + ": get value " + queue.take());
}
System.out.println("work done!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
而其他代碼保持不變,再次運行以上的程序,發現能夠正確終止了。主要就是使用中斷機制完成了線程之間的協作,從而達到正確終止線程的目的。
實際上在調用可阻塞的方法時拋出的InterruptedException是為了讓調用者能夠注意到中斷信息,使得調用者可以就中斷做出自己的操作。往往在將中斷信息傳給調用者之前需要執行其他操作,如果在線程中使用中斷機制完成線程之間的協作,那麼就應該調用Thread.currentThread().intrrupt()恢復當前線程的中斷狀態
,這樣當前線程就能夠繼續其他操作了。正常情況下,都需要對中斷進行響應,除非自己實現了中斷所應該進行的操作。
為了取消線程的執行,除了之前的方法,還可以使用Future.get(Long time,TimeUnit unit)的帶超時限制的方法取消線程的執行,如果沒有在指定的時間內完成任務,那麼可以在代碼中直接調用Future.cancel()方法取消任務的執行。取消任務的時候有兩種情況:一是任務在指定的時間完成了,這個時候調用取消操作沒有什麼影響;二是任務沒有在指定的時間完成,那麼調用cancel方法後任務將被中斷。
偽代碼如下:
Future task = threadPool.submit(runnable);
try{
}catch(TimeOutException e){
//會取消任務的執行
}catch(ExecutionException e){
//如果在任務中拋出了執行異常,則重新拋出該異常
throw(new Throwable(e.getCause()));
}finally{
//true表示正在執行的任務能夠接收中斷,如果在執行則線程能被中斷
//如果為false,則表示若任務還沒有啟動則不要啟動該任務
task.cancel(true);
}
實現線程取消的完整例子
這裡以日志服務作為例子,業務場景是這樣的:前台會有多個生產者調用日志服務輸出程序的日志,生產者將需要輸出的日志信息放入一個隊列中,後台服務器有一個消費者線程,負責從隊列中取出日志信息並輸出(目的地可能不同)。顯然這是一個典型的生產者-消費者問題,不過這裡出現了多個生產者,但是只有一個消費者。顯然如果生產者的速度遠遠大於消費者的處理速度的話,很可能造成阻塞,不過這點已經再上面的分析中得到了解決。現在需要實現的是,提供可靠的關閉日志服務的方法,在前台調用服務接口可以正確停止日志服務,而不會出現任何問題。
實現代碼如下:
package com.rhwayfun.patchwork.concurrency.r0411;
import java.io.PrintWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by rhwayfun on 16-4-11.
*/
public class LoggerService {
// 存放日志消息的阻塞隊列
private final BlockingQueue logQueue;
// 打印日志的消費者線程
private final LoggerThread loggerThread;
// 打印日志的打印器
private PrintWriter writer;
// 日志服務是否關閉的標志
private boolean isShutdown;
// 執行log方法的調用者的計數器
private int reservations;
public LoggerService(PrintWriter writer) {
this.logQueue = new LinkedBlockingQueue<>(5);
this.loggerThread = new LoggerThread(writer);
}
/**
* 啟動日志服務
*/
public void start() {
loggerThread.start();
}
/**
* 記錄日志
*
* @param msg
* @throws InterruptedException
*/
public void recordLog(String msg) throws InterruptedException {
// 有條件保持對日志的添加
// 並且在接收到關閉請求時停止往隊列中填入日志
synchronized (this) {
if (isShutdown) throw new IllegalStateException("LoggerService is shutdown!");
++reservations;
}
// 由生產者將消息放入隊列
// 這裡不放入synchronized塊是因為put方法有阻塞的作用
logQueue.put(msg);
}
/**
* 停止日志服務
*/
public void stop() {
// 以原子方式檢查關閉請求
synchronized (this) {
isShutdown = true;
}
// 讓消費者線程停止從隊列取日志
loggerThread.interrupt();
}
/**
* 消費者線程
*/
private class LoggerThread extends Thread {
private PrintWriter writer;
public LoggerThread(PrintWriter writer) {
this.writer = writer;
}
@Override
public void run() {
try {
while (true) {
try {
// 持有的鎖與之前的相同
// 如果接收到應用程序的關閉請求並且沒有生產者線程繼續往隊列填入日志
// 那麼就結束循環,消費者線程終止
synchronized (LoggerService.this) {
if (isShutdown && reservations == 0) break;
}
// 從隊列獲取生產者的日志
String msg = logQueue.take();
// 每輸出一條日志就減少一個線程
synchronized (LoggerService.this) {
--reservations;
}
writer.println("Read: " + msg);
} catch (InterruptedException e) {
//恢復中斷狀態
Thread.currentThread().interrupt();
}
}
} finally {
writer.close();
}
}
}
/**
* 生產者線程
*/
private static class LoggerWriter implements Runnable {
private LoggerService service;
private final DateFormat format = new SimpleDateFormat("HH:mm:ss");
public LoggerWriter(LoggerService service) {
this.service = service;
}
@Override
public void run() {
try {
String msg = "time is " + format.format(new Date());
System.out.println("Write: " + msg);
service.recordLog(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
LoggerService service = new LoggerService(new PrintWriter(System.out));
//創建多個生產者線程負責創建日志
for (int i = 0; i < 5; i++) {
new Thread(new LoggerWriter(service)).start();
TimeUnit.SECONDS.sleep(1);
}
//啟動日志服務
service.start();
//休眠10秒
TimeUnit.SECONDS.sleep(10);
//關閉日志服務
service.stop();
}
}
小結
Java沒有提供搶占式安全終止線程的機制,但是使用線程的中斷機制可以很好實現線程的終止除了標志位使用FutureTask和Executor框架也能實現線程的終止,這裡主要使用的是FutureTask的cancel方法除非在程序中自己實現中斷策略,不然不要對中斷異常進行屏蔽拋出InterruptedException的目的可以使得上層調用者可以接收中斷信息,並對中斷做出自己的操作如果需要在將中斷信息傳遞給上層調用者之前做其他的操作,需要調用Thread.currentThread().interrupt()
恢復當前線程的中斷狀態如果使用線程池執行任務,那麼可以時使用其shutdown方法或者shutdownNow方法完成線程的終止。