程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> Java concurrency之Condition條件_動力節點Java學院整理

Java concurrency之Condition條件_動力節點Java學院整理

編輯:關於JAVA

Condition介紹

Condition的作用是對鎖進行更精確的控制。Condition中的await()方法相當於Object的wait()方法,Condition中的signal()方法相當於Object的notify()方法,Condition中的signalAll()相當於Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步鎖"(synchronized關鍵字)捆綁使用的;而Condition是需要與"互斥鎖"/"共享鎖"捆綁使用的。

Condition函數列表

// 造成當前線程在接到信號或被中斷之前一直處於等待狀態。
void await()
// 造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。
boolean await(long time, TimeUnit unit)
// 造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。
long awaitNanos(long nanosTimeout)
// 造成當前線程在接到信號之前一直處於等待狀態。
void awaitUninterruptibly()
// 造成當前線程在接到信號、被中斷或到達指定最後期限之前一直處於等待狀態。
boolean awaitUntil(Date deadline)
// 喚醒一個等待線程。
void signal()
// 喚醒所有等待線程。
void signalAll()

Condition示例

示例1是通過Object的wait(), notify()來演示線程的休眠/喚醒功能。

示例2是通過Condition的await(), signal()來演示線程的休眠/喚醒功能。

示例3是通過Condition的高級功能。

示例1

public class WaitTest1 {
  public static void main(String[] args) {
    ThreadA ta = new ThreadA("ta");
    synchronized(ta) { // 通過synchronized(ta)獲取“對象ta的同步鎖”
      try {
        System.out.println(Thread.currentThread().getName()+" start ta");
        ta.start();
        System.out.println(Thread.currentThread().getName()+" block");
        ta.wait();  // 等待
        System.out.println(Thread.currentThread().getName()+" continue");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  static class ThreadA extends Thread{
    public ThreadA(String name) {
      super(name);
    }
    public void run() {
      synchronized (this) { // 通過synchronized(this)獲取“當前對象的同步鎖”
        System.out.println(Thread.currentThread().getName()+" wakup others");
        notify();  // 喚醒“當前對象上的等待線程”
      }
    }
  }
}

示例2

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest1 {
  private static Lock lock = new ReentrantLock();
  private static Condition condition = lock.newCondition();
  public static void main(String[] args) {
    ThreadA ta = new ThreadA("ta");
    lock.lock(); // 獲取鎖
    try {
      System.out.println(Thread.currentThread().getName()+" start ta");
      ta.start();
      System.out.println(Thread.currentThread().getName()+" block");
      condition.await();  // 等待
      System.out.println(Thread.currentThread().getName()+" continue");
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();  // 釋放鎖
    }
  }
  static class ThreadA extends Thread{
    public ThreadA(String name) {
      super(name);
    }
    public void run() {
      lock.lock();  // 獲取鎖
      try {
        System.out.println(Thread.currentThread().getName()+" wakup others");
        condition.signal();  // 喚醒“condition所在鎖上的其它線程”
      } finally {
        lock.unlock();  // 釋放鎖
      }
    }
  }
}

運行結果:

main start ta
main block
ta wakup others
main continue

通過“示例1”和“示例2”,我們知道Condition和Object的方法有一下對應關系:

    Object Condition 
休眠     wait   await
喚醒個線程     notify signal
喚醒所有線程   notifyAll   signalAll

Condition除了支持上面的功能之外,它更強大的地方在於:能夠更加精細的控制多線程的休眠與喚醒。對於同一個鎖,我們可以創建多個Condition,在不同的情況下使用不同的Condition。

例如,假如多線程讀/寫同一個緩沖區:當向緩沖區中寫入數據之後,喚醒"讀線程";當從緩沖區讀出數據之後,喚醒"寫線程";並且當緩沖區滿的時候,"寫線程"需要等待;當緩沖區為空時,"讀線程"需要等待。         如果采用Object類中的wait(), notify(), notifyAll()實現該緩沖區,當向緩沖區寫入數據之後需要喚醒"讀線程"時,不可能通過notify()或notifyAll()明確的指定喚醒"讀線程",而只能通過notifyAll喚醒所有線程(但是notifyAll無法區分喚醒的線程是讀線程,還是寫線程)。  但是,通過Condition,就能明確的指定喚醒讀線程。

看看下面的示例3,可能對這個概念有更深刻的理解。 

示例3

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
  final Lock lock = new ReentrantLock();
  final Condition notFull = lock.newCondition(); 
  final Condition notEmpty = lock.newCondition(); 
  final Object[] items = new Object[5];
  int putptr, takeptr, count;
  public void put(Object x) throws InterruptedException {
    lock.lock();  //獲取鎖
    try {
      // 如果“緩沖已滿”,則等待;直到“緩沖”不是滿的,才將x添加到緩沖中。
      while (count == items.length)
        notFull.await();
      // 將x添加到緩沖中
      items[putptr] = x; 
      // 將“put統計數putptr+1”;如果“緩沖已滿”,則設putptr為0。
      if (++putptr == items.length) putptr = 0;
      // 將“緩沖”數量+1
      ++count;
      // 喚醒take線程,因為take線程通過notEmpty.await()等待
      notEmpty.signal();
      // 打印寫入的數據
      System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x);
    } finally {
      lock.unlock();  // 釋放鎖
    }
  }
  public Object take() throws InterruptedException {
    lock.lock();  //獲取鎖
    try {
      // 如果“緩沖為空”,則等待;直到“緩沖”不為空,才將x從緩沖中取出。
      while (count == 0) 
        notEmpty.await();
      // 將x從緩沖中取出
      Object x = items[takeptr]; 
      // 將“take統計數takeptr+1”;如果“緩沖為空”,則設takeptr為0。
      if (++takeptr == items.length) takeptr = 0;
      // 將“緩沖”數量-1
      --count;
      // 喚醒put線程,因為put線程通過notFull.await()等待
      notFull.signal();
      // 打印取出的數據
      System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
      return x;
    } finally {
      lock.unlock();  // 釋放鎖
    }
  } 
}
public class ConditionTest2 {
  private static BoundedBuffer bb = new BoundedBuffer();
  public static void main(String[] args) {
    // 啟動10個“寫線程”,向BoundedBuffer中不斷的寫數據(寫入0-9);
    // 啟動10個“讀線程”,從BoundedBuffer中不斷的讀數據。
    for (int i=0; i<10; i++) {
      new PutThread("p"+i, i).start();
      new TakeThread("t"+i).start();
    }
  }
  static class PutThread extends Thread {
    private int num;
    public PutThread(String name, int num) {
      super(name);
      this.num = num;
    }
    public void run() {
      try {
        Thread.sleep(1);  // 線程休眠1ms
        bb.put(num);    // 向BoundedBuffer中寫入數據
      } catch (InterruptedException e) {
      }
    }
  }
  static class TakeThread extends Thread {
    public TakeThread(String name) {
      super(name);
    }
    public void run() {
      try {
        Thread.sleep(10);          // 線程休眠1ms
        Integer num = (Integer)bb.take();  // 從BoundedBuffer中取出數據
      } catch (InterruptedException e) {
      }
    }
  }
}

(某一次)運行結果:

p1 put  1
p4 put  4
p5 put  5
p0 put  0
p2 put  2
t0 take 1
p3 put  3
t1 take 4
p6 put  6
t2 take 5
p7 put  7
t3 take 0
p8 put  8
t4 take 2
p9 put  9
t5 take 3
t6 take 6
t7 take 7
t8 take 8
t9 take 9

結果說明:

(01) BoundedBuffer 是容量為5的緩沖,緩沖中存儲的是Object對象,支持多線程的讀/寫緩沖。多個線程操作“一個BoundedBuffer對象”時,它們通過互斥鎖lock對緩沖區items進行互斥訪問;而且同一個BoundedBuffer對象下的全部線程共用“notFull”和“notEmpty”這兩個Condition。

  notFull用於控制寫緩沖,notEmpty用於控制讀緩沖。當緩沖已滿的時候,調用put的線程會執行notFull.await()進行等待;當緩沖區不是滿的狀態時,就將對象添加到緩沖區並將緩沖區的容量count+1,最後,調用notEmpty.signal()緩沖notEmpty上的等待線程(調用notEmpty.await的線程)。 簡言之,notFull控制“緩沖區的寫入”,當往緩沖區寫入數據之後會喚醒notEmpty上的等待線程。

       同理,notEmpty控制“緩沖區的讀取”,當讀取了緩沖區數據之後會喚醒notFull上的等待線程。

(02) 在ConditionTest2的main函數中,啟動10個“寫線程”,向BoundedBuffer中不斷的寫數據(寫入0-9);同時,也啟動10個“讀線程”,從BoundedBuffer中不斷的讀數據。

(03) 簡單分析一下運行結果。

     1, p1線程向緩沖中寫入1。    此時,緩沖區數據:   | 1 |   |   |   |   |

     2, p4線程向緩沖中寫入4。    此時,緩沖區數據:   | 1 | 4 |   |   |   |

     3, p5線程向緩沖中寫入5。    此時,緩沖區數據:   | 1 | 4 | 5 |   |   |

     4, p0線程向緩沖中寫入0。    此時,緩沖區數據:   | 1 | 4 | 5 | 0 |   |

     5, p2線程向緩沖中寫入2。    此時,緩沖區數據:   | 1 | 4 | 5 | 0 | 2 |

     此時,緩沖區容量為5;緩沖區已滿!如果此時,還有“寫線程”想往緩沖中寫入數據,會調用put中的notFull.await()等待,直接緩沖區非滿狀態,才能繼續運行。

     6, t0線程從緩沖中取出數據1。此時,緩沖區數據:   |   | 4 | 5 | 0 | 2 |

     7, p3線程向緩沖中寫入3。    此時,緩沖區數據:   | 3 | 4 | 5 | 0 | 2 |

     8, t1線程從緩沖中取出數據4。此時,緩沖區數據:   | 3 |   | 5 | 0 | 2 |

     9, p6線程向緩沖中寫入6。    此時,緩沖區數據:   | 3 | 6 | 5 | 0 | 2 |
     ...

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved