隨着數據量的增大,用戶的增多,系統的併發訪問愈來愈大,傳統的單機已經知足不了需求,分佈式系統成爲一種必然的趨勢。分佈式系統錯綜複雜,今天,咱們着重對分佈式系統的互斥性與冪等性進行分析與解決。java
互斥性問題也就是共享資源的搶佔問題。如何解決呢?也就是鎖,保證對共享資源的串行化訪問。互斥性要如何實現?。在java中,最經常使用的是synchronized和lock這兩種內置的鎖,但這隻適用於單進程中的多線程。對於在同一操做系統下的多個進程間,常見的鎖實現有pv信號量等。然而,當問題擴展到多臺機器的多個操做系統時,也就是分佈式鎖,狀況就複雜多了。node
今天重點講解使用zookeeper實現分佈式鎖。我的感受zookeeper是最適合實現分佈式鎖。它的幾個特性:apache
zk實現分佈式鎖的流程以下
我這裏用zk實現了一個可重入的、阻塞的、公平的分佈式鎖,代碼以下:多線程
package locks; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import utils.ZkUtils; import watcher.PredecessorNodeWatcher; import watcher.SessionWatcher; import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by huangwt on 2018/3/21. */ @Slf4j public class ReentrantZKLock { private final static String BASE_NODE = "/baseNode"; private final static String CHILDREN_NODE = "/node_"; private final Lock localLock; private final Condition condition; //用於重入檢測 private static ThreadLocal<AtomicInteger> threadLocal = new ThreadLocal<AtomicInteger>(); private ZooKeeper zooKeeper = null; private String node = null; ReentrantZKLock(String addr, int timeout) { try { zooKeeper = new ZooKeeper(addr, timeout, new SessionWatcher()); localLock = new ReentrantLock(); condition = localLock.newCondition(); } catch (IOException e) { log.error("get zookeeper failed", e); throw new RuntimeException(e); } } public void lock() { //重入檢測 if (checkReentrant()) { return; } try { node = zooKeeper.create(BASE_NODE + CHILDREN_NODE, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); while (true) { localLock.lock(); try { List<String> childrenNodes = zooKeeper.getChildren(BASE_NODE, false); ZkUtils.childNodeSort(childrenNodes); //當前節點的索引 int myNodeIndex = childrenNodes.indexOf(node); //當前節點的前一個節點 int beforeNodeIndex = myNodeIndex - 1; Stat stat = null; while (beforeNodeIndex >= 0) { stat = zooKeeper.exists(childrenNodes.get(beforeNodeIndex), new PredecessorNodeWatcher(condition)); if (stat != null) { break; } } if (stat != null) { //前序節點存在,等待前序節點被刪除,釋放鎖 condition.await(); } else { // 獲取到鎖 threadLocal.set(new AtomicInteger(1)); return; } } finally { localLock.unlock(); } } } catch (Exception e) { log.error("lock failed", e); throw new RuntimeException(e); } } public void unlock() { AtomicInteger times = threadLocal.get(); if (times == null) { return; } if (times.decrementAndGet() == 0) { threadLocal.remove(); try { zooKeeper.delete(node, -1); } catch (Exception e) { log.error("unlock faild", e); throw new RuntimeException(e); } } } private boolean checkReentrant() { AtomicInteger times = threadLocal.get(); if (times != null) { times.incrementAndGet(); return true; } return false; } }
package utils; import java.util.Collections; import java.util.Comparator; import java.util.List; /** * Created by huangwt on 2018/3/24. */ public class ZkUtils { /** * 對子節點排序 * * @param node */ public static void childNodeSort(List<String> node) { Collections.sort(node, new ChildNodeCompare()); } private static class ChildNodeCompare implements Comparator<String> { public int compare(String childNode1, String childNode2) { return childNode1.compareTo(childNode2); } } }
package watcher; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.locks.Condition; /** * Created by huangwt on 2018/3/24. */ public class PredecessorNodeWatcher implements Watcher { private Condition condition = null; public PredecessorNodeWatcher(Condition condition) { this.condition = condition; } public void process(WatchedEvent event) { //前序節點被刪除,鎖被釋放,喚醒當前等待線程 if(event.getType() == Event.EventType.NodeDeleted){ condition.signal(); } } }
package watcher; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; /** * Created by huangwt on 2018/3/24. */ @Slf4j public class SessionWatcher implements Watcher { public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { log.info("get zookeeper success"); } } }
主要是使用了ThreadLocal實現了鎖的可重入性,使用watch機制實現了阻塞鎖,使用臨時節點實現的公平鎖。
這段代碼只是一個demo供你們參考,還有不少問題沒解決。好比當zookper掛掉的時候,阻塞的線程就沒法被喚醒,這時候就須要監聽zk的心跳。併發
冪等性是系統接口對外的一種承諾,數學表達爲:f(f(x)) = f(x)。
冪等性指的是,使用相同參數對同一資源重複調用某個接口的結果與調用一次的結果相同。分佈式
假設如今有一個方法 :Boolean withdraw(account_id, amount) ,做用是從account_id對應的帳戶中扣除amount數額的錢,若是扣除成功則返回true,帳戶餘額減小amount; 若是扣除失敗則返回false,帳戶餘額不變。
如以上流程,接口沒法冪等,可能致使重複扣款。性能