一、概念
生產者與消費者問題是一個金典的多線程協作的問題.生產者負責生產產品,並將產品存放到倉庫;消費者從倉庫中獲取產品並消費。當倉庫滿時,生產者必須停止生產,直到倉庫有位置存放產品;當倉庫空時,消費者必須停止消費,直到倉庫中有產品。
解決生產者/消費者問題主要用到如下幾個技術:1.用線程模擬生產者,在run方法中不斷地往倉庫中存放產品。2.用線程模擬消費者,在run方法中不斷地從倉庫中獲取產品。3
.倉庫類保存產品,當產品數量為0時,調用wait方法,使得當前消費者線程進入等待狀態,當有新產品存入時,調用notify方法,喚醒等待的消費者線程。當倉庫滿時,調用wait方法,使得當前生產者線程進入等待狀態,當有消費者獲取產品時,調用notify方法,喚醒等待的生產者線程。
二、實例
package book.thread.product;
public class Consumer extends Thread{
private Warehouse warehouse;//消費者獲取產品的倉庫
private boolean running = false;//是否需要結束線程的標志位
public Consumer(Warehouse warehouse,String name){
super(name);
this.warehouse = warehouse;
}
public void start(){
this.running = true;
super.start();
}
public void run(){
Product product;
try {
while(running){
//從倉庫中獲取產品
product = warehouse.getProduct();
sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//停止消費者線程
public void stopConsumer(){
synchronized(warehouse){
this.running = false;
warehouse.notifyAll();//通知等待倉庫的線程
}
}
//消費者線程是否在運行
public boolean isRunning(){
return running;
}
}
package book.thread.product;
public class Producer extends Thread{
private Warehouse warehouse;//生產者存儲產品的倉庫
private static int produceName = 0;//產品的名字
private boolean running = false;//是否需要結束線程的標志位
public Producer(Warehouse warehouse,String name){
super(name);
this.warehouse = warehouse;
}
public void start(){
this.running = true;
super.start();
}
public void run(){
Product product;
//生產並存儲產品
try {
while(running){
product = new Product((++produceName)+"");
this.warehouse.storageProduct(product);
sleep(300);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//停止生產者線程
public void stopProducer(){
synchronized(warehouse){
this.running = false;
//通知等待倉庫的線程
warehouse.notifyAll();
}
}
//生產者線程是否在運行
public boolean isRunning(){
return running;
}
}
package book.thread.product;
public class Product {
private String name;//產品名
public Product(String name){
this.name = name;
}
public String toString(){
return "Product-"+name;
}
}
package book.thread.product;
//產品的倉庫類,內部采用數組來表示循環隊列,以存放產品
public class Warehouse {
private static int CAPACITY = 11;//倉庫的容量
private Product[] products;//倉庫裡的產品
//[front,rear]區間的產品未被消費
private int front = 0;//當前倉庫中第一個未被消費的產品的下標
private int rear = 0;//倉庫中最後一個未被消費的產品下標加1
public Warehouse(){
this.products = new Product[CAPACITY];
}
public Warehouse(int capacity){
this();
if(capacity > 0){
CAPACITY = capacity +1;
this.products = new Product[CAPACITY];
}
}
//從倉庫獲取一個產品
public Product getProduct() throws InterruptedException{
synchronized(this){
boolean consumerRunning = true;//標志消費者線程是否還在運行
Thread currentThread = Thread.currentThread();//獲取當前線程
if(currentThread instanceof Consumer){
consumerRunning = ((Consumer)currentThread).isRunning();
}else{
return null;//非消費者不能獲取產品
}
//若消費者線程在運行中,但倉庫中沒有產品了,則消費者線程繼續等待
while((front==rear) && consumerRunning){
wait();
consumerRunning = ((Consumer)currentThread).isRunning();
}
//如果消費者線程已經停止運行,則退出該方法,取消獲取產品
if(!consumerRunning){
return null;
}
//獲取當前未被消費的第一個產品
Product product = products[front];
System.out.println("Consumer[" + currentThread.getName()+"] getProduct:"+product);
//將當前未被消費產品的下標後移一位,如果到了數組末尾,則移動到首部
front = (front+1+CAPACITY)%CAPACITY;
System.out.println("倉庫中還沒有被消費的產品數量:"+(rear+CAPACITY-front)%CAPACITY);
//通知其他等待線程
notify();
return product;
}
}
//向倉庫存儲一個產品
public void storageProduct(Product product) throws InterruptedException{
synchronized(this){
boolean producerRunning = true;//標志生產者線程是否在運行
Thread currentThread = Thread.currentThread();
if(currentThread instanceof Producer){
producerRunning = ((Producer)currentThread).isRunning();
}else{
return;
}
//如果最後一個未被消費的產品與第一個未被消費的產品的下標緊挨著,則說明沒有存儲空間了。
//如果沒有存儲空間了,而生產者線程還在運行,則生產者線程等待倉庫釋放產品
while(((rear+1)%CAPACITY == front) && producerRunning){
wait();
producerRunning = ((Producer)currentThread).isRunning();
}
//如果生產線程已經停止運行了,則停止產品的存儲
if(!producerRunning){
return;
}
//保存產品到倉庫
products[rear] = product;
System.out.println("Producer[" + Thread.currentThread().getName()+"] storageProduct:" + product);
//將rear下標循環後移一位
rear = (rear + 1)%CAPACITY;
System.out.println("倉庫中還沒有被消費的產品數量:"+(rear + CAPACITY -front)%CAPACITY);
notify();
}
}
}
package book.thread.product;
public class TestProduct {
public static void main(String[] args) {
Warehouse warehouse = new Warehouse(10);//建立一個倉庫,容量為10
//建立生產者線程和消費者
Producer producers1 = new Producer(warehouse,"producer-1");
Producer producers2 = new Producer(warehouse,"producer-2");
Producer producers3 = new Producer(warehouse,"producer-3");
Consumer consumer1 = new Consumer(warehouse,"consumer-1");
Consumer consumer2 = new Consumer(warehouse,"consumer-2");
Consumer consumer3 = new Consumer(warehouse,"consumer-3");
Consumer consumer4 = new Consumer(warehouse,"consumer-4");
//啟動生產者線程和消費者線程
producers1.start();
producers2.start();
consumer1.start();
producers3.start();
consumer2.start();
consumer3.start();
consumer4.start();
//讓生產者/消費者程序運行1600ms
try {
Thread.sleep(1600);
} catch (InterruptedException e) {
e.printStackTrace();
}
//停止消費者線程
producers1.stopProducer();
consumer1.stopConsumer();
producers2.stopProducer();
consumer2.stopConsumer();
producers3.stopProducer();
consumer3.stopConsumer();
consumer4.stopConsumer();
}
}
輸出結果:
Producer[producer-1] storageProduct:Product-1
倉庫中還沒有被消費的產品數量:1
Consumer[consumer-2] getProduct:Product-1
倉庫中還沒有被消費的產品數量:0
Producer[producer-3] storageProduct:Product-3
倉庫中還沒有被消費的產品數量:1
Producer[producer-2] storageProduct:Product-2
倉庫中還沒有被消費的產品數量:2
Consumer[consumer-3] getProduct:Product-3
倉庫中還沒有被消費的產品數量:1
Consumer[consumer-1] getProduct:Product-2
倉庫中還沒有被消費的產品數量:0
Producer[producer-1] storageProduct:Product-4
倉庫中還沒有被消費的產品數量:1
Consumer[consumer-4] getProduct:Product-4
倉庫中還沒有被消費的產品數量:0
Producer[producer-3] storageProduct:Product-6
倉庫中還沒有被消費的產品數量:1
Producer[producer-2] storageProduct:Product-5
倉庫中還沒有被消費的產品數量:2
Consumer[consumer-1] getProduct:Product-6
倉庫中還沒有被消費的產品數量:1
Consumer[consumer-2] getProduct:Product-5
倉庫中還沒有被消費的產品數量:0
Producer[producer-1] storageProduct:Product-7
倉庫中還沒有被消費的產品數量:1
Consumer[consumer-3] getProduct:Product-7
倉庫中還沒有被消費的產品數量:0
Producer[producer-3] storageProduct:Product-8
倉庫中還沒有被消費的產品數量:1
Producer[producer-2] storageProduct:Product-9
倉庫中還沒有被消費的產品數量:2
Consumer[consumer-4] getProduct:Product-8
倉庫中還沒有被消費的產品數量:1
Producer[producer-1] storageProduct:Product-10
倉庫中還沒有被消費的產品數量:2
Producer[producer-3] storageProduct:Product-11
倉庫中還沒有被消費的產品數量:3
Producer[producer-2] storageProduct:Product-12
倉庫中還沒有被消費的產品數量:4
Consumer[consumer-1] getProduct:Product-9
倉庫中還沒有被消費的產品數量:3
Consumer[consumer-2] getProduct:Product-10
倉庫中還沒有被消費的產品數量:2
Consumer[consumer-3] getProduct:Product-11
倉庫中還沒有被消費的產品數量:1
Producer[producer-3] storageProduct:Product-13
倉庫中還沒有被消費的產品數量:2
Producer[producer-1] storageProduct:Product-14
倉庫中還沒有被消費的產品數量:3
Producer[producer-2] storageProduct:Product-15
倉庫中還沒有被消費的產品數量:4
Consumer[consumer-4] getProduct:Product-12
倉庫中還沒有被消費的產品數量:3
Consumer[consumer-1] getProduct:Product-13
倉庫中還沒有被消費的產品數量:2
Consumer[consumer-2] getProduct:Product-14
倉庫中還沒有被消費的產品數量:1
Producer[producer-1] storageProduct:Product-16
倉庫中還沒有被消費的產品數量:2
Producer[producer-3] storageProduct:Product-17
倉庫中還沒有被消費的產品數量:3
Producer[producer-2] storageProduct:Product-18
倉庫中還沒有被消費的產品數量:4
分析:在main方法中建立了一個產品倉庫,並未該倉庫關聯了3個生產者線程和4個消費者線程,啟動這些線程,使生產 者/消費者模型運作起來,當程序運行1600ms時,所有的生產者停止生產產品,消費者停止消費產品。
生產者線程Product在run方法中沒300ms便生產一個產品,並存入倉庫;消費者線程Consumer在run方法中沒500ms便從倉庫中取一個產品。
倉庫類Warehouse負責存放產品和發放產品。storageProduct方法負責存儲產品,當倉庫滿時,當前線程進入等待狀態,即如果生產者線程A在調用storageProduct方法以存儲產品時,發現倉庫已滿,無法存儲時,便會進入等待狀態。當存儲產品成功時,調用notify方法,喚醒等待的消費者線程。
getProduct方法負責提前產品,當倉庫空時,當前線程進入等待狀態,即如果消費者線程B在調用getProduct方法以獲取產品時,發現倉庫空了,便會進入等待狀態。當提取產品成功時,調用notify方法,喚醒等待的生產者線程。