💛 Source code for this distributed-solutions series is on GitHub — please give it a star!
💛 The original article is at http://www.javashuo.com/article/p-gquycsdz-c.html — please credit the source when reposting!
In a monolithic application, JVM-level locks are all you need. In a microservice or distributed environment, however, the same service may be deployed on multiple servers, and multiple JVMs cannot synchronize through ordinary JVM locks — a distributed lock is needed to acquire and release the lock. For example, in an order service we may generate order serial numbers from the current date and time, so two requests can hit the same timestamp and produce duplicate order numbers. (In JDK 8, LocalDateTime is thread-safe, so this particular formatting problem does not occur with it.)
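To make the duplicate-order-number problem concrete, here is a minimal sketch using only the JDK — no ZooKeeper involved. The `orderNo` helper and the millisecond pattern are illustrative assumptions, not code from the repository; the point is that a thread-safe formatter alone cannot stop two uncoordinated instances from deriving the same number from the same instant.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DuplicateOrderNoDemo {
    // DateTimeFormatter is immutable and thread-safe (unlike SimpleDateFormat),
    // but thread safety cannot prevent two JVMs from seeing the same instant
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    // Hypothetical order-number generator: timestamp only, no coordination
    static String orderNo(LocalDateTime now) {
        return "ORD" + FMT.format(now);
    }

    public static void main(String[] args) {
        // Simulate two service instances handling requests at the same instant
        LocalDateTime sameInstant = LocalDateTime.of(2023, 1, 1, 12, 0, 0);
        String fromInstanceA = orderNo(sameInstant);
        String fromInstanceB = orderNo(sameInstant);
        System.out.println(fromInstanceA.equals(fromInstanceB)); // prints "true": a duplicate
    }
}
```

Only a lock (or an ID scheme that does not depend on wall-clock time) shared across instances removes the collision.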
The first scheme is simple to implement. The logic: whichever thread successfully creates the node holds the lock; a thread whose create fails blocks itself. Thread A acquires the lock first; thread B fails to acquire it and blocks, while registering a watcher on /lockPath. When A finishes its work it deletes the node, the watcher fires, and B unblocks and tries to acquire the lock again.
Mimicking the JDK's native Lock interface, we use the template method design pattern to structure the distributed lock. The benefit is extensibility: we can quickly switch to a Redis-based or database-based distributed lock implementation.
Create the Lock interface
```java
public interface Lock {
    /**
     * Acquire the lock
     */
    void getLock() throws Exception;

    /**
     * Release the lock
     */
    void unlock() throws Exception;
}
```
The AbstractTemplateLock abstract class
```java
public abstract class AbstractTemplateLock implements Lock {
    @Override
    public void getLock() {
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + " acquired the lock");
        } else {
            // Wait: an event listener unblocks us once the lock node is deleted
            waitLock();
            // Try to acquire again
            getLock();
        }
    }

    protected abstract void waitLock();

    protected abstract boolean tryLock();

    protected abstract void releaseLock();

    @Override
    public void unlock() {
        releaseLock();
    }
}
```
ZooKeeper distributed lock logic
```java
@Slf4j
public class ZkTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;

    private static final String lockPath = "/lockPath";

    private ZkClient client;

    public ZkTemplateLock() {
        client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
        log.info("zk client connected: {}", zkServers);
    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("node deletion detected");
                latch.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {}
        };
        // Register the watcher
        client.subscribeDataChanges(lockPath, listener);
        // Block ourselves
        if (client.exists(lockPath)) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Unregister the watcher
        client.unsubscribeDataChanges(lockPath, listener);
    }

    @Override
    protected boolean tryLock() {
        try {
            client.createEphemeral(lockPath);
            System.out.println(Thread.currentThread().getName() + " acquired the lock");
        } catch (Exception e) {
            log.error("node creation failed");
            return false;
        }
        return true;
    }

    @Override
    public void releaseLock() {
        client.delete(lockPath);
    }
}
```
Drawbacks:
Every time the lock is contested, only one thread gets it. With a large number of threads this causes a "thundering herd", and the ZooKeeper node may slow to a crawl or even go down. The reason: every thread that fails to acquire the lock watches /lockPath, so the moment thread A releases it, a huge number of threads all unblock at once and scramble for the lock. This is very resource-intensive and performance suffers badly.
Unlike plain ephemeral nodes, ephemeral sequential nodes are ordered. We can exploit this by having each thread watch only the node with the immediately preceding sequence number. On each acquisition attempt a thread checks whether its own sequence number is the smallest; if so, it holds the lock. When it finishes, it deletes its own node, and the check repeats to find the new smallest-numbered node.
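This works because ZooKeeper implements sequential nodes by appending a zero-padded 10-digit counter to the node name, so a plain lexicographic `Collections.sort` over the child names is also numeric creation order. A small standalone illustration (the node name suffixes below are made up):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SequenceOrderDemo {
    public static void main(String[] args) {
        // Child-name suffixes as ZooKeeper might return them, in arbitrary order;
        // the zero-padded counter is what createEphemeralSequential appends
        List<String> children = new ArrayList<>(Arrays.asList(
                "0000000012", "0000000003", "0000000010"));
        Collections.sort(children); // lexicographic == numeric, thanks to the padding
        System.out.println(children); // [0000000003, 0000000010, 0000000012]
        // The thread that owns children.get(0) holds the lock; every other
        // thread watches only its immediate predecessor in this sorted list
    }
}
```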
臨時順序節點操做源碼
```java
@Slf4j
public class ZkSequenTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;
    private static final String lockPath = "/lockPath";
    private String beforePath;
    private String currentPath;
    private ZkClient client;

    public ZkSequenTemplateLock() {
        client = new ZkClient(zkServers);
        if (!client.exists(lockPath)) {
            client.createPersistent(lockPath);
        }
        log.info("zk client connected: {}", zkServers);
    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("node deletion detected");
                latch.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {}
        };
        // Add a deletion watcher on the node just ahead of ours; under the hood
        // this starts another thread that monitors the predecessor node
        client.subscribeDataChanges(beforePath, listener);
        // Block ourselves
        if (client.exists(beforePath)) {
            try {
                System.out.println("blocking " + currentPath);
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Unregister the watcher
        client.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    protected boolean tryLock() {
        if (currentPath == null) {
            // Create an ephemeral sequential node
            currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");
            System.out.println("current:" + currentPath);
        }
        // Fetch all children and sort them; sequential node names end in an
        // auto-incrementing counter, so natural string order is creation order
        List<String> childrens = client.getChildren(lockPath);
        Collections.sort(childrens);
        if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
            return true;
        } else {
            // Not first in line: find the node just before ours and record it in beforePath
            int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
            beforePath = lockPath + "/" + childrens.get(curIndex - 1);
        }
        System.out.println("beforePath:" + beforePath);
        return false;
    }

    @Override
    public void releaseLock() {
        System.out.println("delete:" + currentPath);
        client.delete(currentPath);
    }
}
```
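The watch-the-predecessor chain can be simulated inside a single JVM with plain JDK primitives, which makes it easy to see why it avoids the thundering herd: each "node" waits only on a latch owned by its predecessor, so one release wakes exactly one waiter. This is an illustrative stand-in of my own, not code from the repository:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class PredecessorChainDemo {
    public static List<Integer> run(int threads) throws InterruptedException {
        // latches[i] plays the role of znode i's deletion event
        CountDownLatch[] latches = new CountDownLatch[threads];
        for (int i = 0; i < threads; i++) latches[i] = new CountDownLatch(1);
        List<Integer> acquisitionOrder = new CopyOnWriteArrayList<>();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int seq = i;
            workers[i] = new Thread(() -> {
                try {
                    // The lowest sequence number holds the lock immediately;
                    // everyone else waits only on its direct predecessor
                    if (seq > 0) latches[seq - 1].await();
                    acquisitionOrder.add(seq);   // critical section
                    latches[seq].countDown();    // "delete our node": wakes one waiter
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Start in reverse to show ordering comes from the chain, not start order
        for (int i = threads - 1; i >= 0; i--) workers[i].start();
        for (Thread t : workers) t.join();
        return acquisitionOrder;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5)); // [0, 1, 2, 3, 4]
    }
}
```

The lock is also fair as a side effect: threads acquire it strictly in the order their sequence numbers were assigned.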
Curator ships several ready-made lock recipes (Shared Reentrant Lock, Shared Lock, Shared Reentrant Read Write Lock, Shared Semaphore, Multi Shared Lock). We use the first kind, the Shared Reentrant Lock, via its InterProcessMutex class, to acquire and release the lock.
```java
public class ZkLockWithCuratorTemplate implements Lock {
    // zk host address
    private String host = "localhost";
    // zk node under which the sequential lock nodes are created
    private String lockPath = "/curatorLock";
    // sleep time between retries
    private static final int SLEEP_TIME_MS = 1000;
    // retry at most 1000 times
    private static final int MAX_RETRIES = 1000;
    // session timeout
    private static final int SESSION_TIMEOUT = 30 * 1000;
    // connection timeout
    private static final int CONNECTION_TIMEOUT = 3 * 1000;
    // Curator's core client class
    private CuratorFramework curatorFramework;

    InterProcessMutex lock;

    public ZkLockWithCuratorTemplate() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(host)
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
                .build();
        curatorFramework.start();
        lock = new InterProcessMutex(curatorFramework, lockPath);
    }

    @Override
    public void getLock() throws Exception {
        // Wait up to 5 seconds to acquire the lock; acquire(time, unit) returns
        // false on timeout, so the result must be checked, not ignored
        if (!lock.acquire(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("could not acquire lock within 5 seconds");
        }
    }

    @Override
    public void unlock() throws Exception {
        lock.release();
    }
}
```
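Whichever implementation backs the Lock interface, callers should pair getLock with unlock in try/finally so an exception in the critical section cannot leak the lock. Below is a runnable sketch of that pattern; `InMemoryTemplateLock` is invented here (a ReentrantLock-backed stand-in for the ZooKeeper implementations) purely so the example runs without a ZooKeeper server:

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockUsageDemo {
    // The article's Lock abstraction
    interface Lock {
        void getLock() throws Exception;
        void unlock() throws Exception;
    }

    // Hypothetical in-memory stand-in for ZkTemplateLock / ZkLockWithCuratorTemplate
    static class InMemoryTemplateLock implements Lock {
        private final ReentrantLock delegate = new ReentrantLock();
        public void getLock() { delegate.lock(); }
        public void unlock() { delegate.unlock(); }
    }

    static int counter = 0; // deliberately not atomic: the lock protects it

    public static void main(String[] args) throws Exception {
        Lock lock = new InMemoryTemplateLock();
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                try {
                    lock.getLock();
                    try {
                        counter++; // critical section, e.g. generating an order number
                    } finally {
                        lock.unlock(); // always release, even if the work throws
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(counter); // 10
    }
}
```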
Source code and test classes:
https://github.com/Motianshi/distribute-tool