1.可重入鎖Shared Reentrant Lock
首先咱們先看一個全局可重入的鎖(
能夠屢次獲取,不會被阻塞
)。Shared意味着鎖是全局可見的,客戶端均可以請求鎖。Reentrant和JDK的ReentrantLock相似,意味着同一個客戶端在擁有鎖的同時,能夠屢次獲取,不會被阻塞。
1.可重入鎖相關類介紹
它是由類InterProcessMutex來實現。
它的主要方法:
// 構造方法public InterProcessMutex(CuratorFramework client, String path)public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)// 經過acquire得到鎖,並提供超時機制:public void acquire() throws Exceptionpublic boolean acquire(long time, TimeUnit unit) throws Exception// 撤銷鎖public void makeRevocable(RevocationListener<InterProcessMutex> listener)public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
錯誤處理:
仍是強烈推薦你使用ConnectionStateListener處理鏈接狀態的改變。當鏈接LOST時你再也不擁有鎖。
2.編寫示例程序
首先讓咱們建立一個模擬的共享資源, 這個資源指望只能單線程的訪問,不然會有併發問題。
public class FakeLimitedResource{ private final AtomicBoolean inUse = new AtomicBoolean(false); // 模擬只能單線程操做的資源 public void use() throws InterruptedException { if (!inUse.compareAndSet(false, true)) { // 在正確使用鎖的狀況下,此異常不可能拋出 throw new IllegalStateException("Needs to be used by one client at a time"); } try { Thread.sleep((long) (3 * Math.random())); } finally { inUse.set(false); } }}
而後建立一個ExampleClientThatLocks類,它負責請求鎖,使用資源,釋放鎖這樣一個完整的訪問過程。
public class ExampleClientThatLocks{ private final InterProcessMutex lock; private final FakeLimitedResource resource; private final String clientName; public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) { this.resource = resource; this.clientName = clientName; lock = new InterProcessMutex(client, lockPath); } public void doWork(long time, TimeUnit unit) throws Exception { if (!lock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能獲得互斥鎖"); } try { System.out.println(clientName + " 已獲取到互斥鎖"); resource.use(); // 使用資源 Thread.sleep(1000 * 1); } finally { System.out.println(clientName + " 釋放互斥鎖"); lock.release(); // 老是在finally中釋放 } }}
最後建立主程序來測試:
public class InterProcessMutexExample{ private static final int QTY = 5; private static final int REPETITIONS = QTY * 10; private static final String PATH = "/examples/locks"; public static void main(String[] args) throws Exception { final FakeLimitedResource resource = new FakeLimitedResource(); final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>(); for (int i = 0; i < QTY; i++) { CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); clientList.add(client); } System.out.println("鏈接初始化完成!"); ExecutorService service = Executors.newFixedThreadPool(QTY); for (int i = 0; i < QTY; ++i) { final int index = i; Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { try { final ExampleClientThatLocks example = new ExampleClientThatLocks(clientList.get(index), PATH, resource, "Client " + index); for (int j = 0; j < REPETITIONS; ++j) { example.doWork(10, TimeUnit.SECONDS); } } catch (Throwable e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientList.get(index)); } return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); System.out.println("OK!"); }}
代碼也很簡單,生成5個client,每一個client重複執行10次 請求鎖--訪問資源--釋放鎖的過程。每一個client都在獨立的線程中。
結果能夠看到,鎖是隨機的被每一個實例排他性的使用。
既然是可重入鎖,你能夠在一個線程中屢次調用acquire,在線程擁有鎖時它老是返回true。
注意:你不該該在多個線程中用同一個InterProcessMutex, 你能夠在每一個線程中都生成一個InterProcessMutex實例,它們的path都同樣,這樣它們能夠共享同一個鎖。
3.示例運行結果
運行結果控制檯以下:
鏈接初始化完成!Client 4 已獲取到互斥鎖Client 4 釋放互斥鎖Client 3 已獲取到互斥鎖Client 3 釋放互斥鎖......Client 2 已獲取到互斥鎖Client 2 釋放互斥鎖OK!
運行時查看Zookeeper節點信息以下:
2.不可重入鎖Shared Lock
這個鎖和上面的相比,就是少了Reentrant的功能,也就意味着它不能在同一個線程中重入。
這個類是InterProcessSemaphoreMutex
使用方法和上面的類相似
首先咱們將上面的例子修改一下,測試一下它的重入。
修改ExampleClientThatLocks.doWork,連續兩次acquire:
public void doWork(long time, TimeUnit unit) throws Exception{ if (!lock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能獲得互斥鎖"); } System.out.println(clientName + " 已獲取到互斥鎖"); if (!lock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能獲得互斥鎖"); } System.out.println(clientName + " 再次獲取到互斥鎖"); try { resource.use(); // 使用資源 Thread.sleep(1000 * 1); } finally { System.out.println(clientName + " 釋放互斥鎖"); lock.release(); // 老是在finally中釋放 lock.release(); // 獲取鎖幾回 釋放鎖也要幾回 }}
注意:咱們也須要調用release兩次。這和JDK的ReentrantLock用法一致。若是少調用一次release,則此線程依然擁有鎖。
上面的代碼沒有問題,咱們能夠屢次調用acquire,後續的acquire也不會阻塞。
可是將上面的InterProcessMutex換成不可重入鎖InterProcessSemaphoreMutex,若是再運行上面的代碼,結果就會發現線程被阻塞在第二個acquire上,直到超時。也就是此鎖不是可重入的。
3.可重入讀寫鎖Shared Reentrant Read Write Lock
相似JDK的ReentrantReadWriteLock。
一個讀寫鎖管理一對相關的鎖。一個負責讀操做,另一個負責寫操做。讀操做在寫鎖沒被使用時可同時由多個進程使用,而寫鎖在使用時不容許讀(阻塞)。
此鎖是可重入的。一個擁有寫鎖的線程可重入讀鎖,可是讀鎖卻不能進入寫鎖。
這也意味着寫鎖能夠降級成讀鎖, 好比請求寫鎖 --->讀鎖 ---->釋放寫鎖。從讀鎖升級成寫鎖是不行的。
1.可重入讀寫鎖相關類介紹
可重入讀寫鎖
主要由兩個類實現:
InterProcessReadWriteLock、InterProcessMutex
。
使用時首先建立一個InterProcessReadWriteLock實例,而後再根據你的需求獲得讀鎖或者寫鎖,讀寫鎖的類型是InterProcessMutex
。
2.編寫示例程序
示例程序仍使用上面的FakeLimitedResource、InterProcessMutexExample類
public class ExampleClientReadWriteLocks{ private final InterProcessReadWriteLock lock; private final InterProcessMutex readLock; private final InterProcessMutex writeLock; private final FakeLimitedResource resource; private final String clientName; public ExampleClientReadWriteLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) { this.resource = resource; this.clientName = clientName; lock = new InterProcessReadWriteLock(client, lockPath); readLock = lock.readLock(); writeLock = lock.writeLock(); } public void doWork(long time, TimeUnit unit) throws Exception { // 注意只能先獲得寫鎖再獲得讀鎖,不能反過來!!! if (!writeLock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能獲得寫鎖"); } System.out.println(clientName + " 已獲得寫鎖"); if (!readLock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能獲得讀鎖"); } System.out.println(clientName + " 已獲得讀鎖"); try { resource.use(); // 使用資源 Thread.sleep(1000 * 1); } finally { System.out.println(clientName + " 釋放讀寫鎖"); readLock.release(); writeLock.release(); } }}
在這個類中咱們首先請求了一個寫鎖,而後降級成讀鎖。執行業務處理,而後釋放讀寫鎖。修改
InterProcessMutexExample類中的
ExampleClientThatLocks
爲
ExampleClientReadWriteLocks
而後運行示例。
3.
示例運行結果
運行結果控制檯:
鏈接初始化完成!Client 1 已獲得寫鎖Client 1 已獲得讀鎖Client 1 釋放讀寫鎖......Client 3 已獲得寫鎖Client 3 已獲得讀鎖Client 3 釋放讀寫鎖OK!
此時查看Zookeeper數據節點以下:
4.信號量Shared Semaphore
一個計數的信號量相似JDK的Semaphore。JDK中Semaphore維護的一組許可(permits),而Cubator中稱之爲
租約(Lease)。
有兩種方式能夠決定semaphore的最大租約數。第一種方式是有用戶給定的path決定。第二種方式使用SharedCountReader類。
若是不使用SharedCountReader,沒有內部代碼檢查進程是否假定有10個租約而進程B假定有20個租約。 因此全部的實例必須使用相同的numberOfLeases值.
1.信號量實現類說明
主要類有:
- InterProcessSemaphoreV2 - 信號量實現類
- Lease - 租約(單個信號)
- SharedCountReader - 計數器,用於計算最大租約數量
此次調用acquire會返回一個租約對象。客戶端必須在finally中close這些租約對象,不然這些租約會丟失掉。可是,若是客戶端session因爲某種緣由好比crash丟掉,那麼這些客戶端持有的租約會自動close,這樣其它客戶端能夠繼續使用這些租約。
租約還能夠經過下面的方式返還:
public void returnLease(Lease lease)public void returnAll(Collection<Lease> leases)
注意一次你能夠請求多個租約,若是Semaphore當前的租約不夠,則請求線程會被阻塞。同時還提供了超時的重載方法。
public Lease acquire() throws Exceptionpublic Collection<Lease> acquire(int qty) throws Exceptionpublic Lease acquire(long time, TimeUnit unit) throws Exceptionpublic Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
2.編寫示例程序
public class InterProcessSemaphoreExample{ private static final int MAX_LEASE = 10; private static final String PATH = "/examples/locks"; public static void main(String[] args) throws Exception { FakeLimitedResource resource = new FakeLimitedResource(); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE); Collection<Lease> leases = semaphore.acquire(5); System.out.println("獲取租約數量:" + leases.size()); Lease lease = semaphore.acquire(); System.out.println("獲取單個租約"); resource.use(); Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS); System.out.println("獲取租約,若是爲空則超時: " + leases2); System.out.println("釋放租約"); semaphore.returnLease(lease); System.out.println("釋放集合中的全部租約"); semaphore.returnAll(leases); client.close(); System.out.println("OK!"); }}
首先咱們先得到了5個租約,
接着請求了一個租約,由於semaphore還有5個租約,因此請求能夠知足,返回一個租約,還剩4個租約。
而後再請求一個租約,由於租約不夠,阻塞到超時,仍是沒能知足,返回結果爲null。
3.示例運行結果
運行結果控制檯以下:
獲取租約數量:5獲取單個租約獲取租約,若是爲空則超時: null釋放租約釋放集合中的全部租約OK!
此時查看Zookeeper數據節點以下:
注意:
上面所講的4種鎖都是公平鎖(fair)。從ZooKeeper的角度看,每一個客戶端都按照請求的順序得到鎖。至關公平。
5.多鎖對象 Multi Shared Lock
Multi Shared Lock是一個鎖的容器。當調用acquire,全部的鎖都會被acquire,若是請求失敗,全部的鎖都會被release。一樣調用release時全部的鎖都被release(失敗被忽略)。
基本上,它就是組鎖的表明,在它上面的請求釋放操做都會傳遞給它包含的全部的鎖。
1.主要類說明
主要涉及兩個類:
- InterProcessMultiLock - 對所對象實現類
- InterProcessLock - 分佈式鎖接口類
它的構造函數須要包含的鎖的集合,或者一組ZooKeeper的path。用法和Shared Lock相同。
-------------------------------------------------------------------------------------------------------------------------------