分佈式鎖看這篇就夠了

meadow-811339_1920<!-- more -->html

關注我

mark

轉載請務必註明原創地址爲:http://www.54tianzhisheng.cn/2018/04/24/Distributed_lock/java

什麼是鎖?

  • 在單進程的系統中,當存在多個線程能夠同時改變某個變量(可變共享變量)時,就須要對變量或代碼塊作同步,使其在修改這種變量時可以線性執行消除併發修改變量。
  • 而同步的本質是經過鎖來實現的。爲了實現多個線程在一個時刻同一個代碼塊只能有一個線程可執行,那麼須要在某個地方作個標記,這個標記必須每一個線程都能看到,當標記不存在時能夠設置該標記,其他後續線程發現已經有標記了則等待擁有標記的線程結束同步代碼塊取消標記後再去嘗試設置標記。這個標記能夠理解爲鎖。
  • 不一樣地方實現鎖的方式也不同,只要能知足全部線程都能看獲得標記便可。如 Java 中 synchronize 是在對象頭設置標記,Lock 接口的實現類基本上都只是某一個 volitile 修飾的 int 型變量其保證每一個線程都能擁有對該 int 的可見性和原子修改,linux 內核中也是利用互斥量或信號量等內存數據作標記。
  • 除了利用內存數據作鎖其實任何互斥的都能作鎖(只考慮互斥狀況),如流水錶中流水號與時間結合作冪等校驗能夠看做是一個不會釋放的鎖,或者使用某個文件是否存在做爲鎖等。只須要知足在對標記進行修改能保證原子性和內存可見性便可。

什麼是分佈式?

分佈式的 CAP 理論告訴咱們:node

任何一個分佈式系統都沒法同時知足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多隻能同時知足兩項。mysql

目前不少大型網站及應用都是分佈式部署的,分佈式場景中的數據一致性問題一直是一個比較重要的話題。基於 CAP理論,不少系統在設計之初就要對這三者作出取捨。在互聯網領域的絕大多數的場景中,都須要犧牲強一致性來換取系統的高可用性,系統每每只須要保證最終一致性。linux

分佈式場景

此處主要指集羣模式下,多個相同服務同時開啓.git

在許多的場景中,咱們爲了保證數據的最終一致性,須要不少的技術方案來支持,好比分佈式事務分佈式鎖等。不少時候咱們須要保證一個方法在同一時間內只能被同一個線程執行。在單機環境中,經過 Java 提供的併發 API 咱們能夠解決,可是在分佈式環境下,就沒有那麼簡單啦。github

  • 分佈式與單機狀況下最大的不一樣在於其不是多線程而是多進程
  • 多線程因爲能夠共享堆內存,所以能夠簡單的採起內存做爲標記存儲位置。而進程之間甚至可能都不在同一臺物理機上,所以須要將標記存儲在一個全部進程都能看到的地方。

什麼是分佈式鎖?

  • 當在分佈式模型下,數據只有一份(或有限制),此時須要利用鎖的技術控制某一時刻修改數據的進程數。
  • 與單機模式下的鎖不只須要保證進程可見,還須要考慮進程與鎖之間的網絡問題。(我以爲分佈式狀況下之因此問題變得複雜,主要就是須要考慮到網絡的延時和不可靠。。。一個大坑)
  • 分佈式鎖仍是能夠將標記存在內存,只是該內存不是某個進程分配的內存而是公共內存如 Redis、Memcache。至於利用數據庫、文件等作鎖與單機的實現是同樣的,只要保證標記能互斥就行。

咱們須要怎樣的分佈式鎖?

  • 能夠保證在分佈式部署的應用集羣中,同一個方法在同一時間只能被一臺機器上的一個線程執行。
  • 這把鎖要是一把可重入鎖(避免死鎖)
  • 這把鎖最好是一把阻塞鎖(根據業務需求考慮要不要這條)
  • 這把鎖最好是一把公平鎖(根據業務需求考慮要不要這條)
  • 有高可用的獲取鎖和釋放鎖功能
  • 獲取鎖和釋放鎖的性能要好

基於數據庫作分佈式鎖

基於樂觀鎖redis

基於表主鍵惟一作分佈式鎖

利用主鍵惟一的特性,若是有多個請求同時提交到數據庫的話,數據庫會保證只有一個操做能夠成功,那麼咱們就能夠認爲操做成功的那個線程得到了該方法的鎖,當方法執行完畢以後,想要釋放鎖的話,刪除這條數據庫記錄便可。算法

上面這種簡單的實現有如下幾個問題:spring

  • 這把鎖強依賴數據庫的可用性,數據庫是一個單點,一旦數據庫掛掉,會致使業務系統不可用。
  • 這把鎖沒有失效時間,一旦解鎖操做失敗,就會致使鎖記錄一直在數據庫中,其餘線程沒法再得到到鎖。
  • 這把鎖只能是非阻塞的,由於數據的 insert 操做,一旦插入失敗就會直接報錯。沒有得到鎖的線程並不會進入排隊隊列,要想再次得到鎖就要再次觸發得到鎖操做。
  • 這把鎖是非重入的,同一個線程在沒有釋放鎖以前沒法再次得到該鎖。由於數據中數據已經存在了。
  • 這把鎖是非公平鎖,全部等待鎖的線程憑運氣去爭奪鎖。
  • 在 MySQL 數據庫中採用主鍵衝突防重,在大併發狀況下有可能會形成鎖表現象。

固然,咱們也能夠有其餘方式解決上面的問題。

  • 數據庫是單點?搞兩個數據庫,數據以前雙向同步,一旦掛掉快速切換到備庫上。
  • 沒有失效時間?只要作一個定時任務,每隔必定時間把數據庫中的超時數據清理一遍。
  • 非阻塞的?搞一個 while 循環,直到 insert 成功再返回成功。
  • 非重入的?在數據庫表中加個字段,記錄當前得到鎖的機器的主機信息和線程信息,那麼下次再獲取鎖的時候先查詢數據庫,若是當前機器的主機信息和線程信息在數據庫能夠查到的話,直接把鎖分配給他就能夠了。
  • 非公平的?再建一張中間表,將等待鎖的線程全記錄下來,並根據建立時間排序,只有最早建立的容許獲取鎖。
  • 比較好的辦法是在程序中生產主鍵進行防重。

基於表字段版本號作分佈式鎖

這個策略源於 mysql 的 mvcc 機制,使用這個策略其實自己沒有什麼問題,惟一的問題就是對數據表侵入較大,咱們要爲每一個表設計一個版本號字段,而後寫一條判斷 sql 每次進行判斷,增長了數據庫操做的次數,在高併發的要求下,對數據庫鏈接的開銷也是沒法忍受的。

基於悲觀鎖

基於數據庫排他鎖作分佈式鎖

在查詢語句後面增長for update,數據庫會在查詢過程當中給數據庫表增長排他鎖 (注意: InnoDB 引擎在加鎖的時候,只有經過索引進行檢索的時候纔會使用行級鎖,不然會使用表級鎖。這裏咱們但願使用行級鎖,就要給要執行的方法字段名添加索引,值得注意的是,這個索引必定要建立成惟一索引,不然會出現多個重載方法之間沒法同時被訪問的問題。重載方法的話建議把參數類型也加上。)。當某條記錄被加上排他鎖以後,其餘線程沒法再在該行記錄上增長排他鎖。

咱們能夠認爲得到排他鎖的線程便可得到分佈式鎖,當獲取到鎖以後,能夠執行方法的業務邏輯,執行完方法以後,經過connection.commit()操做來釋放鎖。

這種方法能夠有效的解決上面提到的沒法釋放鎖和阻塞鎖的問題。

  • 阻塞鎖? for update語句會在執行成功後當即返回,在執行失敗時一直處於阻塞狀態,直到成功。
  • 鎖定以後服務宕機,沒法釋放?使用這種方式,服務宕機以後數據庫會本身把鎖釋放掉。

可是仍是沒法直接解決數據庫單點和可重入問題。

這裏還可能存在另一個問題,雖然咱們對方法字段名使用了惟一索引,而且顯示使用 for update 來使用行級鎖。可是,MySQL 會對查詢進行優化,即使在條件中使用了索引字段,可是否使用索引來檢索數據是由 MySQL 經過判斷不一樣執行計劃的代價來決定的,若是 MySQL 認爲全表掃效率更高,好比對一些很小的表,它就不會使用索引,這種狀況下 InnoDB 將使用表鎖,而不是行鎖。若是發生這種狀況就悲劇了。。。

還有一個問題,就是咱們要使用排他鎖來進行分佈式鎖的 lock,那麼一個排他鎖長時間不提交,就會佔用數據庫鏈接。一旦相似的鏈接變得多了,就可能把數據庫鏈接池撐爆。

優缺點

優勢:簡單,易於理解

缺點:會有各類各樣的問題(操做數據庫須要必定的開銷,使用數據庫的行級鎖並不必定靠譜,性能不靠譜)

基於 Redis 作分佈式鎖

基於 redis 的 setnx()、expire() 方法作分佈式鎖

setnx()

setnx 的含義就是 SET if Not Exists,其主要有兩個參數 setnx(key, value)。該方法是原子的,若是 key 不存在,則設置當前 key 成功,返回 1;若是當前 key 已經存在,則設置當前 key 失敗,返回 0。

expire()

expire 設置過時時間,要注意的是 setnx 命令不能設置 key 的超時時間,只能經過 expire() 來對 key 設置。

使用步驟

一、setnx(lockkey, 1) 若是返回 0,則說明佔位失敗;若是返回 1,則說明佔位成功

二、expire() 命令對 lockkey 設置超時時間,爲的是避免死鎖問題。

三、執行完業務代碼後,能夠經過 delete 命令刪除 key。

這個方案實際上是能夠解決平常工做中的需求的,但從技術方案的探討上來講,可能還有一些能夠完善的地方。好比,若是在第一步 setnx 執行成功後,在 expire() 命令執行成功前,發生了宕機的現象,那麼就依然會出現死鎖的問題,因此若是要對其進行完善的話,可使用 redis 的 setnx()、get() 和 getset() 方法來實現分佈式鎖。

基於 redis 的 setnx()、get()、getset()方法作分佈式鎖

這個方案的背景主要是在 setnx() 和 expire() 的方案上針對可能存在的死鎖問題,作了一些優化。

getset()

這個命令主要有兩個參數 getset(key,newValue)。該方法是原子的,對 key 設置 newValue 這個值,而且返回 key 原來的舊值。假設 key 原來是不存在的,那麼屢次執行這個命令,會出現下邊的效果:

  1. getset(key, "value1") 返回 null 此時 key 的值會被設置爲 value1
  2. getset(key, "value2") 返回 value1 此時 key 的值會被設置爲 value2
  3. 依次類推!
使用步驟
  1. setnx(lockkey, 當前時間+過時超時時間),若是返回 1,則獲取鎖成功;若是返回 0 則沒有獲取到鎖,轉向 2。
  2. get(lockkey) 獲取值 oldExpireTime ,並將這個 value 值與當前的系統時間進行比較,若是小於當前系統時間,則認爲這個鎖已經超時,能夠容許別的請求從新獲取,轉向 3。
  3. 計算 newExpireTime = 當前時間+過時超時時間,而後 getset(lockkey, newExpireTime) 會返回當前 lockkey 的值currentExpireTime。
  4. 判斷 currentExpireTime 與 oldExpireTime 是否相等,若是相等,說明當前 getset 設置成功,獲取到了鎖。若是不相等,說明這個鎖又被別的請求獲取走了,那麼當前請求能夠直接返回失敗,或者繼續重試。
  5. 在獲取到鎖以後,當前線程能夠開始本身的業務處理,當處理完畢後,比較本身的處理時間和對於鎖設置的超時時間,若是小於鎖設置的超時時間,則直接執行 delete 釋放鎖;若是大於鎖設置的超時時間,則不須要再鎖進行處理。
import cn.com.tpig.cache.redis.RedisService;
import cn.com.tpig.utils.SpringUtils;

//redis分佈式鎖
public final class RedisLockUtil {

    private static final int defaultExpire = 60;

    private RedisLockUtil() {
        //
    }

    /**
     * 加鎖
     * @param key redis key
     * @param expire 過時時間,單位秒
     * @return true:加鎖成功,false,加鎖失敗
     */
    public static boolean lock(String key, int expire) {

        RedisService redisService = SpringUtils.getBean(RedisService.class);
        long status = redisService.setnx(key, "1");

        if(status == 1) {
            redisService.expire(key, expire);
            return true;
        }

        return false;
    }

    public static boolean lock(String key) {
        return lock2(key, defaultExpire);
    }

    /**
     * 加鎖
     * @param key redis key
     * @param expire 過時時間,單位秒
     * @return true:加鎖成功,false,加鎖失敗
     */
    public static boolean lock2(String key, int expire) {

        RedisService redisService = SpringUtils.getBean(RedisService.class);

        long value = System.currentTimeMillis() + expire;
        long status = redisService.setnx(key, String.valueOf(value));

        if(status == 1) {
            return true;
        }
        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
        if(oldExpireTime < System.currentTimeMillis()) {
            //超時
            long newExpireTime = System.currentTimeMillis() + expire;
            long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));
            if(currentExpireTime == oldExpireTime) {
                return true;
            }
        }
        return false;
    }

    public static void unLock1(String key) {
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        redisService.del(key);
    }

    public static void unLock2(String key) {    
        RedisService redisService = SpringUtils.getBean(RedisService.class);    
        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));   
        if(oldExpireTime > System.currentTimeMillis()) {        
            redisService.del(key);    
        }
   }
}
public void drawRedPacket(long userId) {
    String key = "draw.redpacket.userid:" + userId;

    boolean lock = RedisLockUtil.lock2(key, 60);
    if(lock) {
        try {
            //領取操做
        } finally {
            //釋放鎖
            RedisLockUtil.unLock(key);
        }
    } else {
        new RuntimeException("重複領取獎勵");
    }
}

基於 Redlock 作分佈式鎖

Redlock 是 Redis 的做者 antirez 給出的集羣模式的 Redis 分佈式鎖,它基於 N 個徹底獨立的 Redis 節點(一般狀況下 N 能夠設置成 5)。

算法的步驟以下:

  • 一、客戶端獲取當前時間,以毫秒爲單位。
  • 二、客戶端嘗試獲取 N 個節點的鎖,(每一個節點獲取鎖的方式和前面說的緩存鎖同樣),N 個節點以相同的 key 和 value 獲取鎖。客戶端須要設置接口訪問超時,接口超時時間須要遠遠小於鎖超時時間,好比鎖自動釋放的時間是 10s,那麼接口超時大概設置 5-50ms。這樣能夠在有 redis 節點宕機後,訪問該節點時能儘快超時,而減少鎖的正常使用。
  • 三、客戶端計算在得到鎖的時候花費了多少時間,方法是用當前時間減去在步驟一獲取的時間,只有客戶端得到了超過 3 個節點的鎖,並且獲取鎖的時間小於鎖的超時時間,客戶端纔得到了分佈式鎖。
  • 四、客戶端獲取的鎖的時間爲設置的鎖超時時間減去步驟三計算出的獲取鎖花費時間。
  • 五、若是客戶端獲取鎖失敗了,客戶端會依次刪除全部的鎖。 使用 Redlock 算法,能夠保證在掛掉最多 2 個節點的時候,分佈式鎖服務仍然能工做,這相比以前的數據庫鎖和緩存鎖大大提升了可用性,因爲 redis 的高效性能,分佈式緩存鎖性能並不比數據庫鎖差。

可是,有一位分佈式的專家寫了一篇文章《How to do distributed locking》,質疑 Redlock 的正確性。

https://mp.weixin.qq.com/s/1bPLk_VZhZ0QYNZS8LkviA

https://blog.csdn.net/jek123456/article/details/72954106

優缺點

優勢:

性能高

缺點:

失效時間設置多長時間爲好?如何設置的失效時間過短,方法沒等執行完,鎖就自動釋放了,那麼就會產生併發問題。若是設置的時間太長,其餘獲取鎖的線程就可能要平白的多等一段時間。

基於 redisson 作分佈式鎖

redisson 是 redis 官方的分佈式鎖組件。GitHub 地址:https://github.com/redisson/redisson

上面的這個問題 ——> 失效時間設置多長時間爲好?這個問題在 redisson 的作法是:每得到一個鎖時,只設置一個很短的超時時間,同時起一個線程在每次快要到超時時間時去刷新鎖的超時時間。在釋放鎖的同時結束這個線程。

基於 ZooKeeper 作分佈式鎖

zookeeper 鎖相關基礎知識

  • zk 通常由多個節點構成(單數),採用 zab 一致性協議。所以能夠將 zk 當作一個單點結構,對其修改數據其內部自動將全部節點數據進行修改然後才提供查詢服務。
  • zk 的數據以目錄樹的形式,每一個目錄稱爲 znode, znode 中可存儲數據(通常不超過 1M),還能夠在其中增長子節點。
  • 子節點有三種類型。序列化節點,每在該節點下增長一個節點自動給該節點的名稱上自增。臨時節點,一旦建立這個 znode 的客戶端與服務器失去聯繫,這個 znode 也將自動刪除。最後就是普通節點。
  • Watch 機制,client 能夠監控每一個節點的變化,當產生變化會給 client 產生一個事件。

zk 基本鎖

  • 原理:利用臨時節點與 watch 機制。每一個鎖佔用一個普通節點 /lock,當須要獲取鎖時在 /lock 目錄下建立一個臨時節點,建立成功則表示獲取鎖成功,失敗則 watch/lock 節點,有刪除操做後再去爭鎖。臨時節點好處在於當進程掛掉後能自動上鎖的節點自動刪除即取消鎖。
  • 缺點:全部取鎖失敗的進程都監聽父節點,很容易發生羊羣效應,即當釋放鎖後全部等待進程一塊兒來建立節點,併發量很大。

zk 鎖優化

  • 原理:上鎖改成建立臨時有序節點,每一個上鎖的節點均能建立節點成功,只是其序號不一樣。只有序號最小的能夠擁有鎖,若是這個節點序號不是最小的則 watch 序號比自己小的前一個節點 (公平鎖)。
  • 步驟:
  1. 在 /lock 節點下建立一個有序臨時節點 (EPHEMERAL_SEQUENTIAL)。
  2. 判斷建立的節點序號是否最小,若是是最小則獲取鎖成功。不是則取鎖失敗,而後 watch 序號比自己小的前一個節點。
  3. 當取鎖失敗,設置 watch 後則等待 watch 事件到來後,再次判斷是否序號最小。
  4. 取鎖成功則執行代碼,最後釋放鎖(刪除該節點)。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/locks";//根
    private String lockName;//競爭資源的標誌
    private String waitNode;//等待前一個鎖
    private String myZnode;//當前鎖
    private CountDownLatch latch;//計數器
    private int sessionTimeout = 30000;
    private List<Exception> exception = new ArrayList<Exception>();

    /**
     * 建立分佈式鎖,使用前請確認config配置的zookeeper服務可用
     * @param config 127.0.0.1:2181
     * @param lockName 競爭資源標誌,lockName中不能包含單詞lock
     */
    public DistributedLock(String config, String lockName){
        this.lockName = lockName;
        // 建立一個與服務器的鏈接
        try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if(stat == null){
                // 建立根節點
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            exception.add(e);
        } catch (KeeperException e) {
            exception.add(e);
        } catch (InterruptedException e) {
            exception.add(e);
        }
    }

    /**
     * zookeeper節點的監視器
     */
    public void process(WatchedEvent event) {
        if(this.latch != null) {
            this.latch.countDown();
        }
    }

    public void lock() {
        if(exception.size() > 0){
            throw new LockException(exception.get(0));
        }
        try {
            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);//等待鎖
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \\u000B");
            //建立臨時子節點
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            //取出全部子節點
            List<String> subNodes = zk.getChildren(root, false);
            //取出全部lockName的鎖
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);
            System.out.println(myZnode + "==" + lockObjNodes.get(0));
            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //若是是最小的節點,則表示取得鎖
                return true;
            }
            //若是不是最小的節點,找到比本身小1的節點
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        //判斷比本身小一個數的節點是否存在,若是不存在則無需等待鎖,同時註冊監聽
        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }

    public void unlock() {
        try {
            System.out.println("unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    public Condition newCondition() {
        return null;
    }

    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

優缺點

優勢:

有效的解決單點問題,不可重入問題,非阻塞問題以及鎖沒法釋放的問題。實現起來較爲簡單。

缺點:

性能上可能並無緩存服務那麼高,由於每次在建立鎖和釋放鎖的過程當中,都要動態建立、銷燬臨時節點來實現鎖功能。ZK 中建立和刪除節點只能經過 Leader 服務器來執行,而後將數據同步到全部的 Follower 機器上。還須要對 ZK的原理有所瞭解。

基於 Consul 作分佈式鎖

DD 寫過相似文章,其實主要利用 Consul 的 Key / Value 存儲 API 中的 acquire 和 release 操做來實現。

文章地址:http://blog.didispace.com/spring-cloud-consul-lock-and-semphore/

使用分佈式鎖的注意事項

一、注意分佈式鎖的開銷

二、注意加鎖的粒度

三、加鎖的方式

總結

不管你身處一個什麼樣的公司,最開始的工做可能都須要從最簡單的作起。不要提阿里和騰訊的業務場景 qps 如何大,由於在這樣的大場景中你未必能親自參與項目,親自參與項目未必能是核心的設計者,是核心的設計者未必能獨自設計。但願你們能根據本身公司業務場景,選擇適合本身項目的方案。

參考資料

http://www.hollischuang.com/archives/1716

http://www.spring4all.com/question/158

https://www.cnblogs.com/PurpleDream/p/5559352.html

http://www.cnblogs.com/PurpleDream/p/5573040.html

https://www.cnblogs.com/suolu/p/6588902.html

相關文章
相關標籤/搜索