#0 系列目錄#java
#1 背景# 最近一個月都在作項目,我主要負責分佈式任務的調度的功能,須要實現一個分佈式的受權控制。具體的需求:併發
#2 分析# 先拋開分佈式通信這一塊,首先從單個jvm如何實現進行分析, 簡單點來講:框架
在單jvm中就是兩種線程,一個爲manager,另外一種爲worker。1:n的對應關係,manager能夠隨時掛起worker的全部線程,而worker線程互不干擾
。異步
咋一看,會以爲是一個比較典型的讀寫鎖的應用場景,讀寫鎖特性:jvm
當讀寫鎖是寫加鎖狀態時, 在這個鎖被解鎖以前, 全部試圖對這個鎖加鎖的線程都會被阻塞;分佈式
當讀寫鎖在讀加鎖狀態時, 全部試圖以讀模式對它進行加鎖的線程均可以獲得訪問權, 可是若是線程但願以寫模式對此鎖進行加鎖, 它必須直到知道全部的線程釋放鎖;測試
使用讀寫鎖實現這樣的功能會存在一個問題,就是對應的寫鎖是沒有搶佔權,好比當前有讀鎖未釋放時,寫鎖一直會被阻塞。而項目的需求是,manager是個領導,它能夠不用排隊,隨時打斷你
。ui
除此以外,整個worker線程操做會是一個跨方法,跨類的複雜實現。經過lock方式實現,異常稍微處理很差,很容易形成鎖未釋放,致使manager一直拿不到對應的鎖操做。並且worker中本省會使用一些lock操做,容易形成死鎖。.net
總結一下:線程
讀線程能夠不互相影響,寫線程擁有最高的搶佔權
,能夠不理會讀線程是否在操做;所以本文的互斥信號(BooleanMutex)就應運而生,它是信號量(Semaphore)的一個變種,加入了讀鎖的特性。好比在狀態爲1時能夠一直獲得響應,對應的P操做不會消費對應的資源。
#3 實現# 基於jdk 1.5以後的concurrent的AQS,實現了一個本身的互斥信號控制。
package com.king.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * 互斥信號共享鎖 */ public class BooleanMutex { private Sync sync; public BooleanMutex() { sync = new Sync(); set(false); } /** * 阻塞等待Boolean爲true * @throws InterruptedException */ public void lock() throws InterruptedException { sync.innerLock(); } /** * 阻塞等待Boolean爲true,容許設置超時時間 * @param timeout * @param unit * @throws InterruptedException * @throws TimeoutException */ public void lockTimeOut(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { sync.innerLock(unit.toNanos(timeout)); } public void unlock(){ set(true); } /** * 從新設置對應的Boolean mutex * @param mutex */ public void set(Boolean mutex) { if (mutex) { sync.innerSetTrue(); } else { sync.innerSetFalse(); } } public boolean state() { return sync.innerState(); } /** * 互斥信號共享鎖 */ private final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7828117401763700385L; /** * 狀態爲1,則喚醒被阻塞在狀態爲FALSE的全部線程 */ private static final int TRUE = 1; /** * 狀態爲0,則當前線程阻塞,等待被喚醒 */ private static final int FALSE = 0; /** * 返回值大於0,則執行;返回值小於0,則阻塞 */ protected int tryAcquireShared(int arg) { return getState() == 1 ? 1 : -1; } /** * 實現AQS的接口,釋放共享鎖的判斷 */ protected boolean tryReleaseShared(int ignore) { // 始終返回true,表明能夠release return true; } private boolean innerState() { return getState() == 1; } private void innerLock() throws InterruptedException { acquireSharedInterruptibly(0); } private void innerLock(long nanosTimeout) throws InterruptedException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); } private void innerSetTrue() { for (;;) { int s = getState(); if (s == TRUE) { return; // 直接退出 } if (compareAndSetState(s, TRUE)) {// cas更新狀態,避免併發更新true操做 releaseShared(0);// 釋放一下鎖對象,喚醒一下阻塞的Thread } } } private void innerSetFalse() { for (;;) { int s = getState(); if (s == FALSE) { return; //直接退出 } if (compareAndSetState(s, FALSE)) {//cas更新狀態,避免併發更新false操做 setState(FALSE); } } } } }
代碼其實仍是挺簡單的,主要是對AQS的一份擴展實現。對應的javadoc和使用說明:
測試代碼:
package com.king.aqs; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author taomk * @version 1.0 * @since 15-11-20 下午4:38 */ public class BooleanMutexTest { public static void main(String [] args) { // 測試1 初始化爲true,不會阻塞,會喚醒被狀態false阻塞的線程 BooleanMutex mutex = new BooleanMutex(); mutex.set(true); try { System.out.println("1. =======>" + System.currentTimeMillis()); mutex.lock(); //不會被阻塞 System.out.println("1. =======>" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } // 測試2 初始化false,主線程阻塞,子線程爲true,喚醒 final BooleanMutex mutex2 = new BooleanMutex(); try { final CountDownLatch count = new CountDownLatch(1); ExecutorService executor = Executors.newCachedThreadPool(); System.out.println("2. =======>" + System.currentTimeMillis()); executor.submit(new Callable() { public Object call() throws Exception { Thread.sleep(1000); mutex2.set(true); System.out.println("2. =======>" + System.currentTimeMillis()); count.countDown(); return null; } }); mutex2.lock(); //會被阻塞,等異步線程釋放鎖對象 System.out.println("2. =======>" + System.currentTimeMillis()); count.await(); System.out.println("2. =======>" + System.currentTimeMillis()); executor.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } // 測試3 初始化爲true,不會被阻塞 try { final BooleanMutex mutex3 = new BooleanMutex(); mutex3.set(true); System.out.println("3. =======>" + System.currentTimeMillis()); final CountDownLatch count = new CountDownLatch(10); ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { executor.submit(new Callable() { public Object call() throws Exception { mutex3.lock(); System.out.println("3. =======>" + System.currentTimeMillis()); count.countDown(); return null; } }); } count.await(); System.out.println("3. =======>" + System.currentTimeMillis()); executor.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } // 測試4 初始化false,子線程阻塞 try { final BooleanMutex mutex4 = new BooleanMutex();//初始爲false final CountDownLatch count = new CountDownLatch(10); ExecutorService executor = Executors.newCachedThreadPool(); System.out.println("4. =======>" + System.currentTimeMillis()); for (int i = 0; i < 10; i++) { executor.submit(new Callable() { public Object call() throws Exception { mutex4.lock();//被阻塞 System.out.println("4. =======>" + System.currentTimeMillis()); count.countDown(); return null; } }); } Thread.sleep(1000); mutex4.set(true); System.out.println("4. =======>" + System.currentTimeMillis()); count.await(); executor.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } } }