程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA編程入門知識 >> java並發:阻塞隊列

java並發:阻塞隊列

編輯:JAVA編程入門知識

第一節 阻塞隊列

1.1 初識阻塞隊列

  隊列以一種先進先出的方式管理數據,阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列,這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空;當隊列滿時,存儲元素的線程會等待隊列可用。在多線程進行合作時,阻塞隊列是很有用的工具。

  生產者-消費者模式:阻塞隊列常用於生產者和消費者的場景,生產者線程可以定期的把中間結果存到阻塞隊列中,而消費者線程把中間結果取出並在將來修改它們。隊列會自動平衡負載,如果生產者線程集運行的比消費者線程集慢,則消費者線程集在等待結果時就會阻塞;如果生產者線程集運行的快,那麼它將等待消費者線程集趕上來。

    

  簡單解說一下如何理解上表,比如說阻塞隊列的插入方法,add(e)、offer(e)、put(e)等均為阻塞隊列的插入方法,但它們的處理方式不一樣,add(e)方法可能會拋出異常,而put(e)方法則可能一直處於阻塞狀態,下面解說一下這些處理方式:

  A、拋出異常:所謂拋出異常是指當阻塞隊列滿時,再往隊列裡插入元素,會拋出IllegalStateException("Queue full")異常;當隊列為空時,從隊列裡獲取元素時會拋出NoSuchElementException異常 。

  B、返回特殊值:插入方法,該方法會返回是否成功,成功則返回true;移除方法,該方法是從隊列裡拿出一個元素,如果沒有則返回null

  C、一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列裡put元素,隊列會一直阻塞生產者線程,直到將數據放入隊列或是響應中斷退出;當隊列為空時,消費者線程試圖從隊列裡take元素,隊列也會一直阻塞消費者線程,直到隊列可用。

  D、超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。

 

1.2 Java中的阻塞隊列

  java.util.concurrent包提供了幾種不同形式的阻塞隊列,如數組阻塞隊列ArrayBlockingQueue、鏈表阻塞隊列LinkedBlockingQueue、優先級阻塞隊列PriorityBlockingQueue和延時隊列DelayQueue等,下面簡單介紹一下這幾個阻塞隊列:

  數組阻塞隊列:ArrayBlockingQueue是一個由數組支持的有界阻塞隊列,內部維持著一個定長的數據緩沖隊列(該隊列由數組構成),此隊列按照先進先出(FIFO)的原則對元素進行排序,在構造時需要給定容量。ArrayBlockingQueue內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。

  對於數組阻塞隊列,可以選擇是否需要公平性,所謂公平訪問隊列是指阻塞的所有生產者線程或消費者線程,當隊列可用時,可以按照阻塞的先後順序訪問隊列,即先阻塞的生產者線程,可以先往隊列裡插入元素,先阻塞的消費者線程,可以先從隊列裡獲取元素。通常,公平性會使你在性能上付出代價,只有在的確非常需要的時候再使用它。

  我們可以使用以下代碼創建一個公平的阻塞隊列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

  數組阻塞隊列的公平性是使用可重入鎖實現的,其構造函數代碼如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
      throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

  鏈表阻塞隊列:LinkedBlockingQueue基於鏈表的有界阻塞隊列,內部維持著一個數據緩沖隊列(該隊列由鏈表構成),此隊列按照先進先出的原則對元素進行排序。當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(可以通過LinkedBlockingQueue的構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程將會被喚醒,反之對於消費者這端的處理也基於同樣的原理。需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小(Integer.Max_VALUE)的容量,這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已經被消耗殆盡了。

  LinkedBlockingQueue之所以能夠高效的處理並發數據,是因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味著在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。

  優先級阻塞隊列:PriorityBlockingQueue是一個支持優先級排序的無界阻塞隊列,默認情況下元素采取自然順序排列,也可以通過構造函數傳入的Compator對象來決定。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖。需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只是在沒有可消費的數據時阻塞數據的消費者,因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。

  延時隊列:DelayQueue是一個支持延時獲取元素的使用優先級隊列實現的無界阻塞隊列。隊列中的元素必須實現Delayed接口和Comparable接口(用以指定元素的順序),也就是說DelayQueue裡面的元素必須有public void compareTo(To)和long getDelay(TimeUnit unit)方法存在;在創建元素時可以指定多久才能從隊列中獲取當前元素,只有在延遲期滿時才能從隊列中提取元素。

  鏈表雙向阻塞隊列:LinkedBlockingDeque是由一個鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是你可以從隊列的兩端插入和移出元素,雙端隊列因多了一個操作入口,在多線程同時入隊時減少了一半的競爭。在初始化LinkedBlockingDeque時,可以設置容量,防止其過渡膨脹,相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素;以Last單詞結尾的方法,表示插入,獲取或移除雙端隊列的最後一個元素;插入方法add等同於addLast,移除方法remove等同於removeFirst。雙向阻塞隊列可以運用在“工作竊取”模式中。

  鏈表傳輸隊列:LinkedTransferQueue是一個由鏈表結構組成的無界傳輸阻塞隊列,相對於其他阻塞隊列,LinkedTransferQueue多了tryTransfer()方法和transfer()方法。

  transfer()方法:如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法),transfer()方法可以把生產者傳入的元素立刻傳輸給消費者;如果沒有消費者在等待接收元素,transfer()方法會將元素存放到隊列的tail節點,並等到該元素被消費者消費了才返回。

  transfer()方法的關鍵代碼如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

  代碼解說:第一行代碼是試圖把存放當前元素的s節點作為tail節點,第二行代碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,所以自旋一定的次數後使用Thread.yield()方法來暫停當前正在執行的線程,並執行其他線程。

  tryTransfer()方法:該方法是用來試探生產者傳入的元素是否能直接傳給消費者,如果沒有消費者等待接收元素,則返回false。與transfer()方法的區別:tryTransfer()方法是立即返回(無論消費者是否接收),transfer()方法是必須等到消費者消費了才返回。對於帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間之後再返回,如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。

  欲了解LinkedTransferQueue的更多內容,可查看以下文章:http://ifeve.com/java-transfer-queue/、http://www.tuicool.com/articles/ZFriEz、http://guojuanjun.blog.51cto.com/277646/948298/,本文不細述。

  SynchronousQueue:SynchronousQueue是一種無界、無緩沖的阻塞隊列,可以認為SynchronousQueue是一個緩存值為1的阻塞隊列,但是SynchronousQueue內部並沒有數據緩存空間,數據是在配對的生產者和消費者線程之間直接傳遞的。可以這樣來理解:SynchronousQueue是一個傳球手,SynchronousQueue不存儲數據元素,隊列頭元素是第一個排隊要插入數據的線程,而不是要交換的數據,SynchronousQueue負責把生產者線程處理的數據直接傳遞給消費者線程,生產者和消費者互相等待對方,握手,然後一起離開。SynchronousQueue的吞吐量高於LinkedBlockingQueue 和 ArrayBlockingQueue。

 

1.4 詳解SynchronousQueue

【本小節主要摘自參考資料,作為學習筆記O(∩_∩)O】

(1)認識SynchronousQueue 

  SynchronousQueue的isEmpty()方法永遠返回true,remainingCapacity()方法永遠返回0,remove()和removeAll() 方法永遠返回false,iterator()方法永遠返回null,peek()方法永遠返回null,故我們不能通過調用peek()方法來看隊列中是否有數據元素,因為數據元素只有當你試著取走的時候才可能存在,不取走而只想偷窺一下是不行的,同樣遍歷這個隊列的操作也是不允許的。

  SynchronousQueue的一個使用場景是在線程池裡,Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要創建新的線程(新任務到來時),當然如果有空閒線程的話,則會復用這些線程。

(2)SynchronousQueue實現機制

  阻塞算法的實現通常是在內部采用一個鎖來保證在多個線程中的put()和take()方法是串行執行的,如下代碼是一般put()和take()方法的實現:

public class NativeSynchronousQueue<E> {
    boolean putting = false;
    E item = null;

    public synchronized E take() throws InterruptedException {
        while (item == null)
            wait();
        E e = item;
        item = null;
        notifyAll();
        return e;
    }

    public synchronized void put(E e) throws InterruptedException {
        if (e==null) return;
        while (putting)
            wait();
        putting = true;
        item = e;
        notifyAll();
        while (item!=null)
            wait();
        putting = false;
        notifyAll();
    }
}

  經典同步隊列的實現采用了三個信號量,代碼如下:

public class SemaphoreSynchronousQueue<E> {
    E item = null;
    Semaphore sync = new Semaphore(0);
    Semaphore send = new Semaphore(1);
    Semaphore recv = new Semaphore(0);

    public E take() throws InterruptedException {
        recv.acquire();
        E x = item;
        sync.release();
        send.release();
        return x;
    }

    public void put (E x) throws InterruptedException{
        send.acquire();
        item = x;
        recv.release();
        sync.acquire();
    }
}

  Java5中SynchronousQueue的實現相對來說做了一些優化,它只使用了一個鎖,使用隊列代替信號量,允許發布者直接發布數據,而不是要首先從阻塞在信號量處被喚醒,代碼如下:

public class Java5SynchronousQueue<E> {
    ReentrantLock qlock = new ReentrantLock();
    Queue waitingProducers = new Queue();
    Queue waitingConsumers = new Queue();

    static class Node extends AbstractQueuedSynchronizer {
        E item;
        Node next;

        Node(Object x) { item = x; }
        void waitForTake() { /* (uses AQS) */ }
           E waitForPut() { /* (uses AQS) */ }
    }

    public E take() {
        Node node;
        boolean mustWait;
        qlock.lock();
        node = waitingProducers.pop();
        if(mustWait = (node == null))
           node = waitingConsumers.push(null);
         qlock.unlock();

        if (mustWait)
           return node.waitForPut();
        else
            return node.item;
    }

    public void put(E e) {
         Node node;
         boolean mustWait;
         qlock.lock();
         node = waitingConsumers.pop();
         if (mustWait = (node == null))
             node = waitingProducers.push(e);
         qlock.unlock();

         if (mustWait)
             node.waitForTake();
         else
            node.item = e;
    }
}

  Java6中SynchronousQueue的實現采用了一種性能更好的無鎖算法——擴展的“Dual stack and Dual queue”算法,性能比Java5的實現有較大提升。

  聲明一個SynchronousQueue有兩種不同的方式,支持公平和非公平兩種競爭機制,它們之間有著不太一樣的行為,公平模式和非公平模式的區別:如果采用公平模式,SynchronousQueue會采用公平鎖,並配合一個FIFO隊列來阻塞多余的生產者和消費者;如果是非公平模式(SynchronousQueue默認),SynchronousQueue會采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者。兩者性能相當,一般情況下,Fifo通常可以支持更大的吞吐量,但Lifo可以更大程度的保持線程的本地化,需要注意的是,若采用非公平模式,如果生產者和消費者的處理速度有差距,則很容易出現饑渴的情況(可能某些生產者或者消費者的數據永遠都得不到處理)。

(3)參考資料

(1)http://ifeve.com/java-synchronousqueue/

(2)http://blog.itpub.net/29644969/viewspace-1169051/

 

 

第二節 使用示例

2.1 生產者-消費者示例

一個生產者-N個消費者,程序功能:在一個目錄及它的所有子目錄下搜索所有文件,打印出包含指定關鍵字的文件列表。

package com.test;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class BlockingQueueTest {
    public static void main(String[] args) {
        Scanner in = new Scanner(System.in);
        System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
        String directory = in.nextLine();
        System.out.print("Enter keyword (e.g. volatile): ");
        String keyword = in.nextLine();
        final int FILE_QUEUE_SIZE = 10;
        final int SEARCH_THREADS = 100;
        BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
        FileEnumerationTask enumerator = new FileEnumerationTask(queue,new File(directory));
        new Thread(enumerator).start();
        for (int i = 1; i <= SEARCH_THREADS; i++)
            new Thread(new SearchTask(queue, keyword)).start();
    }
}

/**
 * This task enumerates all files in a directory and its subdirectories.
 */
class FileEnumerationTask implements Runnable {
    /**
     * Constructs a FileEnumerationTask.
     * 
     * @param queue
     *            the blocking queue to which the enumerated files are added
     * @param startingDirectory
     *            the directory in which to start the enumeration
     */
    public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory) {
        this.queue = queue;
        this.startingDirectory = startingDirectory;
    }

    public void run() {
        try {
            enumerate(startingDirectory);
            queue.put(DUMMY);
        } catch (InterruptedException e) {
        }
    }

    /**
     * Recursively enumerates all files in a given directory and its
     * subdirectories
     * 
     * @param directory
     *            the directory in which to start
     */
    public void enumerate(File directory) throws InterruptedException {
        File[] files = directory.listFiles();
        for (File file : files) {
            if (file.isDirectory())
                enumerate(file);
            else
                queue.put(file);
        }
    }

    public static File DUMMY = new File("");
    private BlockingQueue<File> queue;
    private File startingDirectory;
}

/**
 * This task searches files for a given keyword.
 */
class SearchTask implements Runnable {
    /**
     * Constructs a SearchTask.
     * 
     * @param queue
     *            the queue from which to take files
     * @param keyword
     *            the keyword to look for
     */
    public SearchTask(BlockingQueue<File> queue, String keyword) {
        this.queue = queue;
        this.keyword = keyword;
    }

    public void run() {
        try {
            boolean done = false;
            while (!done) {
                File file = queue.take();
                if (file == FileEnumerationTask.DUMMY) {
                    queue.put(file);
                    done = true;
                } else
                    search(file);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
        }
    }

    /**
     * Searches a file for a given keyword and prints all matching lines.
     * 
     * @param file
     *            the file to search
     */
    public void search(File file) throws IOException {
        Scanner in = new Scanner(new FileInputStream(file));
        int lineNumber = 0;
        while (in.hasNextLine()) {
            lineNumber++;
            String line = in.nextLine().trim();
            if (line.contains(keyword))
                System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber,
                        line);
        }
        in.close();
    }

    private BlockingQueue<File> queue;
    private String keyword;
}

解說:上述程序展示了如何使用阻塞隊列來控制線程集,生產者線程枚舉在所有子目錄下的所有文件並把它們放到一個阻塞隊列中,同時我們還啟動了大量的搜索線程,每個搜索線程從隊列中取出一個文件,打開它,打印出包含關鍵字的所有行,然後取出下一個文件。

  在上述代碼中,我們使用了一個小技巧來在工作結束後終止線程,為了發出完成信號,枚舉線程把一個虛擬對象放入隊列,當搜索線程取到這個虛擬對象時,就將其放回並終止(這類似於在行李輸送帶上放一個寫著“最後一個包”的虛擬包)。

注意:在這個程序中,我們使用的是ArrayBlockingQueue,使用隊列數據結構作為一種同步機制,這裡不需要人任何顯示的線程同步。

對比分析:

  ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味著兩者無法真正並行運行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全並行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。

  ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而後者則會生成一個額外的Node對象,這在長時間內需要高效並發地處理大批量數據的系統中,其對於GC的影響還是存在一定的區別。

 

2.2 DelayQueue使用示例

我們可以將延時隊列DelayQueue運用在以下場景中:

  (1)緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

  (2)定時任務調度:使用DelayQueue保存當天將要執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行任務,比如TimerQueue就是使用DelayQueue實現的。

DelayQueue使用實例如下:

(1)實現一個Student對象作為DelayQueue的元素,Student必須實現Delayed接口的兩個方法

package com.test;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Student implements Delayed {//必須實現Delayed接口
    
    private String name;
    private long submitTime;// 交卷時間
    private long workTime;// 考試時間

    public Student(String name, long submitTime) {
        this.name = name;
        this.workTime = submitTime;
        this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
        System.out.println(this.name + " 交卷,用時" + workTime);
    }

    public String getName() {
        return this.name + " 交卷,用時" + workTime;
    }
    
    //必須實現getDelay方法
    public long getDelay(TimeUnit unit) {
        //返回一個延遲時間
        return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
    }

    //必須實現compareTo方法
    public int compareTo(Delayed o) {
        Student that = (Student) o;
        return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
    }

}

(2)主線程程序

package com.test;
import java.util.concurrent.DelayQueue;
public class DelayQueueTest {
    public static void main(String[] args) throws Exception {
        
        // 新建一個等待隊列
        final DelayQueue<Student> bq = new DelayQueue<Student>();
for (int i = 0; i < 5; i++) {
            Student student = new Student("學生"+i,Math.round((Math.random()*10+i)));
            bq.put(student); // 將數據存到隊列裡!
        }
        //獲取但不移除此隊列的頭部;如果此隊列為空,則返回 null。
        System.out.println(bq.peek().getName());
    }
}

上述程序運行結果如下:

學生0 交卷,用時8
學生1 交卷,用時9
學生2 交卷,用時4
學生3 交卷,用時9
學生4 交卷,用時12
學生2 交卷,用時4

 

 

第三節 使用阻塞式隊列處理大數據

 鄙人暫時還沒有研究這部分內容,此處僅貼出兩個資源,以供後續學習

(1)http://blog.csdn.net/lifetragedy/article/details/50593588

(2)http://download.csdn.net/detail/lifetragedy/9419773

 

第四節 參考資料

(1)http://www.cnblogs.com/dolphin0520/p/3932906.html

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved