package cn.study.concurrency.ch12; public class Util { public static int xorShift(int y) { //進行左移和無符號右移,最後異或操作(異或,當兩個位數據不同的時候為1,否則為0) y ^= (y << 6); y ^= (y >>> 21); y ^= (y << 7); return y;//y初始值是隨機種子 } public static void main(String[] args) { for(int i = 0; i < 10; ++i) { System.out.println(xorShift((int) System.nanoTime())); } } }
package cn.study.concurrency.ch12; import java.util.concurrent.Semaphore; public class BoundedBuffer<E> { //信號量 private final Semaphore availableItems, availableSpaces; private final E[] items; private int putPosition=0, takePosition=0; public BoundedBuffer(int capacity) { availableItems = new Semaphore(0); availableSpaces = new Semaphore(capacity); items = (E[]) new Object[capacity]; } public boolean isEmpty() { //這個表示已經是空的了 return availableItems.availablePermits() == 0; } public boolean isFull() { //表明這個是滿的隊列 return availableSpaces.availablePermits() == 0; } //放入一個對象,首先向availableSpaces請求一個信號量,然後結束之後返回一個availableItems信號 public void put(E x) throws InterruptedException { //減少一個許可 availableSpaces.acquire(); doInsert(x); //添加一個許可 availableItems.release(); } //釋放一個數據對象 public E take() throws InterruptedException { //當釋放一個對象的時候,減少一個連接許可 availableItems.acquire(); E item = doExtract(); availableSpaces.release();//取出數據之後,吧能插入的可能添加一個 return item; } private synchronized void doInsert(E x) { int i = putPosition; items[i] = x; putPosition = (++i == items.length) ? 0 : i; } //不論是取數據,還是獲取數據,都是循環取得 private synchronized E doExtract() { int i = takePosition; E x = items[i]; items[i] = null; takePosition = (++i == items.length) ? 0: i; return x; } public static void main(String[] args) throws InterruptedException { BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10); bb.take(); if(bb.isEmpty()) { System.out.println("空"); } if(bb.isFull()) { System.out.println("滿"); } } }
package cn.study.concurrency.ch12; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; public class PutTakeTest { //創建線程池 private static final ExecutorService pool = Executors.newCachedThreadPool(); //原子型的int數據,最後用來統計線程設計是否合理,是否線程安全 private final AtomicInteger putSum = new AtomicInteger(0); private final AtomicInteger takeSum = new AtomicInteger(0); private final CyclicBarrier barrier; //柵欄 private final BoundedBuffer<Integer> bb; //隊列 private final int nTrials, nPairs; //要創建的隊列數量和柵欄的屏障點 //構造函數 public PutTakeTest(int capacity, int npairs, int ntrials) { this.bb = new BoundedBuffer<Integer>(capacity); this.nTrials = ntrials; this.nPairs = npairs; //+1是為了,在所有的線程都建立結束之後,最後編碼手動決定什麼時候啟動所有線程 this.barrier = new CyclicBarrier(2 * npairs + 1); } //生產者線程 class Producer implements Runnable { @Override public void run() { try { //設計隨機種子,異或操作 int seed = (this.hashCode() ^ (int)System.nanoTime()); int sum = 0; barrier.await();//進行柵欄等待 for(int i = nTrials; i > 0; --i) { bb.put(seed); sum += seed; seed = Util.xorShift(seed);//獲取隨機值 } //獲取值,並且添加到putsum中 putSum.getAndAdd(sum); barrier.await();//添加一個柵欄,標識運行結束 } catch (Exception e) { System.out.println("????producer"); } } } //消費者線程 class Consumer implements Runnable { @Override public void run() { try { barrier.await();//進行柵欄等待 int sum = 0; for(int i = nTrials; i > 0; --i) { sum += bb.take(); } //獲取值,並且添加到putsum中 takeSum.getAndAdd(sum); barrier.await();//添加一個柵欄,標識運行結束 } catch (Exception e) { e.printStackTrace(); System.out.println(Thread.currentThread().getName() + "????consumer"); } } } //測試函數 public void test() { try { for(int i = 0; i < nPairs; ++i) { pool.execute(new Producer()); pool.execute(new Consumer()); } //當現場全部添加好了之後,打開柵欄 barrier.await(); //第2n+1個 //然後線程執行之後,每個線程執行完又調用了一次await,當所有的都執行完了之後 barrier.await(); //全部結束之後判斷是否結果對等 if(putSum.get() == takeSum.get()) { System.out.println("程序是OK的"); } else { System.out.println("程序有安全漏洞"); } } catch (Exception e) { System.out.println("????test"); } } public static void main(String[] args) { //隊列容量是10,每個線程起10個,一共放100000個數據 new PutTakeTest(10, 2, 100).test(); pool.shutdown();//執行完,結束線程 } }