Key鎖

java中的幾種鎖:synchronized,ReentrantLock,ReentrantReadWriteLock已基本能夠知足編程需求,但其粒度都太大,同一時刻只有一個線程能進入同步塊,這對於某些高併發的場景並不適用。本文實現了一個基於KEY(主鍵)的互斥鎖,具備更細的粒度,在緩存或其餘基於KEY的場景中有很大的用處。下面將講解這個鎖的設計和實現java

(關於這個鎖的討論貼:KeyLock討論貼-CSDN數據庫

設想這麼一個場景:轉帳編程

private int[] accounts; // 帳戶數組,其索引爲帳戶ID,內容爲金額
	
	public boolean transfer(int from, int to, int money) {
		if (accounts[from] < money)
			return false;
		accounts[from] -= money;
		accounts[to] += money;
		return true;
	}

 從from中轉出金額到to中。可能同時會有不少個線程同時調用這個轉帳方法,爲保證原子性,保證金額不會出錯,必須爲這個方法加個鎖,防止對共享變量accounts的併發修改。數組

加鎖後的代碼以下:緩存

private int[] accounts; // 帳戶數組,其索引爲帳戶ID,內容爲金額
	private Lock lock = new ReentrantLock();

	public boolean transfer(int from, int to, int money) {
		lock.lock();
		try {
			if (accounts[from] < money)
				return false;
			accounts[from] -= money;
			accounts[to] += money;
			return true;
		} finally {
			lock.unlock();
		}
	}

 好了,加鎖後這個代碼就能保證金額不出錯了。但問題又出現了,一次只能執行一個轉帳過程!意思就是A給B轉帳的時候,C要給D轉帳也得等A給B轉完了才能開始轉。這就有點扯蛋了,就像只有一個櫃檯,全部人必須排隊等前面的處理完了才能到本身,效率過低。多線程

解決這種狀況有一個方案:A給B轉帳的時候只鎖定A和B的帳戶,使其轉帳期間不能再有其餘針對A和B帳戶的操做,但其餘帳戶的操做能夠並行發生。相似於以下場景:併發

public boolean transfer(int from, int to, int money) {
		lock.lock(from, to);
		try {
			if (accounts[from] < money)
				return false;
			accounts[from] -= money;
			accounts[to] += money;
			return true;
		} finally {
			lock.unlock(from, to);
		}
	}

 但很顯然,JAVA並無爲咱們提供這樣的鎖(也有多是我沒找到。。。)dom

因而,就在這樣的需求下我花了整一天來實現了這個鎖——KeyLock(代碼量很短,但多線程的東西真的很讓人頭疼)ide

不一樣於synchronized等鎖,KeyLock是對所需處理的數據的KEY(主鍵)進行加鎖,只要是對不一樣key操做,其就能夠並行處理,大大提升了線程的並行度(最後有幾個鎖的對比測試高併發

總結下就是:對相同KEY操做的線程互斥,對不一樣KEY操做的線程能夠並行

KeyLock有以下幾個特性

    一、細粒度,高並行性
    二、可重入
    三、公平鎖
    四、加鎖開銷比ReentrantLock大,適用於處理耗時長、key範圍大的場景

KeyLock代碼以下(註釋不多,由於我也不知道該怎麼寫清楚,能看懂就看,懶得看的直接用就行):

public class KeyLock<K> {
	// 保存全部鎖定的KEY及其信號量
	private final ConcurrentMap<K, Semaphore> map = new ConcurrentHashMap<K, Semaphore>();
	// 保存每一個線程鎖定的KEY及其鎖定計數
	private final ThreadLocal<Map<K, LockInfo>> local = new ThreadLocal<Map<K, LockInfo>>() {
		@Override
		protected Map<K, LockInfo> initialValue() {
			return new HashMap<K, LockInfo>();
		}
	};

	/**
	 * 鎖定key,其餘等待此key的線程將進入等待,直到調用{@link #unlock(K)}
	 * 使用hashcode和equals來判斷key是否相同,所以key必須實現{@link #hashCode()}和
	 * {@link #equals(Object)}方法
	 * 
	 * @param key
	 */
	public void lock(K key) {
		if (key == null)
			return;
		LockInfo info = local.get().get(key);
		if (info == null) {
			Semaphore current = new Semaphore(1);
			current.acquireUninterruptibly();
			Semaphore previous = map.put(key, current);
			if (previous != null)
				previous.acquireUninterruptibly();
			local.get().put(key, new LockInfo(current));
		} else {
			info.lockCount++;
		}
	}
	
	/**
	 * 釋放key,喚醒其餘等待此key的線程
	 * @param key
	 */
	public void unlock(K key) {
		if (key == null)
			return;
		LockInfo info = local.get().get(key);
		if (info != null && --info.lockCount == 0) {
			info.current.release();
			map.remove(key, info.current);
			local.get().remove(key);
		}
	}

	/**
	 * 鎖定多個key
	 * 建議在調用此方法前先對keys進行排序,使用相同的鎖定順序,防止死鎖發生
	 * @param keys
	 */
	public void lock(K[] keys) {
		if (keys == null)
			return;
		for (K key : keys) {
			lock(key);
		}
	}

	/**
	 * 釋放多個key
	 * @param keys
	 */
	public void unlock(K[] keys) {
		if (keys == null)
			return;
		for (K key : keys) {
			unlock(key);
		}
	}

	private static class LockInfo {
		private final Semaphore current;
		private int lockCount;

		private LockInfo(Semaphore current) {
			this.current = current;
			this.lockCount = 1;
		}
	}
}

 KeyLock使用示例

 

private int[] accounts;
	private KeyLock<Integer> lock = new KeyLock<Integer>();
	
	public boolean transfer(int from, int to, int money) {
		Integer[] keys = new Integer[] {from, to};
		Arrays.sort(keys); //對多個key進行排序,保證鎖定順序防止死鎖
		lock.lock(keys);
		try {
			//處理不一樣的from和to的線程均可進入此同步塊
			if (accounts[from] < money)
				return false;
			accounts[from] -= money;
			accounts[to] += money;
			return true;
		} finally {
			lock.unlock(keys);
		}
	}

 好,工具備了,接下來就是測試了,爲了測出並行度,我把轉帳過程延長了,加了個sleep(2),使每一個轉帳過程至少要花2毫秒(這只是個demo,真實環境下對數據庫操做也很費時)。

 

 

測試代碼以下:

//場景:多線程併發轉帳
public class Test {
	private final int[] account; // 帳戶數組,其索引爲帳戶ID,內容爲金額

	public Test(int count, int money) {
		account = new int[count];
		Arrays.fill(account, money);
	}

	boolean transfer(int from, int to, int money) {
		if (account[from] < money)
			return false;
		account[from] -= money;
		try {
			Thread.sleep(2);
		} catch (Exception e) {
		}
		account[to] += money;
		return true;
	}
	
	int getAmount() {
		int result = 0;
		for (int m : account)
			result += m;
		return result;
	}

	public static void main(String[] args) throws Exception {
		int count = 100;		//帳戶個數
		int money = 10000;		//帳戶初始金額
		int threadNum = 8;		//轉帳線程數
		int number = 10000;		//轉帳次數
		int maxMoney = 1000;	//隨機轉帳最大金額
		Test test = new Test(count, money);
		
		//不加鎖
//		Runner runner = test.new NonLockRunner(maxMoney, number);
		//加synchronized鎖
//		Runner runner = test.new SynchronizedRunner(maxMoney, number);
		//加ReentrantLock鎖
//		Runner runner = test.new ReentrantLockRunner(maxMoney, number);
		//加KeyLock鎖
		Runner runner = test.new KeyLockRunner(maxMoney, number);
		
		Thread[] threads = new Thread[threadNum];
		for (int i = 0; i < threadNum; i++)
			threads[i] = new Thread(runner, "thread-" + i);
		long begin = System.currentTimeMillis();
		for (Thread t : threads)
			t.start();
		for (Thread t : threads)
			t.join();
		long time = System.currentTimeMillis() - begin;
		System.out.println("類型:" + runner.getClass().getSimpleName());
		System.out.printf("耗時:%dms\n", time);
		System.out.printf("初始總金額:%d\n", count * money);
		System.out.printf("終止總金額:%d\n", test.getAmount());
	}

	// 轉帳任務
	abstract class Runner implements Runnable {
		final int maxMoney;
		final int number;
		private final Random random = new Random();
		private final AtomicInteger count = new AtomicInteger();

		Runner(int maxMoney, int number) {
			this.maxMoney = maxMoney;
			this.number = number;
		}

		@Override
		public void run() {
			while(count.getAndIncrement() < number) {
				int from = random.nextInt(account.length);
				int to;
				while ((to = random.nextInt(account.length)) == from)
					;
				int money = random.nextInt(maxMoney);
				doTransfer(from, to, money);
			}
		}

		abstract void doTransfer(int from, int to, int money);
	}

	// 不加鎖的轉帳
	class NonLockRunner extends Runner {
		NonLockRunner(int maxMoney, int number) {
			super(maxMoney, number);
		}

		@Override
		void doTransfer(int from, int to, int money) {
			transfer(from, to, money);
		}
	}

	// synchronized的轉帳
	class SynchronizedRunner extends Runner {
		SynchronizedRunner(int maxMoney, int number) {
			super(maxMoney, number);
		}

		@Override
		synchronized void doTransfer(int from, int to, int money) {
			transfer(from, to, money);
		}
	}

	// ReentrantLock的轉帳
	class ReentrantLockRunner extends Runner {
		private final ReentrantLock lock = new ReentrantLock();

		ReentrantLockRunner(int maxMoney, int number) {
			super(maxMoney, number);
		}

		@Override
		void doTransfer(int from, int to, int money) {
			lock.lock();
			try {
				transfer(from, to, money);
			} finally {
				lock.unlock();
			}
		}
	}

	// KeyLock的轉帳
	class KeyLockRunner extends Runner {
		private final KeyLock<Integer> lock = new KeyLock<Integer>();

		KeyLockRunner(int maxMoney, int number) {
			super(maxMoney, number);
		}

		@Override
		void doTransfer(int from, int to, int money) {
			Integer[] keys = new Integer[] {from, to};
			Arrays.sort(keys);
			lock.lock(keys);
			try {
				transfer(from, to, money);
			} finally {
				lock.unlock(keys);
			}
		}
	}
}

 最最重要的測試結果

 

 

(8線程對100個帳戶隨機轉帳總共10000次):

       類型:NonLockRunner(不加鎖)
       耗時:2482ms
       初始總金額:1000000
       終止總金額:998906(沒法保證原子性)

       類型:SynchronizedRunner(加synchronized鎖)
       耗時:20872ms
       初始總金額:1000000
       終止總金額:1000000

       類型:ReentrantLockRunner(加ReentrantLock鎖)
       耗時:21588ms
       初始總金額:1000000
       終止總金額:1000000

       類型:KeyLockRunner(加KeyLock鎖)
       耗時:2831ms
       初始總金額:1000000
       終止總金額:1000000

 

轉載:http://blog.csdn.net/icebamboo_moyun/article/details/9391915

相關文章
相關標籤/搜索