利用consul在spring boot中實現最簡單的分佈式鎖

由於在項目實際過程當中所採用的是微服務架構,考慮到承載量基本每一個相同業務的服務都是多節點部署,因此針對某些資源的訪問就不得不用到用到分佈式鎖了。java

這裏列舉一個最簡單的場景,假若有一個智能售貨機,因爲機器自己的緣由不能同一臺機器不能同時出兩個商品,這就要求在在出貨流程前針對同一臺機器在同一時刻出現併發git

建立訂單時只能有一筆訂單建立成功,可是訂單服務是多節點部署的,因此就不得不用到分佈式鎖了。github

以上只是一種簡單的業務場景,在各類大型互聯網實際應用中,須要分佈式鎖的業務場景會更多,綜合比較了業界基於各類中間件來實現的分佈式鎖方案,而後結合實際業務最終web

決定採用consul來實現,由於咱們的項目中採用了consul作註冊中心,而且consul天生能夠保證一致性(這點相似zk),固然zk也能實現分佈式鎖,可是這裏不對這點作過多討論。redis

redis雖然也能實現分佈式鎖,可是可能由於場景比較複雜,若是redis採用cluster部署的話,若是某一主節點出現故障的話,有必定概率會出現腦裂現象,這樣就可能會讓競爭者在spring

併發時同時得到到鎖,這樣可能會破壞掉後面的業務,固然出現這種狀況的機率很低,可是也不能徹底排除,由於redis的根本不能保證強一致性致使的。apache

 

好了,這裏說的最簡單的分佈式鎖的意思是,多個競爭者同一時間併發去得到鎖時,獲取失敗的就直接返回了,獲取成功的繼續後續的流程,而後在合適的時間釋放鎖,而且爲鎖session

加了超時時間,防止得到到鎖的進程或線程在將來得及釋放鎖時本身掛掉了,致使資源處於一直被鎖定的狀態沒法獲得釋放。主要的實現邏輯就是這樣,若是有人想實現得到鎖失架構

敗的競爭者一直繼續嘗試得到,能夠基於該示例進行修改,加上自旋邏輯就OK。併發

如下是鎖實現代碼:

  1 package com.lyb.consullock;
  2 
  3 import com.ecwid.consul.v1.ConsulClient;
  4 import com.ecwid.consul.v1.agent.model.NewCheck;
  5 import com.ecwid.consul.v1.kv.model.PutParams;
  6 import com.ecwid.consul.v1.session.model.NewSession;
  7 import com.ecwid.consul.v1.session.model.Session;
  8 import lombok.Data;
  9 
 10 
 11 import java.time.LocalDateTime;
 12 import java.util.ArrayList;
 13 import java.util.List;
 14 
 15 
 16 public class DistributedLock{
 17     private ConsulClient consulClient;
 18 
 19     /**
 20      * 構造函數
 21      * @param consulHost 註冊consul的client或服務端的Ip或主機名,或域名
 22      * @param consulPort 端口號
 23      */
 24     public DistributedLock(String consulHost,int consulPort){
 25         consulClient = new ConsulClient(consulHost,consulPort);
 26     }
 27 
 28     /**
 29      * 得到鎖的方法
 30      * @param lockName 競爭的資源名
 31      * @param ttlSeconds 鎖的超時時間,超過該時間自動釋放
 32      * @return
 33      */
 34     public LockContext getLock(String lockName,int ttlSeconds){
 35         LockContext lockContext = new LockContext();
 36         if(ttlSeconds<10 || ttlSeconds > 86400) ttlSeconds = 60;
 37         String sessionId = createSession(lockName,ttlSeconds);
 38         boolean success = lock(lockName,sessionId);
 39         if(success == false){
 40             consulClient.sessionDestroy(sessionId,null);
 41             lockContext.setGetLock(false);
 42 
 43             return lockContext;
 44         }
 45 
 46         lockContext.setSession(sessionId);
 47         lockContext.setGetLock(true);
 48 
 49         return lockContext;
 50     }
 51 
 52     /**
 53      * 釋放鎖
 54      * @param sessionID
 55      */
 56     public void releaseLock(String sessionID){
 57         consulClient.sessionDestroy(sessionID,null);
 58     }
 59 
 60     private String createSession(String lockName,int ttlSeconds){
 61         NewCheck check = new NewCheck();
 62         check.setId("check "+lockName);
 63         check.setName(check.getId());
 64         check.setTtl(ttlSeconds+"s"); //該值和session ttl共同決定決定鎖定時長
 65         check.setTimeout("10s");
 66         consulClient.agentCheckRegister(check);
 67         consulClient.agentCheckPass(check.getId());
 68 
 69         NewSession session = new NewSession();
 70         session.setBehavior(Session.Behavior.RELEASE);
 71         session.setName("session "+lockName);
 72         session.setLockDelay(1);
 73         session.setTtl(ttlSeconds + "s"); //和check ttl共同決定鎖時長
 74         List<String> checks = new ArrayList<>();
 75         checks.add(check.getId());
 76         session.setChecks(checks);
 77         String sessionId = consulClient.sessionCreate(session,null).getValue();
 78 
 79         return sessionId;
 80     }
 81 
 82     private boolean lock(String lockName,String sessionId){
 83         PutParams putParams = new PutParams();
 84         putParams.setAcquireSession(sessionId);
 85 
 86         boolean isSuccess = consulClient.setKVValue(lockName,"lock:"+ LocalDateTime.now(),putParams).getValue();
 87 
 88         return isSuccess;
 89     }
 90 
 91     /**
 92      * 競爭鎖時返回的對象
 93      */
 94     @Data
 95     public class LockContext{
 96         /**
 97          * 得到鎖成功返回該值,比便後面用該值來釋放鎖
 98          */
 99         private String session;
100         /**
101          * 是否得到到鎖
102          */
103         private boolean isGetLock;
104     }
105 }

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lyb</groupId>
    <artifactId>consul-lock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>consul-lock</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR2</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-consul-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

測試代碼:

package com.lyb.consullock;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ConsulLockApplicationTests {
    @Autowired
    private ServiceConfig serviceConfig;
    @Test
    public void lockSameResourer() {
        //針對相同資源在同一時刻只有一個線程會得到鎖
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int a=0;a<20;a++){
            threadPool.submit(
                    () -> {
                        for (int i = 0;i < 100; i++) {
                            DistributedLock lock = new DistributedLock(
                                    serviceConfig.getConsulRegisterHost(),
                                    serviceConfig.getConsulRegisterPort());

                            DistributedLock.LockContext lockContext = lock.getLock("test lock", 10);
                            if (lockContext.isGetLock()) {
                                System.out.println(Thread.currentThread().getName() + "得到了鎖");
                                try {
                                    TimeUnit.SECONDS.sleep(1);
                                    lock.releaseLock(lockContext.getSession());
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }else {
                                //System.out.println(Thread.currentThread().getName() + "沒有得到鎖");
                            }
                        }
                    });
        }

        try {
            TimeUnit.MINUTES.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void lockDiffResource(){
        //針對不通的資源全部線程都應該能得到鎖
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int a=0;a<20;a++){
            threadPool.submit(
                    () -> {
                        for (int i = 0;i < 100; i++) {
                            DistributedLock lock = new DistributedLock(
                                    serviceConfig.getConsulRegisterHost(),
                                    serviceConfig.getConsulRegisterPort());

                            DistributedLock.LockContext lockContext = lock.getLock("test lock"+Thread.currentThread().getName(), 10);
                            if (lockContext.isGetLock()) {
                                System.out.println(Thread.currentThread().getName() + "得到了鎖");
                                try {
                                    TimeUnit.SECONDS.sleep(1);
                                    lock.releaseLock(lockContext.getSession());
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }else {
                                //System.out.println(Thread.currentThread().getName() + "沒有得到鎖");
                                Assert.assertTrue(lockContext.isGetLock());
                            }
                        }
                    });
        }

        try {
            TimeUnit.MINUTES.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 但願對你們有所幫助

項目路徑:

https://github.com/wenwuxianren/consul-lock

相關文章
相關標籤/搜索