(5) runIt方法
public synchronized void runIt(ThreadPoolRunnable
toRun) {
this.toRun =
toRun;
shouldRun = true;
this.notify();
}
該方法主要是運行一個指定的任務,具體的任務都被封裝在ThreadPoolRunnable接口裡,該方法要注意以下幾點:
Ø 該方法對每個線程僅被調用一次
Ø 調用該方法不是馬上運行ThreadPoolRunnable指定的任務,而是通知ControlRunnable”可以執行任務”。
具體的任務執行在下面的run方法裡。
(6) Run方法
public void run() {
try {
while(true) {
try {
synchronized(this) {
//當既不運行也不停止時,等待
if(!shouldRun && !shouldTerminate) {
this.wait();
}
}
//停止
if( shouldTerminate ) {
if( ThreadPool.log.isDebugEnabled())
ThreadPool.log.debug( "Terminate");
break;
}
try {
//初始化線程數據,僅一次
if(noThData) {
if(
toRun != null ) {
Object thData[]=toRun.getInitData();
t.setThreadData(p, thData);
}
noThData = false;
}
//執行操作
if(shouldRun) {
//運行ThreadRunnalbe接口
if(
toRun != null ) {
toRun.runIt(t.getThreadData(p));
//ControlRunnable也提供一般Runnable接口參與處理的機會
} else if( toRunRunnable != null ) {
toRunRunnable.run();
} else {
if( ThreadPool.log.isDebugEnabled())
ThreadPool.log.debug( "No
toRun ???");
}
}
} catch(Throwable t) {
//發生致命錯誤,從池中刪除線程
shouldTerminate = true;
shouldRun = false;
p.notifyThreadEnd(this);
} finally {
//運行結束回收線程
if(shouldRun) {
shouldRun = false;
p.returnController(this);
}
}
if(shouldTerminate) {
break;
}
} catch(InterruptedException IE) {
//當執行wait時可能發生的異常(盡管這種異常不太可能發生)
p.log.error("Unexpected exception", IE);
}
}
} finally {
//線程池停止或線程運行中發生錯誤時,從池中刪除線程
p.removeThread(Thread.currentThread());
}
}
結合runIt方法,run方法能很容易看懂。
4. org.apache.tomcat.util.threads.ThreadPoolListener
前面我們曾提到過,該接口時Observer模式的Observer對象,該接口定義了兩個方法:
//當線程被創建時執行的方法
public void threadStart( ThreadPool tp, Thread t);
//當線程被停止時執行的方法
public void threadEnd( ThreadPool tp, Thread t);
關於該接口的詳細使用可以參考上面提到的Observer模式。
5. org.apache.tomcat.util.threads.ThreadWithAttributes
ThreadWithAttributes是一個特殊的線程,該線程用來存放其他線程的屬性和數據,並且該類提供了類似ThreadLocal的功能,但比ThreadLocal效率更高。
(1) 構造函數ThreadWithAttributes
public ThreadWithAttributes(Object control, Runnable r) {
super(r);
this.control=control;
}
用control(ThreadPool)和r(ControlRunnable)構造實例(具體可參見ControlRunnable的構造方法)
(2) setNote方法
public final void setNote( Object control, int id, Object value ) {
if( this.control != control ) return;
notes[id]=value;
}
Ø 用ControlRunnable構造一個新的ThreadWithAttributes對象避免了線程公用數據的爭奪
Ø 根據control設置線程屬性,通過control可以阻止非信任的代碼操作線程屬性。
對其他操作線程屬性的方法都比較簡單就不再一一列出。
(3) Java的ThreadLocal
java.lang.ThreadLocal是在Java1.2中出現的“線程局部變量”,它為每個使用它的線程提供單獨的線程局部變量值的副本。每個線程只能看到與自己相聯系的值,而不知道別的線程可能正在使用或修改它們自己的副本。“線程局部變量”是一種能簡化多線程編程的好方法,可惜的是多數開發者可能不了解它。具體的信息可以參考:
http://www-900.ibm.com/developerWorks/cn/Java/j-threads/index3.sHtml
6. org.apache.tomcat.util.threads.ThreadPoolRunnable
前面我們提到過,如果想把自己的代碼嵌入到線程池內部被執行,就必須實現該接口。具體可以參照ControlRunnable的run方法。這個接口定義了下面兩個方法:
(1) getInitData方法
public Object[] getInitData();
取得運行該對象所需要的初始化數據,對池中所有的線程來說應該返回相同類型的數據,否則處理機制將變的很復雜。
(2) runIt方法
public void runIt(Object thData[]);
嵌入執行的代碼將在這個方法裡得以體現,以後我們將會看到,對TCP Connection得處理也是在這裡進行的。
至此,Tomcat ThreadPool的介紹就算基本結束,對Tomcat ThreadPool始終要把握住下面幾點:
Ø Tomcat ThreadPool僅提供了對線程的管理維護功能
Ø 池所執行的操作有外部組件去實現
Ø 從池的設計可以看出一點面向組件(COP)編程的痕跡
二.ThreadPool在處理TCP Connection中的應用
在接下來的內容中我們將演示Tomat是如何在指定的端口監聽HTTP連接,並利用ThreadPool生成一個線程處理接受的請求。
1. org.apache.tomcat.util.Net.PoolTcpEndpoint
類PoolTcpEndpoint主要是被用來處理接受到的HTTP連接,處理方式是處理原始的Socket,下面我們看幾個重要的方法:
(1) initEndpoint方法
對該方法,現在我們可以暫時不要考慮太多,只要知道在初始化ServerSocket的工作就足夠了。
public void initEndpoint() throws IOException, InstantiationException {
try {
//創建ServerSocket工廠
if(factory==null)
factory=ServerSocketFactory.getDefault();
//創建ServerSocket,將被用於在指定的端口(8080)監聽連接
if(serverSocket==null) {
try {
if (inet == null) {
serverSocket = factory.createSocket(port, backlog);
} else {
serverSocket = factory.createSocket(port, backlog, inet);
}
} catch ( BindException be ) {
throw new BindException(be.getMessage() + ":" + port);
}
}
//設定連接的超時限制時間
if( serverTimeout >= 0 )
serverSocket.setSoTimeout( serverTimeout );
} catch( IOException ex ) {
throw ex;
} catch( InstantiationException ex1 ) {
throw ex1;
}
//保證初始化一次
initialized = true;
}
(2) startEndpoint方法
該方法將在Tocmat啟動時被調用,主要作用時啟動線程池並生成監聽線程。
public void startEndpoint() throws IOException, InstantiationException {
if (!initialized) {
initEndpoint();
}
//tp是外部組件傳進來的ThreadPool對象,這裡Tomcat啟動了該線程池
if(isPool) {
tp.start();
}
running = true;
//生成工作線程監聽HTTP連接
if(isPool) {
listener = new TcpWorkerThread(this);
tp.runIt(listener);
} else {
log.error("XXX Error - need pool !");
}
}
下面將向大家描述,工作線程是如何監聽HTTP連接的:
2. org.apache.tomcat.util.Net.TcpWorkerThread
該類是PoolTcpEndpoint的內部類,它實現了ThreadPoolRunnable接口執行HTTP連接監聽和請求處理。(class TcpWorkerThread implements ThreadPoolRunnable)
(1) 構造函數TcpWorkerThread
該方法的主要目的是通過PoolTcpEndpoint對象生成一個實例,並且在緩存中生成一定數量的TcpConnection對象。
public TcpWorkerThread(PoolTcpEndpoint endpoint) {
this.endpoint = endpoint;
if( usePool ) {
//緩存初始化SimplePool為緩存對象,可先不理會其實現細節
connectionCache = new SimplePool(endpoint.getMaxThreads());
for(int i = 0;i< endpoint.getMaxThreads()/2 ; i++) {
connectionCache.put(new TcpConnection());
}
}
}
我們目的是先弄清楚HTTP的監聽及處理,對其他細節可先不於深究。
(2) getInitData方法
對該方法的描述前面已經說過,大家還記得否?本方法主要是取得線程的初始化數據。
public Object[] getInitData() {
if( usePool ) {
return endpoint.getConnectionHandler().init();
} else {
Object obj[]=new Object[2];
//第二個參數存放HTTP請求處理器(可先不考慮細節)
obj[1]= endpoint.getConnectionHandler().init();
//第一個參數存放TcpConnection對象
obj[0]=new TcpConnection();
return obj;
}
}
關於第二個參數,其實是初始化了HTTP請求處理器及其他的信息,大家可先不究其細節。只要能認識到這個方法是返回線程初始化數據即可。
(3) runIt方法
前面我們說過,嵌入到線程池執行的代碼要寫在這個方法裡,這個方法是HTTP監聽的核心,我們看具體實現:
public void runIt(Object perThrData[]) {
if (endpoint.isRunning()) {
Socket s = null;
//在指定的端口(8080)監聽客戶端連接
try {
s = endpoint.acceptSocket();
} finally {
//當接受到一個連接後繼續啟動下一個線程進行監聽
if (endpoint.isRunning()) {
endpoint.tp.runIt(this);
}
}
if (null != s) {
try {
if(endpoint.getServerSocketFactory()!=null) {
//客戶端與服務器第一次握手,主要用於SSI連接(即https) endpoint.getServerSocketFactory().handshake(s);
}
} catch (Throwable t) {
PoolTcpEndpoint.log.debug("Handshake failed", t);
try {
s.close();
} catch (IOException e) {
}
return;
}
TcpConnection con = null;
try {
if( usePool ) {
//從緩存中取一個TcpConnection對象
con=(TcpConnection)connectionCache.get();
if( con == null ) {
con = new TcpConnection();
}
} else {
//若不使用緩存從初始化數據中取一個TcpConnection對象
con = (TcpConnection) perThrData[0];
perThrData = (Object []) perThrData[1];
}
//設定剛生成TcpConnection對象
con.setEndpoint(endpoint);
con.setSocket(s);
endpoint.setSocketOptions( s );
//把TcpConnection及所需要的初始化數據傳給HTTP處理器處理
//在Process處理中將把原始的Socket流解析成Request對象傳
//給容器調用
endpoint.getConnectionHandler().processConnection(con, perThrData);
} catch (SocketException se) {
try {
s.close();
} catch (IOException e) {}
} catch (Throwable t) {
try {
s.close();
} catch (IOException e) {}
} finally {
if (con != null) {
con.recycle();
if (usePool) {
connectionCache.put(con);
}
}
}
}
}
}
請大家仔細而反復的多看一下上面帶陰影的注釋。通過上面我們看到工作線程作了如下的工作:
Ø 啟動了線程池(線程池啟動時將生成指定數量的線程及監視線程)
Ø 如果使用緩沖處理則預先生成指定數量的TcpConnection對象
Ø 在指定的端口(默認是8080)監聽HTTP連接
Ø 當接收的一個連接時再啟動一個線程繼續監聽連接
Ø 用接收的連接生成TcpConnection對象,即Tomcat對HTTP的處理是以TcpConnection對象為基礎的
Ø 把生成的TcpConnection對象交由HTTP Process進行Socket解析,最終生成Request對象
要注意的是:Tomcat並不是事先用指定數量的線程在端口監聽,而是當一個監聽完成後再啟動下一個監聽線程。