Java 高並發五:JDK並發包1具體引見。本站提示廣大學習愛好者:(Java 高並發五:JDK並發包1具體引見)文章只能為提供參考,不一定能成為您想要的結果。以下是Java 高並發五:JDK並發包1具體引見正文
在[高並發Java 二] 多線程基本中,我們曾經初步提到了根本的線程同步操作。此次要提到的是在並發包中的同步掌握對象。
1. 各類同步掌握對象的應用
1.1 ReentrantLock
ReentrantLock感到上是synchronized的加強版,synchronized的特色是應用簡略,一切交給JVM行止理,然則功效上是比擬軟弱的。在JDK1.5之前,ReentrantLock的機能要好過synchronized,因為對JVM停止了優化,如今的JDK版本中,二者機能是平起平坐的。假如是簡略的完成,不要銳意去應用ReentrantLock。
比擬於synchronized,ReentrantLock在功效上加倍豐碩,它具有可重入、可中止、可限時、公正鎖等特色。
起首我們經由過程一個例子來講明ReentrantLock最後步的用法:
package test; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { for (int j = 0; j < 10000000; j++) { lock.lock(); try { i++; } finally { lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { Test test = new Test(); Thread t1 = new Thread(test); Thread t2 = new Thread(test); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } }
有兩個線程都對i停止++操作,為了包管線程平安,應用了 ReentrantLock,從用法上可以看出,與 synchronized比擬, ReentrantLock就略微龐雜一點。由於必需在finally中停止解鎖操作,假如不在 finally解鎖,有能夠代碼湧現異常鎖沒被釋放,而synchronized是由JVM來釋放鎖。
那末ReentrantLock究竟有哪些優良的特色呢?
1.1.1 可重入
單線程可以反復進入,但要反復加入
lock.lock(); lock.lock(); try { i++; } finally { lock.unlock(); lock.unlock(); }
因為ReentrantLock是重入鎖,所以可以重復獲得雷同的一把鎖,它有一個與鎖相干的獲得計數器,假如具有鎖的某個線程再次獲得鎖,那末獲得計數器就加1,然後鎖須要被釋放兩次能力取得真正釋放(重入鎖)。這模擬了 synchronized 的語義;假如線程進入由線程曾經具有的監控器掩護的 synchronized 塊,就許可線程持續停止,當線程加入第二個(或許後續) synchronized 塊的時刻,不釋放鎖,只要線程加入它進入的監控器掩護的第一個synchronized 塊時,才釋放鎖。
public class Child extends Father implements Runnable{ final static Child child = new Child();//為了包管鎖獨一 public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(child).start(); } } public synchronized void doSomething() { System.out.println("1child.doSomething()"); doAnotherThing(); // 挪用本身類中其他的synchronized辦法 } private synchronized void doAnotherThing() { super.doSomething(); // 挪用父類的synchronized辦法 System.out.println("3child.doAnotherThing()"); } @Override public void run() { child.doSomething(); } } class Father { public synchronized void doSomething() { System.out.println("2father.doSomething()"); } }
我們可以看到一個線程進入分歧的 synchronized辦法,是不會釋放之前獲得的鎖的。所以輸入照樣次序輸入。所以synchronized也是重入鎖
輸入:
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
...
1.1.2.可中止
與synchronized分歧的是,ReentrantLock對中止是有呼應的。中止相干常識檢查[高並發Java 二] 多線程基本
通俗的lock.lock()是不克不及呼應中止的,lock.lockInterruptibly()可以或許呼應中止。
我們模仿出一個逝世鎖現場,然後用中止來處置逝世鎖
package test; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public Test(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } lock2.lockInterruptibly(); } else { lock2.lockInterruptibly(); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } lock1.lockInterruptibly(); } } catch (Exception e) { // TODO: handle exception } finally { if (lock1.isHeldByCurrentThread()) { lock1.unlock(); } if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println(Thread.currentThread().getId() + ":線程加入"); } } public static void main(String[] args) throws InterruptedException { Test t1 = new Test(1); Test t2 = new Test(2); Thread thread1 = new Thread(t1); Thread thread2 = new Thread(t2); thread1.start(); thread2.start(); Thread.sleep(1000); //DeadlockChecker.check(); } static class DeadlockChecker { private final static ThreadMXBean mbean = ManagementFactory .getThreadMXBean(); final static Runnable deadlockChecker = new Runnable() { @Override public void run() { // TODO Auto-generated method stub while (true) { long[] deadlockedThreadIds = mbean.findDeadlockedThreads(); if (deadlockedThreadIds != null) { ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds); for (Thread t : Thread.getAllStackTraces().keySet()) { for (int i = 0; i < threadInfos.length; i++) { if(t.getId() == threadInfos[i].getThreadId()) { t.interrupt(); } } } } try { Thread.sleep(5000); } catch (Exception e) { // TODO: handle exception } } } }; public static void check() { Thread t = new Thread(deadlockChecker); t.setDaemon(true); t.start(); } } }
上述代碼有能夠會產生逝世鎖,線程1獲得lock1,線程2獲得lock2,然後彼此又想取得對方的鎖。
我們用jstack檢查運轉上述代碼後的情形
切實其實發明了一個逝世鎖。
DeadlockChecker.check();辦法用來檢測逝世鎖,然後把逝世鎖的線程中止。中止後,線程正常加入。
1.1.3.可限時
超時不克不及取得鎖,就前往false,不會永遠期待組成逝世鎖
應用lock.tryLock(long timeout, TimeUnit unit)來完成可限時鎖,參數為時光和單元。
舉個例子來講明下可限時:
package test; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock = new ReentrantLock(); @Override public void run() { try { if (lock.tryLock(5, TimeUnit.SECONDS)) { Thread.sleep(6000); } else { System.out.println("get lock failed"); } } catch (Exception e) { } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } public static void main(String[] args) { Test t = new Test(); Thread t1 = new Thread(t); Thread t2 = new Thread(t); t1.start(); t2.start(); } }
應用兩個線程來爭取一把鎖,當某個線程取得鎖後,sleep6秒,每一個線程都只測驗考試5秒去取得鎖。
所以一定有一個線程沒法取得鎖。沒法取得後就直接加入了。
輸入:
get lock failed
1.1.4.公正鎖
應用方法:
public ReentrantLock(boolean fair)
public static ReentrantLock fairLock = new ReentrantLock(true);
普通意義上的鎖是不公正的,紛歧定先來的線程能先獲得鎖,後來的線程就後獲得鎖。不公正的鎖能夠會發生饑餓景象。
公正鎖的意思就是,這個鎖能包管線程是先來的先獲得鎖。固然公正鎖不會發生饑餓景象,然則公正鎖的機能會比非公正鎖差許多。
1.2 Condition
Condition與ReentrantLock的關系就相似於synchronized與Object.wait()/signal()
await()辦法會使以後線程期待,同時釋放以後鎖,當其他線程中應用signal()時或許signalAll()辦法時,線 程會從新取得鎖並持續履行。或許當線程被中止時,也能跳出期待。這和Object.wait()辦法很類似。
awaitUninterruptibly()辦法與await()辦法根本雷同,然則它其實不會再期待進程中呼應中止。 singal()辦法用於叫醒一個在期待中的線程。絕對的singalAll()辦法會叫醒一切在期待中的線程。這和Obejct.notify()辦法很相似。
這裡就不再具體引見了。舉個例子來講明:
package test; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Test implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); condition.await(); System.out.println("Thread is going on"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { Test t = new Test(); Thread thread = new Thread(t); thread.start(); Thread.sleep(2000); lock.lock(); condition.signal(); lock.unlock(); } }
上述例子很簡略,讓一個線程await住,讓主線程去叫醒它。condition.await()/signal只能在獲得鎖今後應用。
1.3.Semaphore
關於鎖來講,它是互斥的排他的。意思就是,只需我取得了鎖,沒人能再取得了。
而關於Semaphore來講,它許可多個線程同時進入臨界區。可以以為它是一個同享鎖,然則同享的額度是無限制的,額度用完了,其他沒有拿到額度的線程照樣要壅塞在臨界區外。當額度為1時,就相等於lock
上面舉個例子:
package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class Test implements Runnable { final Semaphore semaphore = new Semaphore(5); @Override public void run() { try { semaphore.acquire(); Thread.sleep(2000); System.out.println(Thread.currentThread().getId() + " done"); } catch (Exception e) { e.printStackTrace(); }finally { semaphore.release(); } } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(20); final Test t = new Test(); for (int i = 0; i < 20; i++) { executorService.submit(t); } } }
有一個20個線程的線程池,每一個線程都去 Semaphore的允許,Semaphore的允許只要5個,運轉後可以看到,5個一批,一批一批地輸入。
固然一個線程也能夠一次請求多個允許
public void acquire(int permits) throws InterruptedException
1.4 ReadWriteLock
ReadWriteLock是辨別功效的鎖。讀和寫是兩種分歧的功效,讀-讀不互斥,讀-寫互斥,寫-寫互斥。
如許的設計是並發量進步了,又包管了數據平安。
應用方法:
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
具體例子可以檢查 Java完成臨盆者花費者成績與讀者寫者成績,這裡就不睜開了。
1.5 CountDownLatch
倒數計時器
一種典范的場景就是火箭發射。在火箭發射前,為了包管滿有把握,常常還要停止各項裝備、儀器的檢討。 只要等一切檢討終了後,引擎能力焚燒。這類場景就異常合適應用CountDownLatch。它可使得焚燒線程
,期待一切檢討線程全體落成後,再履行
應用方法:
static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();
表示圖:
一個簡略的例子:
package test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test implements Runnable { static final CountDownLatch countDownLatch = new CountDownLatch(10); static final Test t = new Test(); @Override public void run() { try { Thread.sleep(2000); System.out.println("complete"); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.execute(t); } countDownLatch.await(); System.out.println("end"); executorService.shutdown(); } }
主線程必需期待10個線程全體履行完才會輸入"end"。
1.6 CyclicBarrier
和CountDownLatch類似,也是期待某些線程都做完今後再履行。與CountDownLatch差別在於這個計數器可以重復應用。好比,假定我們將計數器設置為10。那末湊齊第一批1 0個線程後,計數器就會歸零,然後接著湊齊下一批10個線程
應用方法:
public CyclicBarrier(int parties, Runnable barrierAction)
barrierAction就是當計數器一次計數完成後,體系會履行的舉措
await()
表示圖:
上面舉個例子:
package test; import java.util.concurrent.CyclicBarrier; public class Test implements Runnable { private String soldier; private final CyclicBarrier cyclic; public Test(String soldier, CyclicBarrier cyclic) { this.soldier = soldier; this.cyclic = cyclic; } @Override public void run() { try { //期待一切兵士到齊 cyclic.await(); dowork(); //期待一切兵士完成任務 cyclic.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void dowork() { // TODO Auto-generated method stub try { Thread.sleep(3000); } catch (Exception e) { // TODO: handle exception } System.out.println(soldier + ": done"); } public static class BarrierRun implements Runnable { boolean flag; int n; public BarrierRun(boolean flag, int n) { super(); this.flag = flag; this.n = n; } @Override public void run() { if (flag) { System.out.println(n + "個義務完成"); } else { System.out.println(n + "個聚集完成"); flag = true; } } } public static void main(String[] args) { final int n = 10; Thread[] threads = new Thread[n]; boolean flag = false; CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n)); System.out.println("聚集"); for (int i = 0; i < n; i++) { System.out.println(i + "報導"); threads[i] = new Thread(new Test("兵士" + i, barrier)); threads[i].start(); } } }
打印成果:
聚集
0報導
1報導
2報導
3報導
4報導
5報導
6報導
7報導
8報導
9報導
10個聚集完成
兵士5: done
兵士7: done
兵士8: done
兵士3: done
兵士4: done
兵士1: done
兵士6: done
兵士2: done
兵士0: done
兵士9: done
10個義務完成
1.7 LockSupport
供給線程壅塞原語
和suspend相似
LockSupport.park();
LockSupport.unpark(t1);
與suspend比擬 不輕易惹起線程解凍
LockSupport的思惟呢,和 Semaphore有點類似,外部有一個允許,park的時刻拿失落這個允許,unpark的時刻請求這個允許。所以假如unpark在park之前,是不會產生線程解凍的。
上面的代碼是[高並發Java 二] 多線程基本中suspend示例代碼,在應用suspend時會產生逝世鎖。
package test; import java.util.concurrent.locks.LockSupport; public class Test { static Object u = new Object(); static TestSuspendThread t1 = new TestSuspendThread("t1"); static TestSuspendThread t2 = new TestSuspendThread("t2"); public static class TestSuspendThread extends Thread { public TestSuspendThread(String name) { setName(name); } @Override public void run() { synchronized (u) { System.out.println("in " + getName()); //Thread.currentThread().suspend(); LockSupport.park(); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); // t1.resume(); // t2.resume(); LockSupport.unpark(t1); LockSupport.unpark(t2); t1.join(); t2.join(); } }
而應用 LockSupport則不會產生逝世鎖。
別的
park()可以或許呼應中止,但不拋出異常。中止呼應的成果是,park()函數的前往,可以從Thread.interrupted()獲得中止標記。
在JDK傍邊有年夜量處所應用到了park,固然LockSupport的完成也是應用unsafe.park()來完成的。
public static void park() {
unsafe.park(false, 0L);
}
1.8 ReentrantLock 的完成
上面來引見下ReentrantLock的完成,ReentrantLock的完成重要由3部門構成:
ReentrantLock的父類中會有一個state變量來表現同步的狀況
/** * The synchronization state. */ private volatile int state;
經由過程CAS操作來設置state來獲得鎖,假如設置成了1,則將鎖的持有者給以後線程
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
假如拿鎖不勝利,則會做一個請求
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
起首,再去請求下嘗嘗看tryAcquire,由於此時能夠另外一個線程曾經釋放了鎖。
假如照樣沒有請求到鎖,就addWaiter,意思是把本身加到期待隊列中去
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
其間還會有屢次測驗考試去請求鎖,假如照樣請求不到,就會被掛起
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
同理,假如在unlock操作中,就是釋放了鎖,然後unpark,這裡就不詳細講了。
2. 並發容器及典范源碼剖析
2.1 ConcurrentHashMap
我們曉得HashMap不是一個線程平安的容器,最簡略的方法使HashMap釀成線程平安就是應用
Collections.synchronizedMap,它是對HashMap的一個包裝
public static Map m=Collections.synchronizedMap(new HashMap());
同理關於List,Set也供給了類似辦法。
然則這類方法只合適於並發量比擬小的情形。
我們來看下synchronizedMap的完成
private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { if (m==null) throw new NullPointerException(); this.m = m; mutex = this; } SynchronizedMap(Map<K,V> m, Object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll(Map<? extends K, ? extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear() { synchronized (mutex) {m.clear();} }
它會將HashMap包裝在外面,然後將HashMap的每一個操作都加上synchronized。
因為每一個辦法都是獲得統一把鎖(mutex),這就意味著,put和remove等操作是互斥的,年夜年夜削減了並發量。
上面來看下ConcurrentHashMap是若何完成的
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
在 ConcurrentHashMap外部有一個Segment段,它將年夜的HashMap切分紅若干個段(小的HashMap),然後讓數據在每段上Hash,如許多個線程在分歧段上的Hash操作必定是線程平安的,所以只須要同步統一個段上的線程便可以了,如許完成了鎖的分別,年夜年夜增長了並發量。
在應用ConcurrentHashMap.size時會比擬費事,由於它要統計每一個段的數據和,在這個時刻,要把每個段都加上鎖,然後再做數據統計。這個就是把鎖分別後的小小弊病,然則size辦法應當是不會被高頻率挪用的辦法。
在完成上,不應用synchronized和lock.lock而是盡可能應用trylock,同時在HashMap的完成上,也做了一點優化。這裡就不提了。
2.2 BlockingQueue
BlockingQueue不是一個高機能的容器。然則它是一個異常好的同享數據的容器。是典范的臨盆者和花費者的完成。
表示圖:
詳細可以檢查Java完成臨盆者花費者成績與讀者寫者成績