基於Consul的分佈式信號量實現

https://mp.weixin.qq.com/s/3FjMGvN6KVoABKhhy2k_Qwhtml

在以前《基於Consul的分佈式鎖實現》一文中咱們介紹如何基於Consul的KV存儲來實現分佈式互斥鎖。本文將繼續討論基於Consul的分佈式鎖實現。信號量是咱們在實現併發控制時會常常使用的手段,主要用來限制同時併發線程或進程的數量,好比:Zuul默認狀況下就使用信號量來限制每一個路由的併發數,以實現不一樣路由間的資源隔離。

信號量(Semaphore),有時被稱爲信號燈,是在多線程環境下使用的一種設施,是能夠用來保證兩個或多個關鍵代碼段不被併發調用。在進入一個關鍵代碼段以前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那麼該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。爲了完成這個過程,須要建立一個信號量VI,而後將Acquire Semaphore VI以及Release Semaphore VI分別放置在每一個關鍵代碼段的首末端,確認這些信號量VI引用的是初始建立的信號量。如在這個停車場系統中,車位是公共資源,每輛車比如一個線程,看門人起的就是信號量的做用。微信

實現思路

  • 信號量存儲:semaphore/key
  • acquired操做:
    • 建立session
    • 鎖定key競爭者:semaphore/key/session
    • 查詢信號量:semaphore/key/.lock,能夠得到以下內容(若是是第一次建立信號量,將獲取不到,這個時候就直接建立)
      基於Consul的分佈式信號量實現
  • 若是持有者已達上限,返回false,若是阻塞模式,就繼續嘗試acquired操做
  • 若是持有者未達上限,更新semaphore/key/.lock的內容,將當前線程的sessionId加入到holders中。注意:更新的時候須要設置cas,它的值是「查詢信號量」步驟得到的「ModifyIndex」值,該值用於保證更新操做的基礎沒有被其餘競爭者更新。若是更新成功,就開始執行具體邏輯。若是沒有更新成功,說明有其餘競爭者搶佔了資源,返回false,阻塞模式下繼續嘗試acquired操做
  • release操做:
    • 從semaphore/key/.lock的holders中移除當前sessionId
    • 刪除semaphore/key/session
    • 刪除當前的session

流程圖

基於Consul的分佈式信號量實現

代碼實現

public class Semaphore {

    private Logger logger = Logger.getLogger(getClass());

    private static final String prefix = "semaphore/";  // 信號量參數前綴

    private ConsulClient consulClient;
    private int limit;
    private String keyPath;
    private String sessionId = null;
    private boolean acquired = false;

    /**
     *
     * @param consulClient consul客戶端實例
     * @param limit 信號量上限值
     * @param keyPath 信號量在consul中存儲的參數路徑
     */
    public Semaphore(ConsulClient consulClient, int limit, String keyPath) {
        this.consulClient = consulClient;
        this.limit = limit;
        this.keyPath = prefix + keyPath;
    }

    /**
     * acquired信號量
     *
     * @param block 是否阻塞。若是爲true,那麼一直嘗試,直到獲取到該資源爲止。
     * @return
     * @throws IOException
     */
    public Boolean acquired(boolean block) throws IOException {

        if(acquired) {
            logger.error(sessionId + " - Already acquired");
            throw new RuntimeException(sessionId + " - Already acquired");
        }

        // create session
        clearSession();
        this.sessionId = createSessionId("semaphore");
        logger.debug("Create session : " + sessionId);

        // add contender entry
        String contenderKey = keyPath + "/" + sessionId;
        logger.debug("contenderKey : " + contenderKey);
        PutParams putParams = new PutParams();
        putParams.setAcquireSession(sessionId);
        Boolean b = consulClient.setKVValue(contenderKey, "", putParams).getValue();
        if(!b) {
            logger.error("Failed to add contender entry : " + contenderKey + ", " + sessionId);
            throw new RuntimeException("Failed to add contender entry : " + contenderKey + ", " + sessionId);
        }

        while(true) {
            // try to take the semaphore
            String lockKey = keyPath + "/.lock";
            String lockKeyValue;

            GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();

            if (lockKeyContent != null) {
                // lock值轉換
                lockKeyValue = lockKeyContent.getValue();
                BASE64Decoder decoder = new BASE64Decoder();
                byte[] v = decoder.decodeBuffer(lockKeyValue);
                String lockKeyValueDecode = new String(v);
                logger.debug("lockKey=" + lockKey + ", lockKeyValueDecode=" + lockKeyValueDecode);

                Gson gson = new Gson();
                ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
                // 當前信號量已滿
                if(contenderValue.getLimit() == contenderValue.getHolders().size()) {
                    logger.debug("Semaphore limited " + contenderValue.getLimit() + ", waiting...");
                    if(block) {
                        // 若是是阻塞模式,再嘗試
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                        }
                        continue;
                    }
                    // 非阻塞模式,直接返回沒有獲取到信號量
                    return false;
                }
                // 信號量增長
                contenderValue.getHolders().add(sessionId);
                putParams = new PutParams();
                putParams.setCas(lockKeyContent.getModifyIndex());
                boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
                if(c) {
                    acquired = true;
                    return true;
                }
                else
                    continue;
            } else {
                // 當前信號量尚未,因此建立一個,並立刻搶佔一個資源
                ContenderValue contenderValue = new ContenderValue();
                contenderValue.setLimit(limit);
                contenderValue.getHolders().add(sessionId);

                putParams = new PutParams();
                putParams.setCas(0L);
                boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
                if (c) {
                    acquired = true;
                    return true;
                }
                continue;
            }
        }
    }

    /**
     * 建立sessionId
     * @param sessionName
     * @return
     */
    public String createSessionId(String sessionName) {
        NewSession newSession = new NewSession();
        newSession.setName(sessionName);
        return consulClient.sessionCreate(newSession, null).getValue();
    }

    /**
     * 釋放session、並從lock中移除當前的sessionId
     * @throws IOException
     */
    public void release() throws IOException {
        if(this.acquired) {
            // remove session from lock
            while(true) {
                String contenderKey = keyPath + "/" + sessionId;
                String lockKey = keyPath + "/.lock";
                String lockKeyValue;

                GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
                if (lockKeyContent != null) {
                    // lock值轉換
                    lockKeyValue = lockKeyContent.getValue();
                    BASE64Decoder decoder = new BASE64Decoder();
                    byte[] v = decoder.decodeBuffer(lockKeyValue);
                    String lockKeyValueDecode = new String(v);
                    Gson gson = new Gson();
                    ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
                    contenderValue.getHolders().remove(sessionId);
                    PutParams putParams = new PutParams();
                    putParams.setCas(lockKeyContent.getModifyIndex());
                    consulClient.deleteKVValue(contenderKey);
                    boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
                    if(c) {
                        break;
                    }
                }
            }
            // remove session key

        }
        this.acquired = false;
        clearSession();
    }

    public void clearSession() {
        if(sessionId != null) {
            consulClient.sessionDestroy(sessionId, null);
            sessionId = null;
        }
    }

    class ContenderValue implements Serializable {

        private Integer limit;
        private List<String> holders = new ArrayList<>();

        public Integer getLimit() {
            return limit;
        }

        public void setLimit(Integer limit) {
            this.limit = limit;
        }

        public List<String> getHolders() {
            return holders;
        }

        public void setHolders(List<String> holders) {
            this.holders = holders;
        }

        @Override
        public String toString() {
            return new Gson().toJson(this);
        }

    }
}

單元測試

下面單元測試的邏輯:經過線程的方式來模擬不一樣的分佈式服務來獲取信號量執行業務邏輯。因爲信號量與簡單的分佈式互斥鎖有所不一樣,它不是隻限定一個線程能夠操做,而是能夠控制多個線程的併發,因此經過下面的單元測試,咱們設置信號量爲3,而後同時啓動15個線程來競爭的狀況,來觀察分佈式信號量實現的結果如何。session

public class TestLock {

    private Logger logger = Logger.getLogger(getClass());

    @Test
    public void testSemaphore() throws Exception {
        new Thread(new SemaphoreRunner(1)).start();
        new Thread(new SemaphoreRunner(2)).start();
        new Thread(new SemaphoreRunner(3)).start();
        new Thread(new SemaphoreRunner(4)).start();
        new Thread(new SemaphoreRunner(5)).start();
        new Thread(new SemaphoreRunner(6)).start();
        new Thread(new SemaphoreRunner(7)).start();
        new Thread(new SemaphoreRunner(8)).start();
        new Thread(new SemaphoreRunner(9)).start();
        new Thread(new SemaphoreRunner(10)).start();
        Thread.sleep(1000000L);
    } 
}

public class SemaphoreRunner implements Runnable {

    private Logger logger = Logger.getLogger(getClass()); 
    private int flag;

    public SemaphoreRunner(int flag) {
        this.flag = flag;
    }

    @Override
    public void run() {
        Semaphore semaphore = new Semaphore(new ConsulClient(), 3, "mg-init");
        try {
            if (semaphore.acquired(true)) {
                // 獲取到信號量,執行業務邏輯
                logger.info("Thread " + flag + " start!");
                Thread.sleep(new Random().nextInt(10000));
                logger.info("Thread " + flag + " end!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // 信號量釋放、Session鎖釋放、Session刪除
                semaphore.release();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
執行結果:
INFO  [Thread-6] SemaphoreRunner - Thread 7 start!
 INFO  [Thread-2] SemaphoreRunner - Thread 3 start!
 INFO  [Thread-7] SemaphoreRunner - Thread 8 start!
 INFO  [Thread-2] SemaphoreRunner - Thread 3 end!
 INFO  [Thread-5] SemaphoreRunner - Thread 6 start!
 INFO  [Thread-6] SemaphoreRunner - Thread 7 end!
 INFO  [Thread-9] SemaphoreRunner - Thread 10 start!
 INFO  [Thread-5] SemaphoreRunner - Thread 6 end!
 INFO  [Thread-1] SemaphoreRunner - Thread 2 start!
 INFO  [Thread-7] SemaphoreRunner - Thread 8 end!
 INFO  [Thread-10] SemaphoreRunner - Thread 11 start!
 INFO  [Thread-10] SemaphoreRunner - Thread 11 end!
 INFO  [Thread-12] SemaphoreRunner - Thread 13 start!
 INFO  [Thread-1] SemaphoreRunner - Thread 2 end!
 INFO  [Thread-3] SemaphoreRunner - Thread 4 start!
 INFO  [Thread-9] SemaphoreRunner - Thread 10 end!
 INFO  [Thread-0] SemaphoreRunner - Thread 1 start!
 INFO  [Thread-3] SemaphoreRunner - Thread 4 end!
 INFO  [Thread-14] SemaphoreRunner - Thread 15 start!
 INFO  [Thread-12] SemaphoreRunner - Thread 13 end!
 INFO  [Thread-0] SemaphoreRunner - Thread 1 end!
 INFO  [Thread-13] SemaphoreRunner - Thread 14 start!
 INFO  [Thread-11] SemaphoreRunner - Thread 12 start!
 INFO  [Thread-13] SemaphoreRunner - Thread 14 end!
 INFO  [Thread-4] SemaphoreRunner - Thread 5 start!
 INFO  [Thread-4] SemaphoreRunner - Thread 5 end!
 INFO  [Thread-8] SemaphoreRunner - Thread 9 start!
 INFO  [Thread-11] SemaphoreRunner - Thread 12 end!
 INFO  [Thread-14] SemaphoreRunner - Thread 15 end!
 INFO  [Thread-8] SemaphoreRunner - Thread 9 end!

從測試結果,咱們能夠發現當信號量持有者數量達到信號量上限3的時候,其餘競爭者就開始進行等待了,只有當某個持有者釋放信號量以後,纔會有新的線程變成持有者,從而開始執行本身的業務邏輯。因此,分佈式信號量能夠幫助咱們有效的控制同時操做某個共享資源的併發數。多線程

優化建議與參考文檔

同前文同樣,這裏只是作了簡單的實現。線上應用還必須加入TTL的session清理以及對.lock資源中的無效holder進行清理的機制。併發

參考文檔:
https://www.consul.io/docs/guides/semaphore.htmldom

Spring Cloud新書推薦

基於Consul的分佈式信號量實現
基於Consul的分佈式信號量實現
版權聲明分佈式

本文采用 CC BY 3.0 CN協議 進行許可。 可自由轉載、引用,但需署名做者且註明文章出處。如轉載至微信公衆號,請在文末添加做者公衆號二維碼。
長按指紋
一鍵關注
基於Consul的分佈式信號量實現
基於Consul的分佈式信號量實現ide

相關文章
相關標籤/搜索