嗯,今天其實在看HtttpProcessor的實現,但是突然想到了以前在看poller的時候看到了有閉鎖,用於控制當前connector的連接數量,嗯,那就順便把這部分來看了。。。
在Tomcat中,通過繼承AbstractQueuedSynchronizer來實現了自己的同步工具,進而來實現了一個用於控制連接數量的閉鎖。。LimitLatch。。
這裡就需對AbstractQueuedSynchronizer有一些初步的了解。。。
首先它concurrent類庫中提供的一個用於構建自己的同步工具的一個工具類。。可以通過繼承他來快速的完成一個同步類的實現
(1)acquireSharedInterruptibly()方法,用於以共享的方式來獲取鎖,如果暫時無法獲取,將會將線程掛起到隊列,進行阻塞,對於這個方法是否最終能獲取鎖,是通過tryAcquireShared()方法的返回來定義的,這個方法需要自己實現。。。如果能獲取鎖,那麼返回1,否則返回-1.。。
(2)releaseShared()方法。以共享的方法釋放一個鎖,這樣前面提到的掛起的線程將會喚醒,進而重新嘗試獲取鎖。。。
好啦,接下來就來看看LimitLatch的定義吧,直接上代碼好了,。,。代碼還是很簡單的。。
//其實是通過AbstractQueuedSynchronizer來構建的 public class LimitLatch { private static final Log log = LogFactory.getLog(LimitLatch.class); //構建Sync類型,實現基本的同步,以及阻塞。。 private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; public Sync() { } @Override //用於增加計數,如果計數增加之後小於最大的,那麼返回1,不會阻塞,否則將會返回-1阻塞 protected int tryAcquireShared(int ignored) { //調用acquiredShared方法的時候會調用這個方法來返回狀態,如果返回1,那麼表示獲取成功,返回-1表示獲取失敗,將會阻塞 long newCount = count.incrementAndGet(); //先增加計數 if (!released && newCount > limit) { //如果當前已經超過了最大的限制 // Limit exceeded count.decrementAndGet(); //減少計數 return -1; //返回-1,將阻塞當前線程 } else { return 1; } } @Override //用於減少計數 protected boolean tryReleaseShared(int arg) { count.decrementAndGet(); return true; } } private final Sync sync; //同步對象 private final AtomicLong count; //計數器 private volatile long limit; //最大的數量 private volatile boolean released = false; //是否全部釋放 /** * Instantiates a LimitLatch object with an initial limit. * @param limit - maximum number of concurrent acquisitions of this latch */ public LimitLatch(long limit) { this.limit = limit; //最大限制 this.count = new AtomicLong(0); this.sync = new Sync(); //sync 對象 } /** * Returns the current count for the latch * @return the current count for latch */ public long getCount() { return count.get(); } /** * Obtain the current limit. */ public long getLimit() { return limit; } /** * Sets a new limit. If the limit is decreased there may be a period where * more shares of the latch are acquired than the limit. In this case no * more shares of the latch will be issued until sufficient shares have been * returned to reduce the number of acquired shares of the latch to below * the new limit. If the limit is increased, threads currently in the queue * may not be issued one of the newly available shares until the next * request is made for a latch. * * @param limit The new limit */ public void setLimit(long limit) { this.limit = limit; } /** * Acquires a shared latch if one is available or waits for one if no shared * latch is current available. */ //增加計數,如果太大,那麼等等待 public void countUpOrAwait() throws InterruptedException { if (log.isDebugEnabled()) { log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount()); } sync.acquireSharedInterruptibly(1); } /** * Releases a shared latch, making it available for another thread to use. * @return the previous counter value */ //減少計數 public long countDown() { sync.releaseShared(0); //釋放 long result = getCount(); if (log.isDebugEnabled()) { log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result); } return result; } /** * Releases all waiting threads and causes the {@link #limit} to be ignored * until {@link #reset()} is called. */ //通過將released設置為true,將會釋放所有的線程,知道reset了 public boolean releaseAll() { released = true; return sync.releaseShared(0); } /** * Resets the latch and initializes the shared acquisition counter to zero. * @see #releaseAll() */ //重制 public void reset() { this.count.set(0); released = false; } /** * Returnstrue
if there is at least one thread waiting to * acquire the shared lock, otherwise returnsfalse
. */ //當前是否有線程等待 public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * Provide access to the list of threads waiting to acquire this limited * shared latch. */ //獲取所有等待的線程 public CollectiongetQueuedThreads() { return sync.getQueuedThreads(); } }
代碼應該還是很簡單的吧,而且注釋也算是說的比較清楚。。。其實是構建了一個繼承自AbstractQueuedSynchronizer的Sync對象,通過它來進行真正的同步功能。。。然後通過一個原子的整數計數器,和一個最大值,來判斷當前是否可以獲取鎖
好啦,這裡來看看Tomcat是如何通過LimitLatch來控制連接數量的吧,先來看看NioEndpoint的啟動方法:
//啟動當前的endpoint public void startInternal() throws Exception { if (!running) { running = true; //設置表示為,表示已經看是運行了 paused = false; //沒有暫停 // Create worker collection if ( getExecutor() == null ) { //如果沒有executor,那麼創建 createExecutor(); //創建executor } initializeConnectionLatch(); //初始化閉鎖,用於控制連接的數量 // Start poller threads pollers = new Poller[getPollerThreadCount()]; //根據設置的poller數量來創建poller對象的數組 for (int i=0; i
這裡調用了initializeConnectionLatch方法來初始化閉鎖,來看看吧:
//初始化閉鎖,用於控制連接的數量 protected LimitLatch initializeConnectionLatch() { if (maxConnections==-1) return null; //這個是無限的鏈接數量 if (connectionLimitLatch==null) { connectionLimitLatch = new LimitLatch(getMaxConnections()); //根據最大的鏈接數量來創建 } return connectionLimitLatch; }
我們知道在Connector的配置中可以設置最大的鏈接數量,其實這裡也就是通過這個數量來構建LimitLatch對象的。。。
嗯,Tomcat是從哪裡獲取連接呢,這個就要從Accecptor看了。。。
public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { //如果暫停了 state = AcceptorState.PAUSED; //更改當前acceptor的狀態 try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { //如果沒有運行,那麼這裡直接跳過 break; } state = AcceptorState.RUNNING; //設置當前acceptor的狀態是running try { //if we have reached max connections, wait countUpOrAwaitConnection(); //增減閉鎖的計數,如果connection數量已經達到了最大,那麼暫停一下,這裡用到的是connectionLimitLatch鎖,可以理解為一個閉鎖吧 SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket socket = serverSock.accept(); //調用serversocket的accept方法 } catch (IOException ioe) { //we didn't get a socket countDownConnection(); //出了異常,並沒有獲取鏈接,那麼這裡減少閉鎖的計數 // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept, reset the error delay errorDelay = 0; // setSocketOptions() will add channel to the poller // if successful if (running && !paused) { if (!setSocketOptions(socket)) { //這裡主要是將socket加入到poller對象上面去,而且還要設置參數 countDownConnection(); //加入poller對象失敗了的話,那麼將閉鎖的計數減低 closeSocket(socket); //關閉剛剛 創建的這個socket } } else { countDownConnection(); closeSocket(socket); } } catch (SocketTimeoutException sx) { // Ignore: Normal condition } catch (IOException x) { if (running) { log.error(sm.getString("endpoint.accept.fail"), x); } } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; //設置acceptor的狀態為ended }
這裡讀一下Accecptor,的run方法可以知道,每次在調用serverSocketChannel的accept方法之前都會調用countUpOrAwaitConnection方法來增加閉鎖的計數,如果有問題,那就會調用countDownConnection方法來降低閉鎖的計數。。。
其實這裡通過這兩個方法就知道他們是干嘛的了,先來看看countUpOrAwaitConnection吧:
//這裡用於增加閉鎖的計數 protected void countUpOrAwaitConnection() throws InterruptedException { if (maxConnections==-1) return; LimitLatch latch = connectionLimitLatch; if (latch!=null) latch.countUpOrAwait(); //增加閉鎖的counter }
沒啥意思吧,就是調用剛剛創建的閉鎖的countUpOrAwait方法,接下來來看看countDownConnection方法吧:
//用於減少閉鎖的計數 protected long countDownConnection() { if (maxConnections==-1) return -1; LimitLatch latch = connectionLimitLatch; if (latch!=null) { long result = latch.countDown(); if (result<0) { getLog().warn("Incorrect connection count, multiple socket.close called on the same socket." ); } return result; } else return -1; }
這個也沒啥意思吧。。。就是調用閉鎖的countDown方法。。。
嗯,到這裡整個Tomcat如何控制連接的數量就算是比較清楚了吧。。。
最後,我們知道是通過調用endpoint的cancelledKey方法來關閉一個連接的,來看看它的實現吧:
//取消一個注冊 public void cancelledKey(SelectionKey key, SocketStatus status) { try { if ( key == null ) return;//nothing to do KeyAttachment ka = (KeyAttachment) key.attachment(); if (ka != null && ka.isComet() && status != null) { ka.setComet(false);//to avoid a loop if (status == SocketStatus.TIMEOUT ) { if (processSocket(ka.getChannel(), status, true)) { return; // don't close on comet timeout } } else { // Don't dispatch if the lines below are canceling the key processSocket(ka.getChannel(), status, false); } } key.attach(null); //將附件設置為null if (ka!=null) handler.release(ka); //可以取消這個attachment了 else handler.release((SocketChannel)key.channel()); if (key.isValid()) key.cancel(); //取消key if (key.channel().isOpen()) { //如果channel還是打開的,那麼需要關閉channel try { key.channel().close(); } catch (Exception e) { if (log.isDebugEnabled()) { log.debug(sm.getString( "endpoint.debug.channelCloseFail"), e); } } } try { if (ka!=null) { ka.getSocket().close(true); //關閉sockt } } catch (Exception e){ if (log.isDebugEnabled()) { log.debug(sm.getString( "endpoint.debug.socketCloseFail"), e); } } try { if (ka != null && ka.getSendfileData() != null && ka.getSendfileData().fchannel != null && ka.getSendfileData().fchannel.isOpen()) { ka.getSendfileData().fchannel.close(); } } catch (Exception ignore) { } if (ka!=null) { ka.reset(); countDownConnection(); //降低用於維護連接數量的閉鎖 } } catch (Throwable e) { ExceptionUtils.handleThrowable(e); if (log.isDebugEnabled()) log.error("",e); } }
這裡可以看到調用了countDownConnection方法來降低閉鎖的計數。。
最後總結:Tomcat通過在acceptor中對閉鎖的獲取來控制總連接的數量,如果連接數量達到了最大的限制,那麼將會被阻塞。。直到有連接關閉為止。。。這樣acceptor的線程就又被喚醒了。。。