通常初級的程序員喜歡使用synchronized關鍵字來實現同步機制,理由很簡單,使用它簡單,我們不用考慮更多的細節,對程序員的要求比較低。那這裡我們介紹另外一種通過Lock實現的同步的方法,顯然使用Lock方法,能夠使程序並發更加高效、靈活,其對程序員的要求也就更高。
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
從Lock源碼中我們可以看到它是一個接口。
lock() 如果能夠獲取鎖,就返回,如果不能獲取,它就會一直在等待獲取鎖。
lockInterruptibly() 支持線程中斷,如果線程中斷,默認它就不會去競爭了,這個相對於同步代碼代碼與同步方法而言,可擴展性還是大了很多。
tryLock() 如果能獲取鎖,就立馬返回true,否則就返回false。
tryLock(long time, TimeUnit unit) 如果能獲鎖,就返回true,如果有以下兩種情況,就會設置成false:
一是線程被中斷了;
二是時間到了。
unlock() 釋放一個鎖。
Condition newCondition() 引入Condition有兩個方法的意圖:
a.對一個共享資源有讀和寫的能力,如果讀線程或寫線程獲取了Lock的權力,即有能力進入,但是如果裡面沒有內容,讀也沒有用,如果空間已滿了,寫也寫不了,所有還得有條件去判斷一下,是不是線程要等待了;
b.提供一種多線程之間的通信機制,類似wait()和nofity()的原理。
可重入鎖,也叫做遞歸鎖,指的是同一線程 外層函數獲得鎖之後 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響。
在JAVA環境下 ReentrantLock 和synchronized 都是可重入鎖,可重入鎖最大的作用是避免死鎖。
class MyClass {
public synchronized void method1() {
method2();
}
public synchronized void method2() {
}
}
在一個線程獲得method1方法的鎖後,進入method1,同樣被synchronized修飾的method2方法就不需要再次去獲取鎖了。因為synchronized具有可重入性,所以進入直接運行。
可中斷鎖:即為可以實現中斷的鎖。
在Java中,synchronized就不是可中斷鎖,而Lock是可中斷鎖,這也是我們需要Lock實現同步的重要原因。
如果某一線程A正在執行鎖中的代碼,另一線程B正在等待獲取該鎖,可能由於等待時間過長,線程B不想等待了,想先處理其他事情,我們可以讓它中斷自己或者在別的線程中中斷它,這種就是可中斷鎖。
在前面Lock的接口方法lockInterruptibly()的用法時已經體現了Lock的可中斷性。
公平鎖即盡量以請求鎖的順序來獲取鎖。比如同是有多個線程在等待一個鎖,當這個鎖被釋放時,等待時間最久的線程(最先請求的線程)會獲得該所,這種就是公平鎖。
非公平鎖即無法保證鎖的獲取是按照請求鎖的順序進行的。這樣就可能導致某個或者一些線程永遠獲取不到鎖。
在Java中,synchronized就是非公平鎖,它無法保證等待的線程獲取鎖的順序。
而對於ReentrantLock和ReentrantReadWriteLock,它默認情況下是非公平鎖,但是可以設置為公平鎖,等下後面會有介紹。
ReentrantLock,也就是可重入鎖,前面已經介紹過可重入鎖的概念。現在就從源碼的層面來分析分析ReentrantLock,這將有助於我們更好的理解線程調度,鎖的實現,掌握更多的高並發編程的思想。
ReentrantLock中lock() 方法實現:
public void lock() {
sync.lock();
}
sync的聲明:
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
這部分看起來很簡單,也很好理解,但是讓你困惑的是sync是什麼,它的內部是怎樣實現的呢?下面我們來看看sync究竟是怎樣實現的。通過進入lock源碼我們發現:
vcv4us23x7mrxr3L+KGj1eLA787Sw8fW99KqvenJ3LfHuavGvcv4oaM8L3A+DQo8cD7K18/IztLDx8C0v7S/tNfuusvQxLXEQWJzdHJhY3RRdWV1ZWRTeW5jaHJvbml6ZXIgwOCjrFN5bmO8zLPQwcvV4rj2wOCjrNfu1tjSqrXEwb249sr9vt2zydSxtbHHsMv417TMrLrNtci0/cG0se22vMrH08nL/MC0yrXP1rXEoaM8L3A+DQo8cHJlIGNsYXNzPQ=="brush:java;">
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state; // 記錄了當前鎖被鎖定的次數
當state值為0,說明未被綁定,加鎖通過更高state值來實現,而更改狀態主要由函數compareAndSetState實現。調用cas原語以保證操作的原子性,如果state值為expect,則更新為update值且返回true,否則不更改state且返回false.
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
當前線程則是在AbstractOwnableSynchronizer中:
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
了解了這些基本的數據結構後,我們再來看sync.lock()的究竟,NonfairSync源碼:
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 如果鎖沒有被任何線程鎖定且加鎖成功則設定當前線程為鎖的擁有者
// 如果鎖已被當前線程鎖定,則在acquire中將狀態加1並返回
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 加鎖失敗,再次嘗試加鎖,失敗則加入等待隊列,禁用當前線程,直到被中斷或有線程釋放鎖時被喚醒
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
acquire方法在AbstractQueuedSynchronizer中的實現:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
// 首先嘗試獲取鎖,成功則直接返回
// 否則將當前線程加入鎖的等待隊列並禁用當前線程
// 直到線程被中斷或者在鎖為其它線程釋放時喚醒
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
嘗試獲得鎖,調用nonfairTryAcquire(acquires);方法,該方法的實現如下:
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果鎖空閒則嘗試鎖定,成功則設當前線程為鎖擁有者
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 若當前線程為鎖擁有者則直接修改鎖狀態計數
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 嘗試獲取鎖失敗,返回false
return false;
}
在tryAcquire失敗後則進行如下操作
第一步調用AbstractQueuedSynchronizer.addWaiter將當前線程加入等待隊列尾部。
/**
* Creates and enqueues node for given thread and mode.
*
* @param current the thread
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
第二步調用AbstractQueuedSynchronizer.acquireQueued讓線程進入禁用狀態,並在每次被喚醒時嘗試獲取鎖,失敗則繼續禁用線程。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
// 如果當前線程是head的直接後繼則嘗試獲取鎖
// 這裡不會和等待隊列中其它線程發生競爭,但會和嘗試獲取鎖且尚未進入等待隊列的線程發生競爭。這是非公平鎖和公平鎖的一個重要區別。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 如果不是head直接後繼或獲取鎖失敗,則檢查是否要禁用當前線程
// 是則禁用,直到被lock.release喚醒或線程中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
AbstractQueuedSynchronizer. shouldParkAfterFailedAcquire做了一件很重要的事:根據狀態對等待隊列進行清理,並設置等待信號。
這裡需要先說明一下waitStatus,它是AbstractQueuedSynchronizer的靜態內部類Node的成員變量,用於記錄Node對應的線程等待狀態.等待狀態在剛進入隊列時都是0,如果等待被取消則被設為Node.CANCELLED,若線程釋放鎖時需要喚醒等待隊列裡的其它線程則被置為Node.SIGNAL,還有一種狀態Node.CONDITION這裡先不討論。
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire實現如下:
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus;
if (s < 0)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
// 如果前置結點waitStatus已經被置為SIGNAL,則返回true,可以禁用線程
return true;
if (s > 0) {
// 如果前置結果已被CALCEL,則移除。
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
else
/*
* Indicate that we need a signal, but don't park yet. Caller
* will need to retry to make sure it cannot acquire before
* parking.
*/
// 原子性將前置結點waitStatus設為SIGNAL
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
// 這裡一定要返回false,有可能前置結點這時已經釋放了鎖,但因其waitStatus在釋放鎖時還未被置為SIGNAL而未觸發喚醒等待線程操作,因此必須通過return false來重新嘗試一次獲取鎖
return false;
}
AbstractQueuedSynchronizer.parkAndCheckInterrupt實現如下,很簡單,直接禁用線程,並等待被喚醒或中斷發生。對java中Thread.interrupted()都作了什麼不甚了解的要做功課。
這裡線程即被堵塞,醒來時會重試獲取鎖,失敗則繼續堵塞。即使Thread.interrupted()也無法中斷。那些想在等待時間過長時中斷退出的線程可以調用ReentrantLoc.lockInterruptibly().
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
java.util.concurrent.locks.ReadWriteLock 讀寫鎖是一個接口類,我們知道讀寫鎖的概念,它允許多個線程同一時間對特定資源讀取,但只允許一個線程對資源進行寫操作。
其接口定義:
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
讀鎖:當沒有寫操作線程鎖定ReadWriteLock ,並且沒有任何線程要求獲得寫操作,這時候允許多個讀線程進行鎖定。
寫鎖:當沒有任何線程讀操作或者寫操作時,允許唯一線程進行寫操作鎖定。
ReadWriteLock(讀寫鎖)的概念了解後,我們要深入研究它的實現ReentrantReadWriteLock 。可重入的概念以及實現在以上內容已經有詳細的介紹,這裡主要介紹ReentrantReadWriteLock 它的兩個主要的readLock()和writeLock()內部類的實現。
ReadLock 內部實現:
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
WriteLock 實現:
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
簡單的從類方法的聲明上,我們很難看出讀寫鎖的區別,這裡我們分析它們直接不同的點。
(1)trylock() 獲取鎖
tryReadLock() 的源碼實現:
/**
* Performs tryLock for read, enabling barging in both modes.
* This is identical in effect to tryAcquireShared except for
* lack of calls to readerShouldBlock.
*/
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
// 如果有寫鎖占用並且當前線程不是setExclusiveOwnerThread,返回false
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);// 獲得共享鎖的數量
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
tryWriteLock() 的源碼實現:
/**
* Performs tryLock for write, enabling barging in both modes.
* This is identical in effect to tryAcquire except for lack
* of calls to writerShouldBlock.
*/
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
// 獨占持有樹為0,或者當前線程沒有獲得setExclusiveOwnerThread,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT) // 寫線程超過最大數
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
從源碼可以看出,ReadLock獲取的是共享鎖,WriteLock獲取的是獨占鎖。WriteLock的實現與ReentrantLock裡面的實現幾乎相同,都是使用了AQS的acquire/release操作。
state字段(int類型,32位)用來描述有多少線程獲持有鎖。在獨占鎖的時代這個值通常是0或者1(如果是重入的就是重入的次數),在共享鎖的時代就是持有鎖的數量。
但在這裡就需要兩個變量來描述讀寫鎖不同的數量,在ReentrantReadWrilteLock裡面將這個字段一分為二,高位16位表示共享鎖的數量,低位16位表示獨占鎖的數量(或者重入數量)。2^16-1=65536,這就是上節中提到的為什麼共享鎖和獨占鎖的數量最大只能是65535的原因了。
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }