程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> Java線程間通訊概述

Java線程間通訊概述

編輯:關於JAVA

這個故事源自一個很簡單的想法:創建一個對開發人員友好的、簡單輕量的線程間通訊框架,完全不 用鎖、同步器、信號量、等待和通知,在Java裡開發一個輕量、無鎖的線程內通訊框架;並且也沒有隊列 、消息、事件或任何其他並發專用的術語或工具。

只用普通的老式Java接口實現POJO的通訊。

它可能跟Akka的類型化actor類似,但作為一個必須超級輕量,並且要針對單台多核計算機進行優化的 新框架,那個可能有點過了。

當actor跨越不同JVM實例(在同一台機器上,或分布在網絡上的不同機器上)的進程邊界時,Akka框 架很善於處理進程間的通訊。

但對於那種只需要線程間通訊的小型項目而言,用Akka類型化actor可能有點兒像用牛刀殺雞,不過類 型化actor仍然是一種理想的實現方式。

我花了幾天時間,用動態代理,阻塞隊列和緩存線程池創建了一個解決方案。

圖一是這個框架的高層次架構:

圖一: 框架的高層次架構

SPSC隊列是指單一生產者/單一消費者隊列。MPSC隊列是指多生產者/單一消費者隊列。

派發線程負責接收Actor線程發送的消息,並把它們派發到對應的SPSC隊列中去。

接收到消息的Actor線程用其中的數據調用相應的actor實例中的方法。借助其他actor的代理,actor 實例可以將消息發送到MPSC隊列中,然後消息會被發送給目標actor線程。

我創建了一個簡單的例子來測試,就是下面這個打乒乓球的程序:

public interface PlayerA (
  void pong(long ball); //發完就忘的方法調用 
}
public interface PlayerB {   
  void ping(PlayerA playerA, long ball); //發完就忘的方法調用 
}    
public class PlayerAImpl implements PlayerA {    
  @Override    
  public void pong(long ball) {    
  }    
}
public class PlayerBImpl implements PlayerB {   
  @Override    
  public void ping(PlayerA playerA, long ball) {    
    playerA.pong(ball);    
  }    
}
public class PingPongExample {   
  public void testPingPong() {
    // 管理器隱藏了線程間通訊的復雜性
    // 控制actor代理,actor實現和線程  
    ActorManager manager = new ActorManager();
    // 在管理器內注冊actor實現 
    manager.registerImpl(PlayerAImpl.class);    
    manager.registerImpl(PlayerBImpl.class);
    //創建actor代理。代理會將方法調用轉換成內部消息。 
    //會在線程間發給特定的actor實例。    
    PlayerA playerA = manager.createActor(PlayerA.class);    
    PlayerB playerB = manager.createActor(PlayerB.class);    
    for(int i = 0; i < 1000000; i++) {    
       playerB.ping(playerA, i);     
   }    
}

經過測試,速度大約在每秒500,000 次乒/乓左右;還不錯吧。然而跟單線程的運行速度比起來,我突 然就感覺沒那麼好了。在 單線程 中運行的代碼每秒速度能達到20億 (2,681,850,373) !

居然差了5,000 多倍。太讓我失望了。在大多數情況下,單線程代碼的效果都比多線程代碼更高效。

我開始找原因,想看看我的乒乓球運動員們為什麼這麼慢。經過一番調研和測試,我發現是阻塞隊列 的問題,我用來在actor間傳遞消息的隊列影響了性能。

圖 2: 只有一個生產者和一個消費者的SPSC隊列

所以我發起了一場競賽,要將它換成Java裡最快的隊列。我發現了Nitsan Wakart的 博客 。他發了幾篇文章介紹單一生產者/單一消費者 (SPSC)無鎖隊列的實現。這些文章受到了Martin Thompson的演講 終極性能的無鎖算法的啟發 。

跟基於私有鎖的隊列相比,無鎖隊列的性能更優。在基於鎖的隊列中,當一個線程得到鎖時,其它線 程就要等著鎖被釋放。而在無鎖的算法中,某個生產者線程生產消息時不會阻塞其它生產者線程,消費者 也不會被其它讀取隊列的消費者阻塞。

在Martin Thompson的演講以及在Nitsan的博客中介紹的SPSC隊列的性能簡直令人難以置信 —— 超過了100M ops/sec。比JDK的並發隊列實現還要快10倍 (在4核的 Intel Core i7 上的性能大約在 8M ops/sec 左右)。

我懷著極大的期望,將所有actor上連接的鏈式阻塞隊列都換成了無鎖的SPSC隊列。可惜,在吞吐量上 的性能測試並沒有像我預期的那樣出現大幅提升。不過很快我就意識到,瓶頸並不在SPSC隊列上,而是在 多個生產者/單一消費者(MPSC)那裡。

用SPSC隊列做MPSC隊列的任務並不那麼簡單;在做put操作時,多個生產者可能會覆蓋掉彼此的值。 SPSC 隊列就沒有控制多個生產者put操作的代碼。所以即便換成最快的SPSC隊列,也解決不了我的問題。

為了處理多個生產者/單一消費者的情況,我決定啟用LMAX Disruptor ——一個基於環形緩沖區的高性能進 程間消息庫。

圖3: 單一生產者和單一消費者的LMAX Disruptor

借助Disruptor,很容易實現低延遲、高吞吐量的線程間消息通訊。它還為生產者和消費者的不同組合 提供了不同的用例。幾個線程可以互不阻塞地讀取環形緩沖中的消息:

圖 4: 單一生產者和兩個消費者的LMAX Disruptor    

下面是有多個生產者寫入環形緩沖區,多個消費者從中讀取消息的場景。

圖 5: 兩個生產者和兩個消費者的LMAX Disruptor

經過對性能測試的快速搜索,我找到了 三個發布者和一個消費者的吞吐量測試。 這個真是正合我意,它給出了下面這個結果:

在3 個生產者/1個 消費者場景下, Disruptor要比LinkedBlockingQueue快兩倍多。然而這跟我所期 望的性能上提升10倍仍有很大差距。

這讓我覺得很沮喪,並且我的大腦一直在搜尋解決方案。就像命中注定一樣,我最近不在跟人拼車上 下班,而是改乘地鐵了。突然靈光一閃,我的大腦開始將車站跟生產者消費者對應起來。在一個車站裡, 既有生產者(車和下車的人),也有消費者(同一輛車和上車的人)。

我創建了 Railway類,並用AtomicLong追蹤從一站到下一站的列車。我先從簡單的場景開始,只有一 輛車的鐵軌。

public class RailWay {  
 private final Train train = new Train();  
 // stationNo追蹤列車並定義哪個車站接收到了列車
 private final AtomicInteger stationIndex = new AtomicInteger();
// 會有多個線程訪問這個方法,並等待特定車站上的列車 
public Train waitTrainOnStation(final int stationNo) {
  
   while (stationIndex.get() % stationCount != stationNo) {
    Thread.yield(); // 為保證高吞吐量的消息傳遞,這個是必須的。
                   //但在等待列車時它會消耗CPU周期 
   }  
   // 只有站號等於stationIndex.get() % stationCount時,這個忙循環才會返回

   return train;
 }
// 這個方法通過增加列車的站點索引將這輛列車移到下一站
  public void sendTrain() {
    stationIndex.getAndIncrement();
   }
  }

為了測試,我用的條件跟在Disruptor性能測試中用的一樣,並且也是測的SPSC隊列——測 試在線程間傳遞long值。我創建了下面這個Train類,其中包含了一個long數組:

public class Train {   
  //   
  public static int CAPACITY = 2*1024;
  private final long[] goodsArray; // 傳輸運輸貨物的數組

  private int index;

  public Train() {   
      goodsArray = new long[CAPACITY];     
 }

 public int goodsCount() { //返回貨物數量    
  return index;    
 }    
 public void addGoods(long i) { // 向列車中添加條目    
  goodsArray[index++] = i;    
 }    
 public long getGoods(int i) { //從列車中移走條目    
  index--;    
  return goodsArray[i];    
 }    
}

然後我寫了一個簡單的測試 :兩個線程通過列車互相傳遞long值。

圖 6: 使用單輛列車的單一生產者和單一消費者Railway

public void testRailWay() {   
  final Railway railway = new Railway();    
  final long n = 20000000000l;    
  //啟動一個消費者進程 
  new Thread() {    
   long lastValue = 0;
   @Override   
   public void run() {    
    while (lastValue < n) {    
      Train train = railway.waitTrainOnStation(1); //在#1站等列車
      int count = train.goodsCount();    
      for (int i = 0; i < count; i++) {    
        lastValue = train.getGoods(i); // 卸貨   
      }    
      railway.sendTrain(); //將當前列車送到第一站 
     }    
   }    
 }.start();

final long start = System.nanoTime();
long i = 0;   
while (i < n) {    
 Train train = railway.waitTrainOnStation(0); // 在#0站等列車    
 int capacity = train.getCapacity();    
 for (int j = 0; j < capacity; j++) {    
   train.addGoods((int)i++); // 將貨物裝到列車上 
 }    
 railway.sendTrain();
 if (i % 100000000 == 0) { //每隔100M個條目測量一次性能 
    final long duration = System.nanoTime() - start;    
    final long ops = (i * 1000L * 1000L * 1000L) / duration;    
    System.out.format("ops/sec = %,d\n", ops);    
    System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);    
    System.out.format("latency nanos = %.3f%n\n", 
    duration / (float)(i) * (float)Train.CAPACITY);    
  }    
 }    
}

在不同的列車容量下運行這個測試,結果驚著我了:

在列車容量達到32,768時,兩個線程傳送消息的吞吐量達到了767,028,751 ops/sec。比Nitsan博客中 的SPSC隊列快了幾倍。

繼續按鐵路列車這個思路思考,我想知道如果有兩輛列車會怎麼樣?我覺得應該能提高吞吐量,同時 還能降低延遲。每個車站都會有它自己的列車。當一輛列車在第一個車站裝貨時 ,第二輛列車會在第二個車站卸貨,反之亦然。

圖 7: 使用兩輛列車的單一生產者和單一消費者Railway

下面是吞吐量的結果:

結果是驚人的;比單輛列車的結果快了1.4倍多。列車容量為一時,延遲從192.6納秒降低到133.5納秒 ;這顯然是一個令人鼓舞的跡象。

因此我的實驗還沒結束。列車容量為2048的兩個線程傳遞消息的延遲為2,178.4 納秒,這太高了。我 在想如何降低它,創建一個有很多輛列車 的例子:

圖 8: 使用多輛列車的單一生產者和單一消費者Railway  

查看本欄目

我還把列車容量降到了1個long值,開始玩起了列車數量。下面是測試結果:

用32,768 列車在線程間發送一個long值的延遲降低到了13.9 納秒。通過調整列車數量和列車容量, 當延時不那麼高,吞吐量不那麼低時,吞吐量和延時就達到了最佳平衡。

對於單一生產者和單一消費者(SPSC)而言,這些數值很棒;但我們怎麼讓它在有多個生產者和消費者 時也能生效呢?答案很簡單,添加更多的車站!

圖 9:一個生產者和兩個消費者的Railway

每個線程都等著下一趟列車,裝貨/卸貨,然後把列車送到下一站。在生產者往列車上裝貨時,消費者 在從列車上卸貨。列車周而復始地從一個車站轉到另一個車站。

為了測試單一生產者/多消費者(SPMC) 的情況,我創建了一個有8個車站的Railway測試。 一個車站屬於一個生產者,而另 外7個車站屬於消費者。結果是:

列車數量 = 256 ,列車容量 = 32:

 ops/sec = 116,604,397     延遲(納秒) = 274.4

列車數量= 32,列車容量= 256:

 ops/sec = 432,055,469     延遲(納秒) = 592.5

如你所見,即便有8個工作線程,測試給出的結果也相當好-- 32輛容量為256個long的列車吞吐量為 432,055,469 ops/sec。在測試期間,所有CPU內核的負載都是100%。

圖 10:在測試有8個車站的Railway 期間的CPU 使用情況

在玩這個Railway算法時,我幾乎忘了我最初的目標:提升多生產者/單消費者情況下的性能。

圖 11:三個生產者和一個消費者的 Railway  

我創建了3個生產者和1個消費者的新測試。 每輛列車一站一站地轉圈,而每個生產者只給每輛車裝1/3容量的貨。消費者取出每輛車上三個生產者給 出的全部三項貨物。性能測試給出的平均結果如下所示:

 ops/sec = 162,597,109  列車/秒 = 54,199,036     延遲(納秒) = 18.5

結果相當棒。生產者和消費者工作的速度超過了160M ops/sec。

為了填補差異,下面給出相同情況下的Disruptor結果- 3個生產者和1個消費者:

Run 0, Disruptor=11,467,889 ops/sec
Run 1, Disruptor=11,280,315 ops/sec
Run 2, Disruptor=11,286,681 ops/sec
Run 3, Disruptor=11,254,924 ops/sec

下面是另一個批量消息的Disruptor 3P:1C 測試 (10 條消息每批):

Run 0, Disruptor=116,009,280 ops/sec
Run 1, Disruptor=128,205,128 ops/sec
Run 2, Disruptor=101,317,122 ops/sec
Run 3, Disruptor=98,716,683 ops/sec;

最後是用帶LinkedBlockingQueue 實現的Disruptor 在3P:1C場景下的測試結果:

Run 0, BlockingQueue=4,546,281 ops/sec
Run 1, BlockingQueue=4,508,769 ops/sec
Run 2, BlockingQueue=4,101,386 ops/sec
Run 3, BlockingQueue=4,124,561 ops/sec

如你所見,Railway方式的平均吞吐量是162,597,109 ops/sec,而Disruptor在同樣的情況下的最好結 果只有128,205,128 ops/sec。至於 LinkedBlockingQueue,最好的結果只有4,546,281 ops/sec。

Railway算法為事件批處理提供了一種可以顯著增加吞吐量的簡易辦法。通過調整列車容量或列車數量 ,很容易達成想要的吞吐量/延遲。

另外, 當同一個線程可以用來消費消息,處理它們並向環中返回結果時,通過混合生產者和消費者, Railway也能用來處理復雜的情況:

圖 12: 混合生產者和消費者的Railway

最後,我會提供一個經過優化的超高吞吐量 單生產者/單消費者測試:

圖 13:單個生產者和單個消費者的Railway

它的平均結果為:吞吐量超過每秒15億 (1,569,884,271)次操作,延遲為1.3 微秒。如你所見,本文 開頭描述的那個規模相同的單線程測試的結果是每秒2,681,850,373。

你自己想想結論是什麼吧。

我希望將來再寫一篇文章,闡明如何用Queue和 BlockingQueue接口支持Railway算法,用來處理不同 的生產者和消費者組合。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved