程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> Tomcat源碼閱讀之閉鎖的實現與連接數量的控制

Tomcat源碼閱讀之閉鎖的實現與連接數量的控制

編輯:C++入門知識

嗯,今天其實在看HtttpProcessor的實現,但是突然想到了以前在看poller的時候看到了有閉鎖,用於控制當前connector的連接數量,嗯,那就順便把這部分來看了。。。

在Tomcat中,通過繼承AbstractQueuedSynchronizer來實現了自己的同步工具,進而來實現了一個用於控制連接數量的閉鎖。。LimitLatch。。

這裡就需對AbstractQueuedSynchronizer有一些初步的了解。。。

首先它concurrent類庫中提供的一個用於構建自己的同步工具的一個工具類。。可以通過繼承他來快速的完成一個同步類的實現

(1)acquireSharedInterruptibly()方法,用於以共享的方式來獲取鎖,如果暫時無法獲取,將會將線程掛起到隊列,進行阻塞,對於這個方法是否最終能獲取鎖,是通過tryAcquireShared()方法的返回來定義的,這個方法需要自己實現。。。如果能獲取鎖,那麼返回1,否則返回-1.。。

(2)releaseShared()方法。以共享的方法釋放一個鎖,這樣前面提到的掛起的線程將會喚醒,進而重新嘗試獲取鎖。。。


好啦,接下來就來看看LimitLatch的定義吧,直接上代碼好了,。,。代碼還是很簡單的。。

//其實是通過AbstractQueuedSynchronizer來構建的
public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);

    //構建Sync類型,實現基本的同步,以及阻塞。。
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        //用於增加計數,如果計數增加之後小於最大的,那麼返回1,不會阻塞,否則將會返回-1阻塞
        protected int tryAcquireShared(int ignored) {  //調用acquiredShared方法的時候會調用這個方法來返回狀態,如果返回1,那麼表示獲取成功,返回-1表示獲取失敗,將會阻塞
            long newCount = count.incrementAndGet();  //先增加計數
            if (!released && newCount > limit) {  //如果當前已經超過了最大的限制
                // Limit exceeded
                count.decrementAndGet();  //減少計數
                return -1;  //返回-1,將阻塞當前線程
            } else {
                return 1;
            }
        }

        @Override
        //用於減少計數
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;  //同步對象
    private final AtomicLong count;  //計數器
    private volatile long limit;  //最大的數量
    private volatile boolean released = false;   //是否全部釋放

    /**
     * Instantiates a LimitLatch object with an initial limit.
     * @param limit - maximum number of concurrent acquisitions of this latch
     */
    public LimitLatch(long limit) {
        this.limit = limit;  //最大限制
        this.count = new AtomicLong(0);
        this.sync = new Sync();  //sync 對象
    }

    /**
     * Returns the current count for the latch
     * @return the current count for latch
     */
    public long getCount() {
        return count.get();
    }

    /**
     * Obtain the current limit.
     */
    public long getLimit() {
        return limit;
    }


    /**
     * Sets a new limit. If the limit is decreased there may be a period where
     * more shares of the latch are acquired than the limit. In this case no
     * more shares of the latch will be issued until sufficient shares have been
     * returned to reduce the number of acquired shares of the latch to below
     * the new limit. If the limit is increased, threads currently in the queue
     * may not be issued one of the newly available shares until the next
     * request is made for a latch.
     *
     * @param limit The new limit
     */
    public void setLimit(long limit) {
        this.limit = limit;
    }


    /**
     * Acquires a shared latch if one is available or waits for one if no shared
     * latch is current available.
     */
    //增加計數,如果太大,那麼等等待
    public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Releases a shared latch, making it available for another thread to use.
     * @return the previous counter value
     */
    //減少計數
    public long countDown() {
        sync.releaseShared(0);  //釋放
        long result = getCount();
        if (log.isDebugEnabled()) {
            log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
    }
        return result;
    }

    /**
     * Releases all waiting threads and causes the {@link #limit} to be ignored
     * until {@link #reset()} is called.
     */
    //通過將released設置為true,將會釋放所有的線程,知道reset了
    public boolean releaseAll() {
        released = true;
        return sync.releaseShared(0);
    }

    /**
     * Resets the latch and initializes the shared acquisition counter to zero.
     * @see #releaseAll()
     */
    //重制
    public void reset() {
        this.count.set(0);
        released = false;
    }

    /**
     * Returns true if there is at least one thread waiting to
     * acquire the shared lock, otherwise returns false.
     */
    //當前是否有線程等待
    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * Provide access to the list of threads waiting to acquire this limited
     * shared latch.
     */
    //獲取所有等待的線程
    public Collection getQueuedThreads() {
        return sync.getQueuedThreads();
    }
}

代碼應該還是很簡單的吧,而且注釋也算是說的比較清楚。。。其實是構建了一個繼承自AbstractQueuedSynchronizer的Sync對象,通過它來進行真正的同步功能。。。然後通過一個原子的整數計數器,和一個最大值,來判斷當前是否可以獲取鎖


好啦,這裡來看看Tomcat是如何通過LimitLatch來控制連接數量的吧,先來看看NioEndpoint的啟動方法:

    //啟動當前的endpoint
    public void startInternal() throws Exception {

        if (!running) {
            running = true;  //設置表示為,表示已經看是運行了
            paused = false;  //沒有暫停

            // Create worker collection
            if ( getExecutor() == null ) {  //如果沒有executor,那麼創建
                createExecutor();   //創建executor
            }

            initializeConnectionLatch();   //初始化閉鎖,用於控制連接的數量

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];   //根據設置的poller數量來創建poller對象的數組
            for (int i=0; i

這裡調用了initializeConnectionLatch方法來初始化閉鎖,來看看吧:

    //初始化閉鎖,用於控制連接的數量
    protected LimitLatch initializeConnectionLatch() {
        if (maxConnections==-1) return null;  //這個是無限的鏈接數量
        if (connectionLimitLatch==null) {
            connectionLimitLatch = new LimitLatch(getMaxConnections());  //根據最大的鏈接數量來創建
        }
        return connectionLimitLatch;
    }

我們知道在Connector的配置中可以設置最大的鏈接數量,其實這裡也就是通過這個數量來構建LimitLatch對象的。。。

嗯,Tomcat是從哪裡獲取連接呢,這個就要從Accecptor看了。。。

 public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {  //如果暫停了
                    state = AcceptorState.PAUSED;  //更改當前acceptor的狀態
                    try {
                        Thread.sleep(50);  
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {  //如果沒有運行,那麼這裡直接跳過
                    break;
                }
                state = AcceptorState.RUNNING;  //設置當前acceptor的狀態是running

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();  //增減閉鎖的計數,如果connection數量已經達到了最大,那麼暫停一下,這裡用到的是connectionLimitLatch鎖,可以理解為一個閉鎖吧

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSock.accept();  //調用serversocket的accept方法
                    } catch (IOException ioe) {
                        //we didn't get a socket
                        countDownConnection();  //出了異常,並沒有獲取鏈接,那麼這裡減少閉鎖的計數
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // setSocketOptions() will add channel to the poller
                    // if successful
                    if (running && !paused) {
                        if (!setSocketOptions(socket)) {  //這裡主要是將socket加入到poller對象上面去,而且還要設置參數
                            countDownConnection();  //加入poller對象失敗了的話,那麼將閉鎖的計數減低
                            closeSocket(socket);  //關閉剛剛 創建的這個socket
                        }
                    } else {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } catch (SocketTimeoutException sx) {
                    // Ignore: Normal condition
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            try {
                                System.err.println(oomParachuteMsg);
                                oomt.printStackTrace();
                            }catch (Throwable letsHopeWeDontGetHere){
                                ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                            }
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;  //設置acceptor的狀態為ended
        }

這裡讀一下Accecptor,的run方法可以知道,每次在調用serverSocketChannel的accept方法之前都會調用countUpOrAwaitConnection方法來增加閉鎖的計數,如果有問題,那就會調用countDownConnection方法來降低閉鎖的計數。。。

其實這裡通過這兩個方法就知道他們是干嘛的了,先來看看countUpOrAwaitConnection吧:

    //這裡用於增加閉鎖的計數
    protected void countUpOrAwaitConnection() throws InterruptedException {
        if (maxConnections==-1) return;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) latch.countUpOrAwait();  //增加閉鎖的counter
    }

沒啥意思吧,就是調用剛剛創建的閉鎖的countUpOrAwait方法,接下來來看看countDownConnection方法吧:

    //用於減少閉鎖的計數
    protected long countDownConnection() {
        if (maxConnections==-1) return -1;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) {
            long result = latch.countDown();
            if (result<0) {
                getLog().warn("Incorrect connection count, multiple socket.close called on the same socket." );
            }
            return result;
        } else return -1;
    }

這個也沒啥意思吧。。。就是調用閉鎖的countDown方法。。。

嗯,到這裡整個Tomcat如何控制連接的數量就算是比較清楚了吧。。。

最後,我們知道是通過調用endpoint的cancelledKey方法來關閉一個連接的,來看看它的實現吧:

     //取消一個注冊
        public void cancelledKey(SelectionKey key, SocketStatus status) {
            try {
                if ( key == null ) return;//nothing to do
                KeyAttachment ka = (KeyAttachment) key.attachment();
                if (ka != null && ka.isComet() && status != null) {
                    ka.setComet(false);//to avoid a loop
                    if (status == SocketStatus.TIMEOUT ) {
                        if (processSocket(ka.getChannel(), status, true)) {
                            return; // don't close on comet timeout
                        }
                    } else {
                        // Don't dispatch if the lines below are canceling the key
                        processSocket(ka.getChannel(), status, false);
                    }
                }
                key.attach(null);  //將附件設置為null
                if (ka!=null) handler.release(ka);  //可以取消這個attachment了
                else handler.release((SocketChannel)key.channel());
                if (key.isValid()) key.cancel();  //取消key
                if (key.channel().isOpen()) {  //如果channel還是打開的,那麼需要關閉channel
                    try {
                        key.channel().close();
                    } catch (Exception e) {
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString(
                                    "endpoint.debug.channelCloseFail"), e);
                        }
                    }
                }
                try {
                    if (ka!=null) {
                        ka.getSocket().close(true); //關閉sockt
                    }
                } catch (Exception e){
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString(
                                "endpoint.debug.socketCloseFail"), e);
                    }
                }
                try {
                    if (ka != null && ka.getSendfileData() != null
                            && ka.getSendfileData().fchannel != null
                            && ka.getSendfileData().fchannel.isOpen()) {
                        ka.getSendfileData().fchannel.close();
                    }
                } catch (Exception ignore) {
                }
                if (ka!=null) {
                    ka.reset();
                    countDownConnection();  //降低用於維護連接數量的閉鎖
                }
            } catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                if (log.isDebugEnabled()) log.error("",e);
            }
        }

這裡可以看到調用了countDownConnection方法來降低閉鎖的計數。。


最後總結:Tomcat通過在acceptor中對閉鎖的獲取來控制總連接的數量,如果連接數量達到了最大的限制,那麼將會被阻塞。。直到有連接關閉為止。。。這樣acceptor的線程就又被喚醒了。。。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved