本文原創,轉載請注明出處。
參考文章:
《“JUC鎖”03之 公平鎖(一)》
《“JUC鎖”03之 公平鎖(二)》
鎖分獨占鎖與共享鎖,公平鎖與非公平鎖,悲觀鎖與樂觀鎖,可重入鎖與不可重入鎖,相關概念可查看其它文章。
Lock操作:加鎖(lock),解鎖(unlock),創建條件對象(newCondition)。
Condition操作:等待(await),通知(signal)。
ReentrantLock:是一個可重入鎖,獨占鎖,由構造參數決定是公平鎖還是非公平鎖。
ReadWriteLock的操作就是獲取讀取鎖與改寫鎖。
ReentrantReadWriteLock:它的讀取鎖與改寫鎖都是可重入鎖,並且由構造參數決定是公平鎖還是非公平鎖,其中讀取鎖是共享鎖,改寫鎖是獨占鎖。
/** * @since 1.5 * @author Doug Lea */ public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; private final Sync sync; public ReentrantLock() { // 默認使用非公平鎖 sync = new NonfairSync(); } public ReentrantLock(boolean fair) { // 由參數決定使用公平鎖還是非公平鎖 sync = fair ? new FairSync() : new NonfairSync(); } abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; abstract void lock(); final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 判斷鎖是否未被持有 if (c == 0) { // 嘗試占用鎖 if (compareAndSetState(0, acquires)) { // 占用成功後,更新鎖所屬線程為當前線程 setExclusiveOwnerThread(current); // 返回占用成功 return true; } } // 如果鎖已被占用,判斷當前線程是否持有鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 不允許鎖被釋放,否則視為系統錯誤 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新鎖的持有次數 setState(nextc); // 返回占用成功 return true; } // 返回占用失敗 return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; // 不允許當前線程釋放其它線程持有的鎖,否則拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 判斷鎖是否達到釋放值 if (c == 0) { // 標記鎖需要釋放 free = true; // 清空鎖的所屬線程 setExclusiveOwnerThread(null); } // 更新鎖的持有次數 setState(c); // 返回鎖是否達到釋放值而被釋放,否則未釋放。 return free; } protected final boolean isHeldExclusively() { // 返回當前線程是否持有鎖 return getExclusiveOwnerThread() == Thread.currentThread(); } final Thread getOwner() { // 返回鎖所屬線程 return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { // 如果當前線程未持有鎖,則返回0;如果當前線程持有鎖,則返回持有的次數。 return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { // 返回鎖是否已經被占用 return getState() != 0; } final ConditionObject newCondition() { return new ConditionObject(); } } static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { // 嘗試占用鎖 if (compareAndSetState(0, 1)) // 占用成功後,更新鎖所屬線程為當前線程 setExclusiveOwnerThread(Thread.currentThread()); else // 占用失敗後,通過正常程序獲取鎖 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { // 正常程序獲取鎖 acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 判斷鎖是否未被持有 if (c == 0) { // 如果隊列為空或隊頭為當前線程時,就嘗試占用鎖 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 占用成功後,更新鎖所屬線程為當前線程 setExclusiveOwnerThread(current); // 返回占用成功 return true; } } // 如果鎖已被占用,判斷當前線程是否持有鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 不允許鎖被釋放,否則視為系統錯誤 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 更新鎖的持有次數 setState(nextc); // 返回占用成功 return true; } // 返回占用失敗 return false; } } public void lock() { // 獲取鎖 sync.lock(); } public void lockInterruptibly() throws InterruptedException { // 獲取鎖,可被中斷 sync.acquireInterruptibly(1); } public boolean tryLock() { // 強制使用非公平方式獲取鎖 return sync.nonfairTryAcquire(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { // 嘗試在指定時間內獲取鎖,可被中斷 return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { // 釋放鎖 sync.release(1); } public int getHoldCount() { // 如果當前線程未持有鎖,則返回0;如果當前線程持有鎖,則返回持有的次數。 return sync.getHoldCount(); } public boolean isHeldByCurrentThread() { // 返回當前線程是否持有鎖 return sync.isHeldExclusively(); } public boolean isLocked() { // 返回鎖是否已經被占用 return sync.isLocked(); } public final boolean isFair() { // 返回是否公平鎖 return sync instanceof FairSync; } protected Thread getOwner() { // 返回鎖的所屬線程 return sync.getOwner(); } public final boolean hasQueuedThreads() { // 返回隊列中是否還有線程,即是否隊列不為空 return sync.hasQueuedThreads(); } public final boolean hasQueuedThread(Thread thread) { // 返回目標線程是否在隊列中 return sync.isQueued(thread); } public final int getQueueLength() { // 返回隊列長度 return sync.getQueueLength(); } protected Collection<Thread> getQueuedThreads() { // 返回隊列中的所有線程 return sync.getQueuedThreads(); } public Condition newCondition() { return sync.newCondition(); } public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); } public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); } protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); } public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } }
/** * @since 1.5 * @author Doug Lea */ public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private static final long serialVersionUID = -6992448646407690164L; private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { // 使用高16位存儲共享鎖的共享次數 return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { // 使用低16位存儲獨占鎖被持有次數 return c & EXCLUSIVE_MASK; } static final class HoldCounter { int count = 0; final long tid = Thread.currentThread().getId(); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } abstract boolean readerShouldBlock(); abstract boolean writerShouldBlock(); protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); // 是否存在鎖 if (c != 0) { // 如果不存在獨占鎖,或當前線程並不是獨占鎖的所屬線程,則返回獲取失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果累加後的持有次數,達到最大值時,則系統錯誤 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 更新獨占鎖被當前錢程持有的次數 setState(c + acquires); // 返回獲取成功 return true; } // 如果是公平策略,就需要當前線程是隊頭,非公平策略不需要。然後嘗試占用鎖 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) // 返回占用失敗 return false; // 占用成功後,更新獨占鎖的所屬線程為當前線程 setExclusiveOwnerThread(current); // 返回占用成功 return true; } protected final boolean tryRelease(int releases) { // 不允許當前線程釋放其它線程持有的獨占鎖,否則拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; // 如果獨占鎖的持有次數達到釋放值,就清空獨占鎖的所屬線程 if (free) setExclusiveOwnerThread(null); // 更新獨占鎖被當前錢程持有的次數 setState(nextc); // 返回獨占鎖是否已經被釋放 return free; } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 如果存在獨占鎖,並且該獨占鎖並不屬於當前線程,就返回獲取失敗。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); // 如果是公平策略,就需要當前線程是隊頭,如果是非公平策略,就需要隊頭是以共享方式取鎖。 // 然後還需要共享次數未達到最大值 // 如果前面條件滿足後,就嘗試占用鎖(注:c + SHARED_UNIT表示增加1) if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 如果首次占用共享鎖,就記錄首個占用共享鎖的線程和占用次數 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } // 如果不是首次占用,但當前線程就是那個首次占用共享鎖的線程時,就增加記錄次數 else if (firstReader == current) { firstReaderHoldCount++; } else { // 從緩存或本地變量中獲取計數器 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); // 更新當前線程所占用共享鎖的次數 rh.count++; } // 返回占用成功 return 1; } return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); // 判斷是否存在獨占鎖 if (exclusiveCount(c) != 0) { // 如果該獨占鎖並不屬於當前線程,就返回獲取失敗。 if (getExclusiveOwnerThread() != current) return -1; } else { // 現在不存在獨占鎖,如果在公平策略下,當前線程不是隊頭,如果在非公平策略下,隊頭是以獨占方式取鎖 // 也不是首個占用共享鎖的線程,也未有占用共享鎖的記錄在 if (readerShouldBlock() && firstReader != current) { // 從緩存或本地變量中獲取計數器 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 如果記錄為0,就返回占用失敗 if (rh.count == 0) return -1; } } // 現在: // 存在獨占鎖,並且當前線程是獨占鎖的所屬線程 // 不存在獨占鎖,當前線程有占用共享鎖的記錄在 // 不存在獨占鎖,非公平策略,需要隊頭是以共享取鎖才能插隊占用共享鎖 // 不存在獨占鎖,公平策略,需要當前線程位於隊頭才能占用共享鎖(即不能插隊) // 不允許共享次數達到最大值,否則系統錯誤 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 嘗試占用共享鎖(注:c + SHARED_UNIT表示增加1) if (compareAndSetState(c, c + SHARED_UNIT)) { // 如果首次占用共享鎖,就記錄首個占用共享鎖的線程和占用次數 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } // 如果不是首次占用,但當前線程就是那個首次占用共享鎖的線程時,就增加記錄次數 else if (firstReader == current) { firstReaderHoldCount++; } else { // 從緩存或本地變量中獲取計數器 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); // 更新當前線程所占用共享鎖的次數 rh.count++; // 緩存記錄 cachedHoldCounter = rh; // cache for release } // 返回占用成功 return 1; } } } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 判斷當前線程是否為第一個占用共享鎖的線程 if (firstReader == current) { // 遞減占用共享鎖的記錄 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 從緩存或本地變量中獲取計數器 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); // 如果當前線程未有占用共享的記錄,則異常 int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } // 遞減占用共享鎖的記錄 --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; // 嘗試遞減共享鎖的占用次數,如果成功,則返回共享鎖的全部占用都釋放。 if (compareAndSetState(c, nextc)) return nextc == 0; } } private IllegalMonitorStateException unmatchedUnlockException() { return new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread"); } final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); // 是否存在鎖 if (c != 0) { int w = exclusiveCount(c); // 如果不存在獨占鎖,或當前線程不是獨占鎖的所屬線程,則返回占用失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果當前線程持有獨占鎖的次數達到最大值,則系統錯誤 if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // 嘗試占用獨占鎖 if (!compareAndSetState(c, c + 1)) // 近回占用失敗 return false; // 占用成功後,更新獨占鎖的所屬線程為當前線程 setExclusiveOwnerThread(current); return true; } final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); // 如果存在獨占鎖,並且當前線程不是獨占鎖的所屬線程,就返回占用失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); // 如果共享鎖的占用次數達到最值,則系統錯誤 if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 嘗試占用共享鎖 if (compareAndSetState(c, c + SHARED_UNIT)) { // ... if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } // 返回占用成功 return true; } } } protected final boolean isHeldExclusively() { // 返回當前線程是否持有獨占鎖 return getExclusiveOwnerThread() == Thread.currentThread(); } final Thread getOwner() { // 返回獨占鎖的所屬線程 return ((exclusiveCount(getState()) == 0) ? null : getExclusiveOwnerThread()); } final int getReadLockCount() { // 返回共享鎖的占用次數 return sharedCount(getState()); } final boolean isWriteLocked() { // 返回獨占鎖的占用次數 return exclusiveCount(getState()) != 0; } final int getWriteHoldCount() { // 如果當前線程未持有獨占鎖,則返回0;如果當前線程持有獨占鎖,則返回持有的次數。 return isHeldExclusively() ? exclusiveCount(getState()) : 0; } final int getReadHoldCount() { // 返回當前線程占用共享鎖的次數 // 如果獨占鎖未被占用,則返回0 if (getReadLockCount() == 0) return 0; // 從第一個占用共享鎖記錄中(緩存)獲取占用次數,並返回 Thread current = Thread.currentThread(); if (firstReader == current) return firstReaderHoldCount; // 從緩存中獲取占用次數,並返回 HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == current.getId()) return rh.count; // 從記錄中獲取占用次數,並返回 int count = readHolds.get().count; if (count == 0) readHolds.remove(); return count; } final int getCount() { // 返回鎖的數量 return getState(); } final ConditionObject newCondition() { return new ConditionObject(); } } static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; } final boolean readerShouldBlock() { // 返回隊頭是否以獨占方式獲取鎖 // return apparentlyFirstQueuedIsExclusive(); return false; } } static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { // 如果隊列為空或隊頭為當前線程時,就返回false,否則返回true return hasQueuedPredecessors(); } final boolean readerShouldBlock() { // 如果隊列為空或隊頭為當前線程時,就返回false,否則返回true return hasQueuedPredecessors(); } } public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquireShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryLock() { // 嘗試占用共享鎖,並返回是否成功 return sync.tryReadLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.releaseShared(1); } public Condition newCondition() { // 共享鎖不允許創建條件對象 throw new UnsupportedOperationException(); } public String toString() { int r = sync.getReadLockCount(); return super.toString() + "[Read locks = " + r + "]"; } } public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { // 嘗試占用獨占鎖,並返回是否成功 return sync.tryWriteLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public boolean isHeldByCurrentThread() { // 返回當前線程是否持有獨占鎖 return sync.isHeldExclusively(); } public int getHoldCount() { // 如果當前線程未持有獨占鎖,則返回0;如果當前線程持有獨占鎖,則返回持有的次數。 return sync.getWriteHoldCount(); } public Condition newCondition() { return sync.newCondition(); } public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } } public final boolean isFair() { // 返回是否公平鎖 return sync instanceof FairSync; } protected Thread getOwner() { // 返回獨占鎖的所屬線程 return sync.getOwner(); } public int getReadLockCount() { // 返回共享鎖的占用次數 return sync.getReadLockCount(); } public boolean isWriteLocked() { // 返回獨占鎖的占用次數 return sync.isWriteLocked(); } public boolean isWriteLockedByCurrentThread() { // 返回當前線程是否持有獨占鎖 return sync.isHeldExclusively(); } public int getWriteHoldCount() { // 如果當前線程未持有獨占鎖,則返回0;如果當前線程持有獨占鎖,則返回持有的次數。 return sync.getWriteHoldCount(); } public int getReadHoldCount() { // 返回當前線程占用共享鎖的次數 return sync.getReadHoldCount(); } protected Collection<Thread> getQueuedWriterThreads() { // 返回隊列中以獨占模式獲取鎖的所有線程 return sync.getExclusiveQueuedThreads(); } protected Collection<Thread> getQueuedReaderThreads() { // 返回隊列中以共享模式獲取鎖的所有線程 return sync.getSharedQueuedThreads(); } public final boolean hasQueuedThreads() { // 返回隊列中是否還有線程,即是否隊列不為空 return sync.hasQueuedThreads(); } public final boolean hasQueuedThread(Thread thread) { // 返回目標線程是否在隊列中 return sync.isQueued(thread); } public final int getQueueLength() { // 返回隊列長度 return sync.getQueueLength(); } protected Collection<Thread> getQueuedThreads() { // 返回隊列中的所有線程 return sync.getQueuedThreads(); } public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync .hasWaiters((AbstractQueuedSynchronizer.ConditionObject) condition); } public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync .getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject) condition); } protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync .getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject) condition); } public String toString() { int c = sync.getCount(); int w = Sync.exclusiveCount(c); int r = Sync.sharedCount(c); return super.toString() + "[Write locks = " + w + ", Read locks = " + r + "]"; } }