聊聊併發(十四)—基於AQS實現互斥信號(BooleanMutex)

#0 系列目錄#java

#1 背景# 最近一個月都在作項目,我主要負責分佈式任務的調度的功能,須要實現一個分佈式的受權控制。具體的需求:併發

  1. 首先管理員啓動整個任務,並設置執行權限;
  2. 工做節點收到消息後就會建立對應的線程,並開始執行任務(任務都是由一個管理員進行分配);
  3. 運行過程當中管理員須要臨時中斷某個任務,須要設置一個互斥信號,此時對應的工做節點都須要被阻塞,注意不是徹底銷燬;

#2 分析# 先拋開分佈式通信這一塊,首先從單個jvm如何實現進行分析, 簡單點來講:框架

在單jvm中就是兩種線程,一個爲manager,另外一種爲worker。1:n的對應關係,manager能夠隨時掛起worker的全部線程,而worker線程互不干擾異步

咋一看,會以爲是一個比較典型的讀寫鎖的應用場景,讀寫鎖特性:jvm

  1. 當讀寫鎖是寫加鎖狀態時, 在這個鎖被解鎖以前, 全部試圖對這個鎖加鎖的線程都會被阻塞;分佈式

  2. 當讀寫鎖在讀加鎖狀態時, 全部試圖以讀模式對它進行加鎖的線程均可以獲得訪問權, 可是若是線程但願以寫模式對此鎖進行加鎖, 它必須直到知道全部的線程釋放鎖;測試

使用讀寫鎖實現這樣的功能會存在一個問題,就是對應的寫鎖是沒有搶佔權,好比當前有讀鎖未釋放時,寫鎖一直會被阻塞。而項目的需求是,manager是個領導,它能夠不用排隊,隨時打斷你ui

除此以外,整個worker線程操做會是一個跨方法,跨類的複雜實現。經過lock方式實現,異常稍微處理很差,很容易形成鎖未釋放,致使manager一直拿不到對應的鎖操做。並且worker中本省會使用一些lock操做,容易形成死鎖。.net

總結一下:線程

  1. 須要的是一個相似於信號量的PV控制;
  2. 具備的讀寫鎖的,讀線程能夠不互相影響,寫線程擁有最高的搶佔權,能夠不理會讀線程是否在操做;
  3. 支持線程中斷 (worker線程須要容許cancel);

所以本文的互斥信號(BooleanMutex)就應運而生,它是信號量(Semaphore)的一個變種,加入了讀鎖的特性。好比在狀態爲1時能夠一直獲得響應,對應的P操做不會消費對應的資源。

#3 實現# 基於jdk 1.5以後的concurrent的AQS,實現了一個本身的互斥信號控制。

package com.king.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * 互斥信號共享鎖
 */
public class BooleanMutex {

	private Sync sync;

	public BooleanMutex() {
		sync = new Sync();
		set(false);
	}

	/**
	 * 阻塞等待Boolean爲true
	 * @throws InterruptedException
	 */
	public void lock() throws InterruptedException {
		sync.innerLock();
	}

	/**
	 * 阻塞等待Boolean爲true,容許設置超時時間
	 * @param timeout
	 * @param unit
	 * @throws InterruptedException
	 * @throws TimeoutException
	 */
	public void lockTimeOut(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
		sync.innerLock(unit.toNanos(timeout));
	}

	public void unlock(){
		set(true);
	}

	/**
	 * 從新設置對應的Boolean mutex
	 * @param mutex
	 */
	public void set(Boolean mutex) {
		if (mutex) {
			sync.innerSetTrue();
		} else {
			sync.innerSetFalse();
		}
	}

	public boolean state() {
		return sync.innerState();
	}

	/**
	 * 互斥信號共享鎖
	 */
	private final class Sync extends AbstractQueuedSynchronizer {
		private static final long serialVersionUID = -7828117401763700385L;

		/**
		 * 狀態爲1,則喚醒被阻塞在狀態爲FALSE的全部線程
		 */
		private static final int TRUE = 1;
		/**
		 * 狀態爲0,則當前線程阻塞,等待被喚醒
		 */
		private static final int FALSE = 0;

		/**
		 * 返回值大於0,則執行;返回值小於0,則阻塞
		 */
		protected int tryAcquireShared(int arg) {
			return getState() == 1 ? 1 : -1;
		}

		/**
		 * 實現AQS的接口,釋放共享鎖的判斷
		 */
		protected boolean tryReleaseShared(int ignore) {
			// 始終返回true,表明能夠release
			return true;
		}

		private boolean innerState() {
			return getState() == 1;
		}

		private void innerLock() throws InterruptedException {
			acquireSharedInterruptibly(0);
		}

		private void innerLock(long nanosTimeout) throws InterruptedException, TimeoutException {
			if (!tryAcquireSharedNanos(0, nanosTimeout))
				throw new TimeoutException();
		}

		private void innerSetTrue() {
			for (;;) {
				int s = getState();
				if (s == TRUE) {
					return; // 直接退出
				}
				if (compareAndSetState(s, TRUE)) {// cas更新狀態,避免併發更新true操做
					releaseShared(0);// 釋放一下鎖對象,喚醒一下阻塞的Thread
				}
			}
		}

		private void innerSetFalse() {
			for (;;) {
				int s = getState();
				if (s == FALSE) {
					return; //直接退出
				}
				if (compareAndSetState(s, FALSE)) {//cas更新狀態,避免併發更新false操做
					setState(FALSE);
				}
			}
		}
	}
}

代碼其實仍是挺簡單的,主要是對AQS的一份擴展實現。對應的javadoc和使用說明:

輸入圖片說明

測試代碼:

package com.king.aqs;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author taomk
 * @version 1.0
 * @since 15-11-20 下午4:38
 */
public class BooleanMutexTest {

	public static void main(String [] args) {
		// 測試1 初始化爲true,不會阻塞,會喚醒被狀態false阻塞的線程
		BooleanMutex mutex = new BooleanMutex();
		mutex.set(true);
		try {
			System.out.println("1. =======>" + System.currentTimeMillis());
			mutex.lock(); //不會被阻塞
			System.out.println("1. =======>" + System.currentTimeMillis());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		// 測試2 初始化false,主線程阻塞,子線程爲true,喚醒
		final BooleanMutex mutex2 = new BooleanMutex();
		try {
			final CountDownLatch count = new CountDownLatch(1);
			ExecutorService executor = Executors.newCachedThreadPool();
			System.out.println("2. =======>" + System.currentTimeMillis());
			executor.submit(new Callable() {
				public Object call() throws Exception {
					Thread.sleep(1000);
					mutex2.set(true);
					System.out.println("2. =======>" + System.currentTimeMillis());
					count.countDown();
					return null;
				}
			});

			mutex2.lock(); //會被阻塞,等異步線程釋放鎖對象
			System.out.println("2. =======>" + System.currentTimeMillis());
			count.await();
			System.out.println("2. =======>" + System.currentTimeMillis());
			executor.shutdown();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		// 測試3 初始化爲true,不會被阻塞
		try {
			final BooleanMutex mutex3 = new BooleanMutex();
			mutex3.set(true);
			System.out.println("3. =======>" + System.currentTimeMillis());
			final CountDownLatch count = new CountDownLatch(10);
			ExecutorService executor = Executors.newCachedThreadPool();

			for (int i = 0; i < 10; i++) {
				executor.submit(new Callable() {
					public Object call() throws Exception {
						mutex3.lock();
						System.out.println("3. =======>" + System.currentTimeMillis());
						count.countDown();
						return null;
					}
				});
			}
			count.await();
			System.out.println("3. =======>" + System.currentTimeMillis());
			executor.shutdown();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		// 測試4 初始化false,子線程阻塞
		try {
			final BooleanMutex mutex4 = new BooleanMutex();//初始爲false
			final CountDownLatch count = new CountDownLatch(10);
			ExecutorService executor = Executors.newCachedThreadPool();
			System.out.println("4. =======>" + System.currentTimeMillis());
			for (int i = 0; i < 10; i++) {
				executor.submit(new Callable() {
					public Object call() throws Exception {
						mutex4.lock();//被阻塞
						System.out.println("4. =======>" + System.currentTimeMillis());
						count.countDown();
						return null;
					}
				});
			}
			Thread.sleep(1000);
			mutex4.set(true);
			System.out.println("4. =======>" + System.currentTimeMillis());
			count.await();
			executor.shutdown();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
相關文章
相關標籤/搜索