程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 關於java.util.concurrent您不知道的 5 件事,第1部分

關於java.util.concurrent您不知道的 5 件事,第1部分

編輯:關於JAVA

通過並發 Collections 進行多線程編程

Concurrent Collections 是 Java™ 5 的巨大附加產品,但是在關於注釋和泛型的爭 執中很多 Java 開發人員忽視了它們。此外(或者更老實地說),許多開發人員避免使用這個 數據包,因為他們認為它一定很復雜,就像它所要解決的問題一樣。

事實上,java.util.concurrent 包含許多類,能夠有效解決普通的並發問題,無需復雜工 序。閱讀本文,了解 java.util.concurrent 類,比如 CopyOnWriteArrayList 和 BlockingQueue 如何幫助您解決多線程編程的棘手問題。

1. TimeUnit

盡管本質上 不是 Collections 類,但 java.util.concurrent.TimeUnit 枚舉讓代碼更易讀懂。使用 TimeUnit 將使用您的方法或 API 的開發人員從毫秒的 “暴政” 中解放出來。

TimeUnit 包括所有時間 單位,從 MILLISECONDS 和 MICROSECONDS 到 DAYS 和 HOURS,這就意味著它能夠處理一個開 發人員所需的幾乎所有的時間范圍類型。同時,因為在列舉上聲明了轉換方法,在時間加快時 ,將 HOURS 轉換回 MILLISECONDS 甚至變得更容易。

2. CopyOnWriteArrayList

創建數組的全新副本是過於昂貴的操作,無論是從時間上,還是 從記憶開銷上,因此在通常使用中很少考慮;開發人員往往求助於使用同步的 ArrayList。然 而,這也是一個成本較高的選擇,因為每當您跨集合內容進行迭代時,您就不得不同步所有操 作,包括讀和寫,以此保證一致性。

這又讓成本結構退回到這樣一個場景:很多讀者都 在閱讀 ArrayList,但是幾乎沒人會去修改它。

CopyOnWriteArrayList 是個巧妙的小 寶貝,能解決這一問題。它的 Javadoc 將 CopyOnWriteArrayList 定義為一個 “ArrayList 的線程安全變體,在這個變體中所有易變操作(添加,設置等)可以通過復 制全新的數組來實現”。

集合從內部將它的內容復制到一個沒有修改的新數組,這樣讀者訪問數組內容時就不會產生 同步成本(因為他們從來不是在易變數據上操作)。

本質上講,CopyOnWriteArrayList 對處理 ArrayList 讓我們失敗這種場景是很理想的:讀 取頻繁,但很少有寫操作的集合,例如 JavaBean 事件的 Listeners。

3. BlockingQueue

BlockingQueue 界面表示它是一個 Queue,意思是它的項以先入先出(FIFO)順序存儲。在 特定順序插入的項以相同的順序檢索 — 但是需要附加保證,從空隊列檢索一個項的任何嘗試 都會阻塞調用線程,直到這個項准備好被檢索。同理,想要將一個項插入到滿隊列的嘗試也會 導致阻塞調用線程,直到隊列的存儲空間可用。

BlockingQueue 干淨利落地解決了如何將一個線程收集的項“傳遞”給另一線程用於處理的 問題,無需考慮同步問題。Java Tutorial 的 Guarded Blocks 試用版就是一個很好的例子。 它構建一個單插槽綁定的緩存,當新的項可用,而且插槽也准備好接受新的項時,使用手動同 步和 wait()/notifyAll() 在線程之間發信。

盡管 Guarded Blocks 教程中的代碼有效,但是它耗時久,混亂,而且也並非完全直觀。退 回到 Java 平台較早的時候,沒錯,Java 開發人員不得不糾纏於這種代碼;但現在是 2010 年 — 情況難道沒有改善?

清單 1 顯示了 Guarded Blocks 代碼的重寫版,其中我使用了一個 ArrayBlockingQueue, 而不是手寫的 Drop。

清單 1. BlockingQueue

import java.util.*;
import java.util.concurrent.*;

class Producer
   implements Runnable
{
   private BlockingQueue<String> drop;
   List<String> messages = Arrays.asList(
     "Mares eat oats",
     "Does eat oats",
     "Little lambs eat ivy",
     "Wouldn't you eat ivy too?");

   public Producer(BlockingQueue<String> d) { this.drop = d; }

   public void run()
   {
     try
     {
       for (String s : messages)
         drop.put(s);
       drop.put("DONE");
     }
     catch (InterruptedException intEx)
     {
       System.out.println("Interrupted! " +
         "Last one out, turn out the lights!");
     }
   }
}

class Consumer
   implements Runnable
{
   private BlockingQueue<String> drop;
   public Consumer(BlockingQueue<String> d) { this.drop = d; }

   public void run()
   {
     try
     {
       String msg = null;
       while (!((msg = drop.take()).equals("DONE")))
         System.out.println(msg);
     }
     catch (InterruptedException intEx)
     {
       System.out.println("Interrupted! " +
         "Last one out, turn out the lights!");
     }
   }
}

public class ABQApp
{
   public static void main(String[] args)
   {
     BlockingQueue<String> drop = new ArrayBlockingQueue(1,  true);
     (new Thread(new Producer(drop))).start();
     (new Thread(new Consumer(drop))).start();
   }
}

ArrayBlockingQueue 還體現了“公平” — 意思是它為讀取器和編寫器提供線程先入先出 訪問。這種替代方法是一個更有效,但又冒窮盡部分線程風險的政策。(即,允許一些讀取器 在其他讀取器鎖定時運行效率更高,但是您可能會有讀取器線程的流持續不斷的風險,導致編 寫器無法進行工作。)

注意 Bug!

順便說一句,如果您注意到 Guarded Blocks 包含一個重大 bug,那麼您是對的 — 如果開 發人員在 main() 中的 Drop 實例上同步,會出現什麼情況呢?

BlockingQueue 還支持接收時間參數的方法,時間參數表明線程在返回信號故障以插入或者 檢索有關項之前需要阻塞的時間。這麼做會避免非綁定的等待,這對一個生產系統是致命的, 因為一個非綁定的等待會很容易導致需要重啟的系統掛起。

4. ConcurrentMap

Map 有一個微妙的並發 bug,這個 bug 將許多不知情的 Java 開發人員引入歧途。 ConcurrentMap 是最容易的解決方案。

當一個 Map 被從多個線程訪問時,通常使用 containsKey() 或者 get() 來查看給定鍵是 否在存儲鍵/值對之前出現。但是即使有一個同步的 Map,線程還是可以在這個過程中潛入,然 後奪取對 Map 的控制權。問題是,在對 put() 的調用中,鎖在 get() 開始時獲取,然後在可 以再次獲取鎖之前釋放。它的結果是個競爭條件:這是兩個線程之間的競爭,結果也會因誰先 運行而不同。

如果兩個線程幾乎同時調用一個方法,兩者都會進行測試,調用 put,在處理中丟失第一線 程的值。幸運的是,ConcurrentMap 接口支持許多附加方法,它們設計用於在一個鎖下進行兩 個任務:putIfAbsent(),例如,首先進行測試,然後僅當鍵沒有存儲在 Map 中時進行 put。

5. SynchronousQueues

根據 Javadoc,SynchronousQueue 是個有趣的東西:

這是一個阻塞隊列,其中,每個插入操作必須等待另一個線程的對應移除操作,反之亦然。 一個同步隊列不具有任何內部容量,甚至不具有 1 的容量。

本質上講,SynchronousQueue 是之前提過的 BlockingQueue 的又一實現。它給我們提供了 在線程之間交換單一元素的極輕量級方法,使用 ArrayBlockingQueue 使用的阻塞語義。在清 單 2 中,我重寫了 清單 1 的代碼,使用 SynchronousQueue 替代 ArrayBlockingQueue:

清單 2. SynchronousQueue

import java.util.*;
import java.util.concurrent.*;

class Producer
   implements Runnable
{
   private BlockingQueue<String> drop;
   List<String> messages = Arrays.asList(
     "Mares eat oats",
     "Does eat oats",
     "Little lambs eat ivy",
     "Wouldn't you eat ivy too?");

   public Producer(BlockingQueue<String> d) { this.drop = d; }

   public void run()
   {
     try
     {
       for (String s : messages)
         drop.put(s);
       drop.put("DONE");
     }
     catch (InterruptedException intEx)
     {
       System.out.println("Interrupted! " +
         "Last one out, turn out the lights!");
     }
   }
}

class Consumer
   implements Runnable
{
   private BlockingQueue<String> drop;
   public Consumer(BlockingQueue<String> d) { this.drop = d; }

   public void run()
   {
     try
     {
       String msg = null;
       while (!((msg = drop.take()).equals("DONE")))
         System.out.println(msg);
     }
     catch (InterruptedException intEx)
     {
       System.out.println("Interrupted! " +
         "Last one out, turn out the lights!");
     }
   }
}

public class SynQApp
{
   public static void main(String[] args)
   {
     BlockingQueue<String> drop = new  SynchronousQueue<String>();
     (new Thread(new Producer(drop))).start();
     (new Thread(new Consumer(drop))).start();
   }
}

實現代碼看起來幾乎相同,但是應用程序有額外獲益:SynchronousQueue 允許在隊列進行 一個插入,只要有一個線程等著使用它。

在實踐中,SynchronousQueue 類似於 Ada 和 CSP 等語言中可用的 “會合通道”。這些通 道有時在其他環境中也稱為 “連接”,這樣的環境包括 .NET 。

結束語

當 Java 運行時知識庫提供便利、預置的並發性時,為什麼還要苦苦掙扎,試圖將並發性導 入到您的 Collections 類?本系列的下一篇文章將會進一步探討 java.util.concurrent 名稱 空間的內容。

原文地址:http://www.ibm.com/developerworks/cn/java/j-5things4.html

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved