[ZooKeeper Series 4] ZooKeeper Distributed Lock Implementation

#0 Series Index#

#1 Scenario# In a distributed application, multiple processes often provide the same service. These processes may run on the same machine or be spread across different machines. If they share resources, a distributed lock may be needed to control access to those resources.

#2 Approach# When a process needs to access the shared data, it creates a sequence-type child node under "/locks", called thisPath. When thisPath is the smallest of all children, the process holds the lock and may access the shared resource. When it is done, it deletes thisPath, and the lock passes to the new smallest child.

With the overall approach clear, a few details remain. How does a process know whether thisPath is the smallest of all children? At creation time it can fetch the child list via getChildren, find the node ranked immediately before thisPath, called waitPath, and register a watch on waitPath. When waitPath is deleted, the process is notified, which means it now holds the lock.
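The decision above — is thisPath the smallest child, and if not, which node becomes waitPath — can be sketched as a pure helper that works on a child list, with no live ZooKeeper connection (the class and method names here are illustrative, not part of the implementation below):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LockDecision {

    /**
     * Given the children of the lock root and our own child name (thisPath),
     * return null when we hold the lock (ours sorts first), otherwise the
     * name of waitPath: the child immediately before ours in sorted order.
     */
    public static String decide(List<String> children, String thisPath) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // sequence suffixes are zero-padded, so lexical order works
        int pos = sorted.indexOf(thisPath);
        if (pos < 0) {
            throw new IllegalArgumentException("own node missing: " + thisPath);
        }
        return pos == 0 ? null : sorted.get(pos - 1);
    }

    public static void main(String[] args) {
        List<String> children = java.util.Arrays.asList(
                "lock-0000000003", "lock-0000000001", "lock-0000000002");
        System.out.println(decide(children, "lock-0000000001")); // null: we own the lock
        System.out.println(decide(children, "lock-0000000003")); // lock-0000000002: our waitPath
    }
}
```

Each process watches exactly one node (its waitPath), which is what avoids the thundering-herd problem discussed in the algorithm section below.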

#3 算法#

  1. The lock operation:

For each lock scenario, designate a corresponding root node in ZooKeeper to record the contending resources;

Each lock, once created, lazily creates a node in ZooKeeper as its contention marker. (Trick: the node is EPHEMERAL_SEQUENTIAL, an auto-incrementing ephemeral node);

To lock, fetch all children of the lock's root node, i.e. the markers currently contending for the resource;

Following the Fair principle, sort by the auto-incremented suffix and take the lowest-numbered node as the lock owner. If our own node id equals the owner id, the lock is acquired; return.

Otherwise, find the id ranked immediately before ours in the sorted order and watch its release (an exists watcher), forming a chain of triggers.

  2. The unlock operation:

Simply delete the node for our own id; the next node in the queue then receives the Watcher event, wakes up, acquires the lock, and proceeds.

  3. Key points:

Choosing EPHEMERAL_SEQUENTIAL for the node matters.

The auto-increment property makes it easy to build a Fair lock: the previous node wakes the next one, forming a chain of triggers. This avoids the "thundering herd" effect (one lock release waking every waiting thread); waking exactly one targeted waiter improves performance.

The EPHEMERAL property also matters. Talking to ZooKeeper is a network operation with many uncontrollable failure modes; for example, if the network drops, the previous node's release operation fails. An ephemeral node is tied to its session: once the session times out or exits abnormally, the node disappears, similar to how ReentrantLock handles interruption of threads in its wait queue.

獲取lock操做是一個阻塞的操做,而對應的Watcher是一個異步事件,因此須要使用互斥信號共享鎖BooleanMutex進行通知,能夠比較方便的解決鎖重入的問題。(鎖重入能夠理解爲屢次讀操做,鎖釋放爲寫搶佔操做)

  4. Caveats:

EPHEMERAL introduces a risk: under abnormal conditions, high network latency can cause a session timeout. ZooKeeper then considers the client gone and destroys its node marker, so the next id in the queue acquires the lock. At that point two processes may run the task holding "the lock" simultaneously, so choosing the session timeout carefully matters.

PERSISTENT, on the other hand, risks deadlock: if a process exits abnormally, its contention node is never deleted, and the next id can never acquire the lock.

#4 Implementation# 1. DistributedLock.java source: the distributed lock

package com.king.lock;

import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

/**
 * ZooKeeper distributed lock
 */
public class DistributedLock {

	private static final int SESSION_TIMEOUT = 10000;

	private static final int DEFAULT_TIMEOUT_PERIOD = 10000;

	private static final String CONNECTION_STRING = "127.0.0.1:2180,127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

	private static final byte[] data = {0x12, 0x34};

	private ZooKeeper zookeeper;

	private String root;

	private String id;

	private LockNode idName;

	private String ownerId;

	private String lastChildId;

	private Throwable other = null;

	private KeeperException exception = null;

	private InterruptedException interrupt = null;

	public DistributedLock(String root) {
		try {
			this.zookeeper = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, null);
			this.root = root;
			ensureExists(root);
		} catch (IOException e) {
			e.printStackTrace();
			other = e;
		}
	}

	/**
	 * Acquire the lock; blocks and may be interrupted
	 */
	public void lock() throws Exception {
		// initialization itself may have failed
		if (exception != null) {
			throw exception;
		}

		if (interrupt != null) {
			throw interrupt;
		}

		if (other != null) {
			throw new Exception("", other);
		}

		if (isOwner()) {// lock reentry
			return;
		}

		BooleanMutex mutex = new BooleanMutex();
		acquireLock(mutex);
		// a watcher can be lost if zookeeper restarts, which would deadlock us, so retry with a timeout
		try {
//			mutex.lockTimeOut(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// block until the value becomes true
			mutex.lock();
		} catch (Exception e) {
			e.printStackTrace();
			if (!mutex.state()) {
				lock();
			}
		}

		if (exception != null) {
			throw exception;
		}

		if (interrupt != null) {
			throw interrupt;
		}

		if (other != null) {
			throw new Exception("", other);
		}
	}

	/**
	 * Try to acquire the lock; does not block
	 *
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	public boolean tryLock() throws Exception {
		// initialization itself may have failed
		if (exception != null) {
			throw exception;
		}

		if (isOwner()) { // lock reentry
			return true;
		}

		acquireLock(null);

		if (exception != null) {
			throw exception;
		}

		if (interrupt != null) {
			Thread.currentThread().interrupt();
		}

		if (other != null) {
			throw new Exception("", other);
		}

		return isOwner();
	}

	/**
	 * Release the lock
	 */
	public void unlock() throws KeeperException {
		if (id != null) {
			try {
				zookeeper.delete(root + "/" + id, -1);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			} catch (KeeperException.NoNodeException e) {
				// do nothing
			} finally {
				id = null;
			}
		} else {
			//do nothing
		}
	}

	/**
	 * Create the given path if it does not already exist
	 * @param path
	 */
	private void ensureExists(final String path) {
		try {
			Stat stat = zookeeper.exists(path, false);
			if (stat != null) {
				return;
			}
			zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		} catch (KeeperException e) {
			exception = e;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			interrupt = e;
		}
	}

	/**
	 * Return the path of this lock's root node
	 */
	public String getRoot() {
		return root;
	}

	/**
	 * Whether this instance currently owns the lock
	 */
	public boolean isOwner() {
		return id != null && ownerId != null && id.equals(ownerId);
	}

	/**
	 * Return the current node id
	 */
	public String getId() {
		return this.id;
	}

	// ===================== helper method =============================

	/**
	 * Perform the lock operation; the mutex argument controls whether the call blocks
	 */
	private Boolean acquireLock(final BooleanMutex mutex) {
		try {
			do {
				if (id == null) { // build a unique identifier for this lock
					long sessionId = zookeeper.getSessionId();
					String prefix = "x-" + sessionId + "-";
					// first time through: create our node
					String path = zookeeper.create(root + "/" + prefix, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
					int index = path.lastIndexOf("/");
					id = StringUtils.substring(path, index + 1);
					idName = new LockNode(id);
				}

				if (id != null) {
					List<String> names = zookeeper.getChildren(root, false);
					if (names.isEmpty()) {
						id = null; // abnormal case: recreate the node
					} else {
						// sort the nodes
						SortedSet<LockNode> sortedNames = new TreeSet<>();
						for (String name : names) {
							sortedNames.add(new LockNode(name));
						}

						if (!sortedNames.contains(idName)) {
							id = null;// reset to null and recreate the node
							continue;
						}

						// the first node is the ownerId
						ownerId = sortedNames.first().getName();
						if (mutex != null && isOwner()) {
							mutex.unlock();// update the state directly and return
							return true;
						} else if (mutex == null) {
							return isOwner();
						}

						SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
						if (!lessThanMe.isEmpty()) {
							// watch the nearest node queued just before ours
							LockNode lastChildName = lessThanMe.last();
							lastChildId = lastChildName.getName();
							// asynchronous watcher handling
							Stat stat = zookeeper.exists(root + "/" + lastChildId, new Watcher() {
								public void process(WatchedEvent event) {
									acquireLock(mutex);
								}
							});

							if (stat == null) {
								acquireLock(mutex);// the node no longer exists, so no watcher was attached; retrigger ourselves
							}
						} else {
							if (isOwner()) {
								mutex.unlock();
							} else {
								id = null;// our node may have timed out and vanished, so id and ownerId differ
							}
						}
					}
				}
			} while (id == null);
		} catch (KeeperException e) {
			exception = e;
			if (mutex != null) {
				mutex.unlock();
			}
		} catch (InterruptedException e) {
			interrupt = e;
			if (mutex != null) {
				mutex.unlock();
			}
		} catch (Throwable e) {
			other = e;
			if (mutex != null) {
				mutex.unlock();
			}
		}

		if (isOwner() && mutex != null) {
			mutex.unlock();
		}
		return Boolean.FALSE;
	}
}

2. BooleanMutex.java source: a shared-mode boolean latch

package com.king.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * Shared-mode boolean latch
 */
public class BooleanMutex {

	private Sync sync;

	public BooleanMutex() {
		sync = new Sync();
		set(false);
	}

	/**
	 * Block until the boolean becomes true
	 * @throws InterruptedException
	 */
	public void lock() throws InterruptedException {
		sync.innerLock();
	}

	/**
	 * Block until the boolean becomes true, with a timeout
	 * @param timeout
	 * @param unit
	 * @throws InterruptedException
	 * @throws TimeoutException
	 */
	public void lockTimeOut(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
		sync.innerLock(unit.toNanos(timeout));
	}

	public void unlock(){
		set(true);
	}

	/**
	 * Reset the boolean value
	 * @param mutex
	 */
	public void set(Boolean mutex) {
		if (mutex) {
			sync.innerSetTrue();
		} else {
			sync.innerSetFalse();
		}
	}

	public boolean state() {
		return sync.innerState();
	}

	/**
	 * Shared-mode synchronizer
	 */
	private final class Sync extends AbstractQueuedSynchronizer {
		private static final long serialVersionUID = -7828117401763700385L;

		/**
		 * State 1: wake all threads blocked while the state was FALSE
		 */
		private static final int TRUE = 1;
		/**
		 * State 0: the current thread blocks, waiting to be woken
		 */
		private static final int FALSE = 0;

		/**
		 * Returns > 0 to proceed, < 0 to block
		 */
		protected int tryAcquireShared(int arg) {
			return getState() == 1 ? 1 : -1;
		}

		/**
		 * AQS hook deciding whether the shared lock may be released
		 */
		protected boolean tryReleaseShared(int ignore) {
			// always true: release is always permitted
			return true;
		}

		private boolean innerState() {
			return getState() == 1;
		}

		private void innerLock() throws InterruptedException {
			acquireSharedInterruptibly(0);
		}

		private void innerLock(long nanosTimeout) throws InterruptedException, TimeoutException {
			if (!tryAcquireSharedNanos(0, nanosTimeout))
				throw new TimeoutException();
		}

		private void innerSetTrue() {
			for (;;) {
				int s = getState();
				if (s == TRUE) {
					return; // already true, nothing to do
				}
				if (compareAndSetState(s, TRUE)) {// CAS the state to avoid racing concurrent set-true calls
					releaseShared(0);// release the shared lock, waking any blocked threads
				}
			}
		}

		private void innerSetFalse() {
			for (;;) {
				int s = getState();
				if (s == FALSE) {
					return; // already false, nothing to do
				}
				if (compareAndSetState(s, FALSE)) {// CAS the state to avoid racing concurrent set-false calls
					return;
				}
			}
		}
	}
}
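The Sync above rides AQS's shared mode: tryAcquireShared succeeds only while the state is 1, and releaseShared wakes every parked waiter at once. The same mechanism can be exercised in isolation with a stripped-down one-way gate (OpenGate is an illustrative name, not part of the source; unlike BooleanMutex it cannot be reset to false):

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class OpenGate {
    private final Sync sync = new Sync();

    /** Block until the gate has been opened. */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    /** Open the gate, waking every waiting thread at once (shared release). */
    public void open() {
        sync.releaseShared(0);
    }

    private static final class Sync extends AbstractQueuedSynchronizer {
        // > 0: proceed; < 0: park the caller (same shape as BooleanMutex.Sync)
        protected int tryAcquireShared(int ignore) {
            return getState() == 1 ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1); // opens permanently; BooleanMutex uses CAS loops so it can also be reset
            return true;
        }
    }

    /** One thread parks on the gate; the caller opens it. */
    public static String demo() throws InterruptedException {
        OpenGate gate = new OpenGate();
        StringBuilder log = new StringBuilder();
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
                log.append("woken");
            } catch (InterruptedException ignored) {
            }
        });
        waiter.start();
        Thread.sleep(100); // give the waiter time to park
        log.append("open->");
        gate.open();
        waiter.join();
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // open->woken
    }
}
```

This is why a single unlock() on the BooleanMutex can hand control back to the thread blocked inside DistributedLock.lock() even though the ZooKeeper watcher fired on a different thread.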


4. Test class:

package com.king.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.zookeeper.KeeperException;

/**
 * Distributed lock test
 * @author taomk
 * @version 1.0
 * @since 15-11-19 11:48 AM
 */
public class DistributedLockTest {

	public static void main(String [] args) {
		ExecutorService executor = Executors.newCachedThreadPool();
		final int count = 50;
		final CountDownLatch latch = new CountDownLatch(count);
		for (int i = 0; i < count; i++) {
			final DistributedLock node = new DistributedLock("/locks");
			executor.submit(new Runnable() {
				public void run() {
					try {
						Thread.sleep(1000);
//						node.tryLock(); // non-blocking acquire
						node.lock(); // blocking acquire
						Thread.sleep(100);

						System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
					} catch (InterruptedException e) {
						e.printStackTrace();
					} catch (KeeperException e) {
						e.printStackTrace();
					} catch (Exception e) {
						e.printStackTrace();
					} finally {
						latch.countDown();
						try {
							node.unlock();
						} catch (KeeperException e) {
							e.printStackTrace();
						}
					}

				}
			});
		}

		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		executor.shutdown();
	}
}

Console output:

id: x-239027745716109354-0000000248 is leader: true
id: x-22854963329433645-0000000249 is leader: true
id: x-22854963329433646-0000000250 is leader: true
id: x-166970151413415997-0000000251 is leader: true
id: x-166970151413415998-0000000252 is leader: true
id: x-166970151413415999-0000000253 is leader: true
id: x-166970151413416000-0000000254 is leader: true
id: x-166970151413416001-0000000255 is leader: true
id: x-166970151413416002-0000000256 is leader: true
id: x-22854963329433647-0000000257 is leader: true
id: x-239027745716109355-0000000258 is leader: true
id: x-166970151413416003-0000000259 is leader: true
id: x-94912557367427124-0000000260 is leader: true
id: x-22854963329433648-0000000261 is leader: true
id: x-239027745716109356-0000000262 is leader: true
id: x-239027745716109357-0000000263 is leader: true
id: x-166970151413416004-0000000264 is leader: true
id: x-239027745716109358-0000000265 is leader: true
id: x-239027745716109359-0000000266 is leader: true
id: x-22854963329433649-0000000267 is leader: true
id: x-22854963329433650-0000000268 is leader: true
id: x-94912557367427125-0000000269 is leader: true
id: x-22854963329433651-0000000270 is leader: true
id: x-94912557367427126-0000000271 is leader: true
id: x-239027745716109360-0000000272 is leader: true
id: x-94912557367427127-0000000273 is leader: true
id: x-94912557367427128-0000000274 is leader: true
id: x-166970151413416005-0000000275 is leader: true
id: x-94912557367427129-0000000276 is leader: true
id: x-166970151413416006-0000000277 is leader: true
id: x-94912557367427130-0000000278 is leader: true
id: x-94912557367427131-0000000279 is leader: true
id: x-239027745716109361-0000000280 is leader: true
id: x-239027745716109362-0000000281 is leader: true
id: x-166970151413416007-0000000282 is leader: true
id: x-94912557367427132-0000000283 is leader: true
id: x-22854963329433652-0000000284 is leader: true
id: x-166970151413416008-0000000285 is leader: true
id: x-239027745716109363-0000000286 is leader: true
id: x-239027745716109364-0000000287 is leader: true
id: x-166970151413416009-0000000288 is leader: true
id: x-166970151413416010-0000000289 is leader: true
id: x-239027745716109365-0000000290 is leader: true
id: x-94912557367427133-0000000291 is leader: true
id: x-239027745716109366-0000000292 is leader: true
id: x-94912557367427134-0000000293 is leader: true
id: x-22854963329433653-0000000294 is leader: true
id: x-94912557367427135-0000000295 is leader: true
id: x-239027745716109367-0000000296 is leader: true
id: x-239027745716109368-0000000297 is leader: true

#5 Upgraded Version# With a distributed lock in hand, synchronization between processes is solved. But for a multi-thread + multi-process locking requirement, having every thread in a JVM do its own network round-trips to ZooKeeper is costly, so a two-layer distributed lock is built on top of DistributedLock.

大體原理就是ReentrantLock 和 DistributedLock的一個結合:

  1. When multiple threads in a single JVM compete, a thread first has to take the first-layer ReentrantLock.
  2. The thread holding it then competes with threads in other JVMs for the distributed lock; once it holds both, it runs the task.

Release runs in the opposite direction: release the DistributedLock first, then the ReentrantLock. To see why, suppose the ReentrantLock were released first: if ReentrantLock contention inside this JVM is high, threads in other JVMs could easily be starved of the lock.
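The two-layer acquire/release order can be sketched with a Semaphore standing in for the cross-JVM DistributedLock (the stand-in is an assumption purely for illustration; the real second layer is the ZooKeeper lock above):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

public class TwoLayerLockSketch {
    private final ReentrantLock localLock = new ReentrantLock(); // layer 1: threads in this JVM
    private final Semaphore crossJvm = new Semaphore(1);         // stand-in for the DistributedLock layer

    public void lock() throws InterruptedException {
        localLock.lock();            // take the in-JVM lock first
        try {
            crossJvm.acquire();      // then compete "across JVMs"
        } catch (InterruptedException e) {
            localLock.unlock();      // never leave layer 1 held when layer 2 fails
            throw e;
        }
    }

    public void unlock() {
        crossJvm.release();          // release the distributed layer first...
        localLock.unlock();          // ...then the local layer, so other JVMs are not starved
    }

    /** Hammer the lock from several threads; returns the protected counter. */
    public static int hammer(int threads, int iters) throws InterruptedException {
        TwoLayerLockSketch lock = new TwoLayerLockSketch();
        int[] counter = {0};
        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(() -> {
                for (int j = 0; j < iters; j++) {
                    try {
                        lock.lock();
                        counter[0]++; // critical section
                        lock.unlock();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            pool[i].start();
        }
        for (Thread t : pool) {
            t.join();
        }
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(hammer(4, 1000)); // 4000: no increments lost
    }
}
```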

1. DistributedReentrantLock.java source: the multi-process + multi-thread distributed lock

package com.king.lock;

import java.text.MessageFormat;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.zookeeper.KeeperException;

/**
 * Multi-process + multi-thread distributed lock
 */
public class DistributedReentrantLock extends DistributedLock {

	private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
	private ReentrantLock reentrantLock = new ReentrantLock();

	public DistributedReentrantLock(String root) {
		super(root);
	}

	public void lock() throws Exception {
		reentrantLock.lock();// threads in the same JVM compete for the first-layer lock first
		super.lock();
	}

	public boolean tryLock() throws Exception {
		// take the first-layer lock; release it again if the distributed lock cannot be
		// acquired, otherwise the ReentrantLock would leak
		if (!reentrantLock.tryLock()) {
			return false;
		}
		if (super.tryLock()) {
			return true;
		}
		reentrantLock.unlock();
		return false;
	}

	public void unlock() throws KeeperException {
		super.unlock();
		reentrantLock.unlock();// release the in-JVM, outermost lock last
	}

	@Override
	public String getId() {
		return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
	}

	@Override
	public boolean isOwner() {
		return reentrantLock.isHeldByCurrentThread() && super.isOwner();
	}
}

2. Test code:

package com.king.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.zookeeper.KeeperException;

/**
 * Two-layer distributed lock test
 * @author taomk
 * @version 1.0
 * @since 15-11-23 12:15 PM
 */
public class DistributedReentrantLockTest {

	public static void main(String [] args) {
		ExecutorService executor = Executors.newCachedThreadPool();
		final int count = 50;
		final CountDownLatch latch = new CountDownLatch(count);

		final DistributedReentrantLock lock = new DistributedReentrantLock("/locks"); // one lock instance shared by all threads
		for (int i = 0; i < count; i++) {
			executor.submit(new Runnable() {
				public void run() {
					try {
						Thread.sleep(1000);
						lock.lock();
						Thread.sleep(100);

						System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
					} catch (Exception e) {
						e.printStackTrace();
					} finally {
						latch.countDown();
						try {
							lock.unlock();
						} catch (KeeperException e) {
							e.printStackTrace();
						}
					}
				}
			});
		}

		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		executor.shutdown();
	}
}

#6 Finally# Pushing the idea further, a distributed read/write lock works on much the same principle. Rough approach:

  1. Contention markers: read_<sequence id> and write_<sequence id>.
  2. Sort by sequence id. If the head of the queue is a run of read markers, all of those reads acquire the lock; if the head is a write marker, only that first write node acquires it.
  3. Watchers: a read node watches the exists of the nearest write node ahead of it; a write node watches the nearest node ahead of it, whether read or write.
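The acquire decision for such a read/write lock can again be sketched as a pure function over the child names (the read_/write_ naming and all identifiers here are illustrative, not an existing implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class RwLockDecision {

    /**
     * Decide whether the node named thisPath may take the lock right now,
     * given all children under the lock root. Names are "read_<seq>" or
     * "write_<seq>"; sorting by <seq> gives the queue order.
     */
    public static boolean canAcquire(List<String> children, String thisPath) {
        List<String> sorted = new ArrayList<>(children);
        // sort by the numeric suffix (the zookeeper sequence number, zero-padded)
        sorted.sort((a, b) -> seq(a).compareTo(seq(b)));
        int pos = sorted.indexOf(thisPath);
        if (thisPath.startsWith("read_")) {
            // a reader may acquire only if no writer is queued ahead of it
            for (int i = 0; i < pos; i++) {
                if (sorted.get(i).startsWith("write_")) {
                    return false;
                }
            }
            return true;
        }
        // a writer may acquire only at the head of the queue
        return pos == 0;
    }

    private static String seq(String name) {
        return name.substring(name.indexOf('_') + 1);
    }

    public static void main(String[] args) {
        List<String> q = java.util.Arrays.asList(
                "read_0001", "read_0002", "write_0003", "read_0004");
        System.out.println(canAcquire(q, "read_0002"));  // true: only reads ahead of it
        System.out.println(canAcquire(q, "write_0003")); // false: not at the head
        System.out.println(canAcquire(q, "read_0004"));  // false: a write is queued ahead
    }
}
```

Nodes that cannot acquire then register their watch exactly as described in step 3 above, so the chain-of-triggers and thundering-herd properties carry over unchanged.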