分佈式鎖的四種JAVA實現方式

分佈式鎖的四種JAVA實現方式

前言

做爲這一段時間學習分佈式鎖的總結,本文總結了四種Java分佈式鎖的實現方式,簡單編寫了代碼,進行模擬實現。相關代碼存放在個人github倉庫java

爲何要用鎖

系統內,有多個消費者,須要對同一共享數據併發訪問和消費時,會有線程安全問題。例如在秒殺、搶優惠券等場景下,商品庫存的數量是有限的,在高併發下,會有"超買"或"超賣"的問題。所以咱們須要使用鎖,解決多線程對共享數據併發訪問的線程安全問題。node

咱們能夠這樣來模擬併發訪問:mysql

一、模擬商品庫存只有一件,而且提供一個減小庫存數量的方法,當庫存數量大於0時,能夠減小庫存,返回true。當庫存不大於0時,不減小庫存,返回false。git

public class Stock {
    //庫存數量
    private static int num=1;

    // 減小庫存數量的方法
    public boolean reduceStock(){
        if(num>0){
            try {
                //一些邏輯處理
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            num--;
            return true;
        }

        return false;
    }
}

二、模擬兩個線程,同時進行減小庫存操做。github

@SpringBootTest
public class SampleTest {

    static class StockThread implements Runnable{
        public void run() {
            //調用減小庫存的方法
            boolean b = new Stock().reduceStock();

            if (b) {
                System.out.println(Thread.currentThread().getName()+"減小庫存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"減小庫存失敗");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {
        new Thread(new StockThread(),"線程1").start();
        new Thread(new StockThread(),"線程2").start();

        Thread.sleep(3000);
    }

三、從運行結果能夠看出,在併發時,雖然庫存數量爲1,本應該只有一個線程減小庫存成功,另外一個線程減小庫存失敗,但運行時兩個線程都操做成功了,出現了線程安全問題。redis

9dfb2ce171918a6251af210bea96c86b.jpeg

單機下的鎖使用方式

若是單機版的系統,咱們有不少種解決方案。咱們可使用JDK提供的ReentrantLock,在減小庫存前加鎖,減小庫存後釋放鎖。這樣就能避免線程安全問題。代碼示例以下:spring

@SpringBootTest
public class ReentrantLockTest {

    private static Lock lock = new ReentrantLock();

    static class StockThread implements Runnable{

        public void run() {
            //減小庫存前加鎖
            lock.lock();
            //調用減小庫存的方法
            boolean b = new Stock().reduceStock();
            //減小庫存後釋放鎖
            lock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"減小庫存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"減小庫存失敗");
            }
        }
    }

    @Test
    public static void main(String[] args) throws InterruptedException {
        new Thread(new StockThread(),"線程1").start();
        new Thread(new StockThread(),"線程2").start();

        Thread.sleep(4000);
    }

}

爲何要用分佈式鎖

當系統使用分佈式架構時,服務會有多個實例存在。不一樣服務實例內的線程,使用ReentrantLock是沒法感知彼此是否對同一資源進行了加鎖操做的。當多個請求進入到不一樣實例時,使用ReentrantLock則依然有線程安全問題,由於咱們須要使用分佈式鎖,保證一個資源在同一時間內只能被同一個線程執行。sql

使用數據庫實現分佈式鎖

數據庫分佈式鎖,是經過操做一張鎖表來實現的。對該鎖表設置惟一索引,不一樣請求對該表插入同一個主鍵的數據,若是插入成功,則認爲成功獲取到鎖;若是插入失敗,則判斷獲取鎖失敗。數據庫

代碼實現apache

一、建立鎖表。

CREATE TABLE `lock_record` ( 
	`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵', 
	`lock_name` varchar(50) DEFAULT NULL COMMENT '鎖名稱', 
	PRIMARY KEY (`id`), 
	UNIQUE KEY `lock_name` (`lock_name`)
)ENGINE=InnoDB AUTO_INCREMENT=38 DEFAULT CHARSET=utf8

二、在項目內引入相關依賴,這裏使用Mybatis作持久化映射。

<!--數據庫驅動-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
</dependency>
<!--引入mybatis plus 就不須要引入mybatis了-->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.4.0</version>
</dependency>

三、配置數據庫鏈接屬性。

spring:
  datasource:
    url: jdbc:mysql://${mysqlAddress}:3306/${dbName}?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: ${userName}
    password: ${password}
    driver-class-name: com.mysql.jdbc.Driver

四、在啓動類添加mybatis包掃描註解。

@SpringBootApplication
@MapperScan("com.haha.mapper")
public class LockDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(LockDemoApplication.class, args);
    }

}

四、編寫entity和mapper。

@Data
public class LockRecord {
    private Integer id;
    private String lockName;
}
public interface LockRecordMapper extends BaseMapper<LockRecord> {

}

五、編寫數據庫鎖類,實現Lock接口。

@Service
public class DbLock implements Lock {

    private static final String LOCK_NAME = "db_lock";

    @Autowired
    private LockRecordMapper lockRecordMapper;

    /**
     * 上鎖
     */
    public synchronized void lock() {
        while (true){
            boolean b = tryLock();
            if(b){
                //添加記錄
                LockRecord lockRecord = new LockRecord();
                lockRecord.setLockName(LOCK_NAME);
                lockRecordMapper.insert(lockRecord);
                return;
            }else{
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("等待中");
            }
        }
    }

    /**
     * 嘗試獲取鎖,根據指定的名稱,在數據庫表中發起查詢
     * @return
     */
    public boolean tryLock() {

        QueryWrapper<LockRecord> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("lock_name",LOCK_NAME);
        List<LockRecord> lockRecords = lockRecordMapper.selectList(queryWrapper);

        if(lockRecords.size()==0){
            return true;
        }

        return false;
    }

    /**
     * 解鎖 刪除指定名稱的記錄
     */
    public void unlock() {
        QueryWrapper<LockRecord> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("lock_name",LOCK_NAME);
        lockRecordMapper.delete(queryWrapper);
    }



    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }


    public void lockInterruptibly() throws InterruptedException {

    }

    public Condition newCondition() {
        return null;
    }
}

六、模擬兩個線程,使用數據庫鎖,同時進行減小庫存操做。

@SpringBootTest
public class DbLockTest {

    @Autowired
    private DbLock dbLock;

    static class StockThread implements Runnable{

        private DbLock dbLock;

        public StockThread(DbLock dbLock){
            this.dbLock = dbLock;
        }

        public void run() {

            dbLock.lock();
            //調用減小庫存的方法
            boolean b = new Stock().reduceStock();

            dbLock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"減小庫存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"減小庫存失敗");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {
        new Thread(new StockThread(this.dbLock),"線程1").start();
        new Thread(new StockThread(this.dbLock),"線程2").start();

        Thread.sleep(4000);
    }
}

數據庫鎖缺陷

一、數據庫鎖強依賴數據庫的可用性,一旦數據庫宕機,會致使業務系統不可用,所以須要對數據庫作HA。

二、數據庫鎖沒有失效時間,一旦獲取該鎖的請求所在的服務實例宕機,會致使該資源被長期鎖住,其餘請求沒法獲取該鎖。

使用redis實現分佈式鎖

redis分佈式鎖,是經過代碼調用setnx命令,只在鍵不存在的狀況下, 將鍵的值設置爲某個值。若鍵已經存在, 則setnx命令不作任何動做。爲了能處理"獲取該鎖的請求所在的服務實例宕機,會致使該資源被長期鎖住,其餘請求沒法獲取該鎖"這種狀況,咱們還須要設置超時時間。

代碼實現

一、在項目內引入redis相關依賴。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>

二、在項目中配置redis。

spring:
  redis:
    host: 127.0.0.1
    port: 6379

三、編寫redis分佈式鎖。

@Component
public class RedisLock implements Lock {

    private static final String LOCK_NAME = "redis_lock";

    @Autowired
    private RedisTemplate redisTemplate;


    public void lock() {
        while (true){
            Boolean b = redisTemplate.opsForValue().setIfAbsent("lockName", LOCK_NAME,10,TimeUnit.SECONDS);

            if (b) {
                return;
            }else{
                System.out.println("循環等待中");
            }
        }
    }

    public void lockInterruptibly() throws InterruptedException {

    }

    public boolean tryLock() {
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void unlock() {
        redisTemplate.delete("lockName");
    }

    public Condition newCondition() {
        return null;
    }
}

四、模擬兩個線程,使用redis分佈式鎖,同時進行減小庫存操做。

@SpringBootTest
public class RedisLockTest {

    @Autowired
    private RedisLock redisLock;

    static class StockThread implements Runnable{

        private RedisLock redisLock;

        public StockThread(RedisLock redisLock){
            this.redisLock= redisLock;
        }

        public void run() {

            redisLock.lock();
            //調用減小庫存的方法
            boolean b = new Stock().reduceStock();

            redisLock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"減小庫存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"減小庫存失敗");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {
        new Thread(new StockThread(redisLock),"線程1").start();
        new Thread(new StockThread(redisLock),"線程2").start();

        Thread.sleep(4000);
    }
    
}

redis分佈式鎖缺陷

一、強依賴redis的可用性,一旦redis宕機,會致使業務系統不可用,所以最好搭建redis集羣。

二、由於對鎖設置了超時時間,若是某次請求不能在該次限制時間內完成操做,也會致使在某些時刻,多個請求獲取到鎖。

解決方案也很簡單,咱們在調用setnx時,將值設置爲該次請求線程的id,而且在服務實例內,設置一個守護線程,當鎖快要超時時,判斷請求是否完成,若是未完成,延長超時時間。

使用redission實現分佈式鎖

redisson是一個經常使用的Redis Java客戶端,爲Java上的分佈式應用程序提供了基於Redis的對象,集合,鎖,同步器和服務的分佈式實現。使用redisson,咱們能夠實現基於redis集羣的分佈式鎖。

代碼實現

一、啓動redis集羣(此處用單機模擬)。

二、在項目內引入redisson相關依賴。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>

三、在項目中配置redisson,在配置文件添加redis配置和配置類。

spring:
  redis:
    host: 127.0.0.1
    port: 6379
@Configuration
public class RedissonConfig {

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        String redisUrl = String.format("redis://%s:%s",redisProperties.getHost()+"",redisProperties.getPort()+"");
        config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
        config.useSingleServer().setDatabase(3);
        return Redisson.create(config);
    }
}

四、模擬兩個線程,使用redisson,同時進行減小庫存操做。

@SpringBootTest
public class RedissionLockTest {

    @Autowired
    private Redisson redisson;

    static class StockThread implements Runnable{

        private RLock mylock;

        public StockThread(RLock lock){
            this.mylock = lock;
        }

        public void run() {

            mylock.lock();
            //調用減小庫存的方法
            boolean b = new Stock().reduceStock();

            mylock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"減小庫存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"減小庫存失敗");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {

        RLock lock = redisson.getLock("redisson_lock");

        new Thread(new StockThread(lock),"線程1").start();
        new Thread(new StockThread(lock),"線程2").start();
        
        Thread.sleep(4000);

    }
}

使用zookeeper實現分佈式鎖

zookeeper是一個分佈式的,開放源碼的分佈式應用程序協調服務,天生就適合用於實現分佈式鎖。

使用zookeeper實現分佈式鎖的步驟

一、設置鎖的根節點。判斷鎖的根節點是否存在,若是不存在,首先建立根節點,例如叫「/locks」。

二、客戶端若是須要佔用鎖,則在「/locks」下建立臨時且有序的子節點,而後判斷本身建立的子節點是否爲當前子節點列表中序號最小的子節點。若是是則認爲得到鎖,不然監聽前一個子節點變動消息。

三、當監聽到前一個子節點已經從zookeper上被刪除,則認爲得到鎖。

四、當客戶端獲取鎖並完成業務流程後,則刪除對應的子節點,完成釋放鎖的工做,以便後面的節點得到分佈式鎖。

五、若是客戶端獲取鎖以後,由於某些緣由宕機,此時因爲客戶端與zookeeper斷開鏈接,該客戶端建立的臨時有序節點也會自動從zookeeper上移除,從而讓後面的節點得到分佈式鎖。

代碼實現

一、在項目內引入zookeeper相關依賴。

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.12</version>
    <!-- 排除衝突jar -->
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

二、在項目中配置zookeeper,在配置文件添加redis配置和編寫配置類。

zookeeper:
  address: 127.0.0.1:2181
  timeout: 4000
@Configuration
public class ZookeeperConfig {

    @Value("${zookeeper.address}")
    private String connectString;

    @Value("${zookeeper.timeout}")
    private Integer timeout;


    @Bean(name = "zkClient")
    public ZooKeeper zkClient(){
        ZooKeeper zooKeeper=null;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(Event.KeeperState.SyncConnected==event.getState()){
                        //若是收到了服務端的SyncConnected響應事件,表示鏈接成功
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();

        }catch (Exception e){
            e.printStackTrace();
        }
        return  zooKeeper;
    }
}

三、編寫zookeeper分佈式鎖。

public class ZkLock implements Lock {

    //zk客戶端
    private ZooKeeper zk;

    //鎖的根節點
    private String root ="/locks";
    //當前鎖的名稱
    private String lockName;

    //當前線程建立的臨時有序節點
    private ThreadLocal<String> nodeId = new ThreadLocal<>();

    private final static byte[] data = new byte[0];

    public ZkLock(ZooKeeper zooKeeper, String lockName){
        this.zk = zooKeeper;
        this.lockName = lockName;

        try {
            Stat stat = zk.exists(root, false);
            if(stat==null) {
                //建立根節點
                zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 監聽器,監聽臨時節點的刪除
     */
    class LockWatcher implements Watcher{

        private CountDownLatch latch;

        public LockWatcher(CountDownLatch latch){
            this.latch = latch;
        }

        public void process(WatchedEvent watchedEvent) {
            if(watchedEvent.getType()== Event.EventType.NodeDeleted){
                latch.countDown();
            }
        }
    }

    public void lock() {
        try {
            //在根節點下建立臨時有序節點
            String myNode = zk.create(root + "/" + lockName, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            System.out.println(Thread.currentThread().getName()+myNode+" create");

            //獲取根節點下的全部根節點下
            List<String> subNodes = zk.getChildren(root, false);

            //排序
            TreeSet<String> sortedNodes = new TreeSet<String>();
            for(String node:subNodes){
                sortedNodes.add(root+"/"+node);
            }
            //獲取序號最下的子節點
            String smallNode = sortedNodes.first();

            //若是該次建立的臨時有序節點是最小的子節點,則表示取得鎖
            if(myNode.equals(smallNode) ){
                System.out.println(Thread.currentThread().getName()+myNode+" get lock");
                this.nodeId.set(myNode);
                return;
            }

            //不然,取得當前節點的前一個節點
            String preNode = sortedNodes.lower(myNode);

            CountDownLatch latch = new CountDownLatch(1);
            //查詢前一個節點,同時註冊監聽
            Stat stat = zk.exists(preNode, new LockWatcher(latch));
            // 若是比本身小的前一個節點查詢時,已經不存在則無需等待,若是存在則監聽
            if(stat!=null){
                System.out.println(Thread.currentThread().getName()+myNode+" waiting for "+ root+"/"+preNode+" release lock");
                latch.await();//等待
            }
            nodeId.set(myNode);

        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }

    public void unlock() {
        try{
            //釋放鎖時,只須要將本次建立的臨時有序節點移除掉
            System.out.println(Thread.currentThread().getName()+" unlock");
            if(nodeId!=null){
                zk.delete(nodeId.get(),-1);
            }
            nodeId.remove();

        }catch (InterruptedException e){
            e.printStackTrace();
        }catch (KeeperException e){
            e.printStackTrace();
        }
    }

    public void lockInterruptibly() throws InterruptedException {

    }

    public boolean tryLock() {
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public Condition newCondition() {
        return null;
    }
}

四、模擬兩個線程,使用zookeeper分佈式鎖,同時進行減小庫存操做。

@SpringBootTest
public class ZookeeperLockTest {

    @Autowired
    private ZooKeeper zooKeeper;

    private static final String LOCK_NAME = "zk_lock";

    static class StockThread implements Runnable{

        private ZkLock zkLock;

        public StockThread(ZkLock zkLock){
            this.zkLock= zkLock;
        }

        public void run() {

            zkLock.lock();
            //調用減小庫存的方法
            boolean b = new Stock().reduceStock();

            zkLock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"減小庫存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"減小庫存失敗");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {

        ZkLock zkLock = new ZkLock(zooKeeper,LOCK_NAME);

        new Thread(new ZookeeperLockTest.StockThread(zkLock),"線程1").start();
        new Thread(new ZookeeperLockTest.StockThread(zkLock),"線程2").start();

        Thread.sleep(4000);
    }
}

zookeeper分佈式鎖缺陷

一、強依賴zookeeper的可用性,一旦zookeeper宕機,會致使業務系統不可用,所以最好搭建zookeeper集羣。

相關文章
相關標籤/搜索