分佈式鎖(zookeeper)與接口冪等性實現

背景

隨着數據量的增大,用戶的增多,系統的併發訪問愈來愈大,傳統的單機已經知足不了需求,分佈式系統成爲一種必然的趨勢。分佈式系統錯綜複雜,今天,咱們着重對分佈式系統的互斥性與冪等性進行分析與解決。java

互斥性

互斥性問題也就是共享資源的搶佔問題。如何解決呢?也就是鎖,保證對共享資源的串行化訪問。互斥性要如何實現?。在java中,最經常使用的是synchronized和lock這兩種內置的鎖,但這隻適用於單進程中的多線程。對於在同一操做系統下的多個進程間,常見的鎖實現有pv信號量等。然而,當問題擴展到多臺機器的多個操做系統時,也就是分佈式鎖,狀況就複雜多了。node

  • 鎖要存在哪裏。必須提供一個全部主機都能訪問到的存儲空間
  • 加鎖的進程在掛掉以後,如何確保鎖被解開,釋放資源。能夠經過超時機制或者定時檢測心跳來實現
  • 不一樣進程間如何獲取相同的惟一標識來競爭鎖。能夠利用要保護的資源生成一個惟一的id
  • 獲取鎖操做的原子性。必須保證讀取鎖狀態、加鎖兩步的原子性
  • 鎖的可重入性。某個線程試圖再次獲取由本身持有的鎖,這個操做會百分百成功,這就是可重入性。若是不能保證可重入性,就會有死鎖的可能。
  • 阻塞鎖與自旋鎖。當獲取不到鎖時,阻塞鎖就是線程阻塞自身,等待喚醒,自旋鎖就是不斷的嘗試從新獲取鎖。
  • 公平鎖與非公平鎖。公平鎖保證按照請求的順序獲取鎖,非公平鎖就是能夠插隊。公平鎖通常要維持一個隊列來實現,因此非公平鎖的性能會更好一點。
  • 避免驚羣效應。若是分佈式鎖是阻塞鎖,當鎖的佔有者釋放鎖時,要避免同時喚醒多個阻塞的線程,產生驚羣效應。

zookeeper實現

今天重點講解使用zookeeper實現分佈式鎖。我的感受zookeeper是最適合實現分佈式鎖。它的幾個特性:apache

  • 順序節點:能夠避免驚羣效應
  • 臨時節點:避免機器宕機卻是鎖沒法釋放
  • watch機制:能夠及時喚醒等待的線程

zk實現分佈式鎖的流程以下
這裏寫圖片描述
我這裏用zk實現了一個可重入的、阻塞的、公平的分佈式鎖,代碼以下:多線程

package locks;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import utils.ZkUtils;
import watcher.PredecessorNodeWatcher;
import watcher.SessionWatcher;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by huangwt on 2018/3/21.
*/
@Slf4j
public class ReentrantZKLock {

    private final static String BASE_NODE = "/baseNode";
    private final static String CHILDREN_NODE = "/node_";

    private final Lock localLock;
    private final Condition condition;

    //用於重入檢測
    private static ThreadLocal<AtomicInteger> threadLocal = new ThreadLocal<AtomicInteger>();

    private ZooKeeper zooKeeper = null;

    private String node = null;

    ReentrantZKLock(String addr, int timeout) {
        try {
            zooKeeper = new ZooKeeper(addr, timeout, new SessionWatcher());
            localLock = new ReentrantLock();
            condition = localLock.newCondition();
        } catch (IOException e) {
            log.error("get zookeeper failed", e);
            throw new RuntimeException(e);
        }
    }

    public void lock() {
        //重入檢測
        if (checkReentrant()) {
            return;
        }
        try {
            node = zooKeeper.create(BASE_NODE + CHILDREN_NODE, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                localLock.lock();
                try {
                    List<String> childrenNodes = zooKeeper.getChildren(BASE_NODE, false);
                    ZkUtils.childNodeSort(childrenNodes);
                    //當前節點的索引
                    int myNodeIndex = childrenNodes.indexOf(node);
                    //當前節點的前一個節點
                    int beforeNodeIndex = myNodeIndex - 1;
                    Stat stat = null;
                    while (beforeNodeIndex >= 0) {
                        stat = zooKeeper.exists(childrenNodes.get(beforeNodeIndex), new PredecessorNodeWatcher(condition));
                        if (stat != null) {
                            break;
                        }
                    }

                    if (stat != null) {  //前序節點存在,等待前序節點被刪除,釋放鎖
                        condition.await();
                    } else { // 獲取到鎖
                        threadLocal.set(new AtomicInteger(1));
                        return;
                    }
                } finally {
                    localLock.unlock();
                }
            }
        } catch (Exception e) {
            log.error("lock failed", e);
            throw new RuntimeException(e);
        }

    }

    public void unlock() {
        AtomicInteger times = threadLocal.get();
        if (times == null) {
            return;
        }
        if (times.decrementAndGet() == 0) {
            threadLocal.remove();
            try {
                zooKeeper.delete(node, -1);
            } catch (Exception e) {
                log.error("unlock faild", e);
                throw new RuntimeException(e);
            }
        }

    }

    private boolean checkReentrant() {
        AtomicInteger times = threadLocal.get();
        if (times != null) {
            times.incrementAndGet();
            return true;
        }

        return false;
    }
}
package utils;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
* Created by huangwt on 2018/3/24.
*/
public class ZkUtils {

    /**
    * 對子節點排序
    *
    * @param node
    */
    public static void childNodeSort(List<String> node) {
        Collections.sort(node, new ChildNodeCompare());
    }

    private static class ChildNodeCompare implements Comparator<String> {

        public int compare(String childNode1, String childNode2) {
            return childNode1.compareTo(childNode2);
        }
    }

}
package watcher;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.locks.Condition;

/**
* Created by huangwt on 2018/3/24.
*/
public class PredecessorNodeWatcher implements Watcher {
    private Condition condition = null;

    public PredecessorNodeWatcher(Condition condition) {
        this.condition = condition;
    }

    public void process(WatchedEvent event) {
        //前序節點被刪除,鎖被釋放,喚醒當前等待線程
        if(event.getType() == Event.EventType.NodeDeleted){
            condition.signal();
        }
    }
}
package watcher;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/**
* Created by huangwt on 2018/3/24.
*/
@Slf4j
public class SessionWatcher implements Watcher {
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            log.info("get zookeeper success");
        }
    }
}

主要是使用了ThreadLocal實現了鎖的可重入性,使用watch機制實現了阻塞鎖,使用臨時節點實現的公平鎖。
這段代碼只是一個demo供你們參考,還有不少問題沒解決。好比當zookper掛掉的時候,阻塞的線程就沒法被喚醒,這時候就須要監聽zk的心跳。併發

冪等性

冪等性是系統接口對外的一種承諾,數學表達爲:f(f(x)) = f(x)。
冪等性指的是,使用相同參數對同一資源重複調用某個接口的結果與調用一次的結果相同。分佈式

爲何須要冪等性?

假設如今有一個方法 :Boolean withdraw(account_id, amount) ,做用是從account_id對應的帳戶中扣除amount數額的錢,若是扣除成功則返回true,帳戶餘額減小amount; 若是扣除失敗則返回false,帳戶餘額不變。
如以上流程,接口沒法冪等,可能致使重複扣款。性能

解決

  • 請求獲取ticketId
  • 請求扣款,傳入ticketId
  • 根據ticketId查詢這次操做是否存在,若是存在則表示該操做已經執行過,直接返回結果;若是不存在,扣款,保存結果
  • 返回結果到客戶端

這裏寫圖片描述

相關文章
相關標籤/搜索