基於ZooKeeper的分佈式鎖

1、簡介java

  鎖的概念,在Java平常開發和麪試中,都是個很重要的知識點。鎖能很好的控制生產數據的安全性,好比商品的數量超賣問題等。傳統的作法中,能夠直接利用數據庫鎖(行鎖或者表鎖)來進行數據訪問控制。隨着請求量逐步變多的狀況下,將壓力懟到數據庫上會對其性能產生極大影響。這時候,單體應用中能夠利用JVM鎖,在程序層面進行訪問的控制,將壓力前移,對數據庫友好。當請求量再進一步變多,這時候通常會考慮集羣分佈式去處理,不斷的加機器來抗壓。這時候,JVM鎖就不能很好的控制壓力了,同一時刻仍是會有大量請求懟到數據庫上,這時就須要提高爲分佈式鎖去控制了,將壓力繼續停留在程序層面。node

  Java的面向接口編程,能夠很好很快的去切換實現而不須要動業務代碼部分。下面,基於Lock接口去使用鎖。git

2、JVM鎖github

  基於ReentrantLock實現鎖控制,業務控制層service部分代碼以下,用 lock 鎖去控制併發訪問面試

package com.cfang.service;

import java.sql.Time;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

import com.cfang.dao.ProductDao;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
@Scope("prototype")
public class ProductWithLockService {
    
    private Lock lock = new ReentrantLock();

    @Autowired
    private ProductDao productDao;
    
    @Transactional
    public boolean buy(String userName, String productname, int number) {
        boolean result = false;
        try {
            lock.lock();
//            TimeUnit.SECONDS.sleep(1);
            log.info("用戶{}欲購買{}個{}",  userName, number, productname);
            int stock = productDao.getStock(productname);
            log.info("{} 查詢數量{}...", userName, stock);
            if(stock < number) {
                log.warn("庫存不足...");
                return false;
            }
            result = productDao.buy(userName, productname, number);
        } catch (Exception e) {
            
        } finally {
            log.info("{} 釋放鎖...", userName);
            lock.unlock();
        }
        log.info("{}購買結果,{}",userName,  result);
        return result;
    }
}

  在單體應用中,這樣子使用是能夠的,可是當應用部署多套的時候,那麼,就不能很好的保障併發控制了,同一時刻的請求可能會大量打到數據庫上。因此,這就引入下面的分佈式鎖去控制了。spring

3、基於ZooKeeper的分佈式鎖sql

    首先,鎖獲取釋放的工具類:數據庫

package com.cfang.zkLockUtil;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.StringUtils;

import com.cfang.zkClient.MyZkSerializer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ZkLockUtil implements Lock{
    
    private String znode;
    private ZkClient zkClient;
    
    public ZkLockUtil(String znode) {
        if(StringUtils.isBlank(znode)) {
            throw new IllegalArgumentException("鎖節點znode不能爲空字符串");
        }
        this.znode = znode;
        this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183");
        this.zkClient.setZkSerializer(new MyZkSerializer());
    }

    @Override
    public void lock() {
        if(!tryLock()) { //搶鎖失敗
            // 阻塞等待鎖節點的釋放
            waitLock();
            //遞歸調用,從新嘗試去搶佔鎖
            lock();
        }
    }
    
    private void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        // 註冊監聽znode鎖節點變化,當刪除的時候,說明鎖被釋放
        IZkDataListener listener = new IZkDataListener() {
            
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                log.info("znode節點被刪除,鎖釋放...");
                latch.countDown();
            }
            
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        this.zkClient.subscribeDataChanges(this.znode, listener);
        try {
            // 阻塞等待鎖znode節點的刪除釋放
            if(this.zkClient.exists(znode)) {
                latch.await();
            }
        } catch (Exception e) {
        }
        //取消znode節點監聽
        this.zkClient.unsubscribeDataChanges(this.znode, listener);
    }
    
    @Override
    public boolean tryLock() {
        boolean result = false;
        try {
            this.zkClient.createEphemeral(znode); //建立臨時節點
            result = true;
        } catch (ZkNodeExistsException e) {
            log.warn("鎖節點znode已存在,搶佔失敗...");
            result = false;
        } catch (Exception e) {
            log.warn("建立鎖節點znode異常,{}...", e.getMessage());
        }
        return result;
    }

    @Override
    public void unlock() {
        zkClient.delete(znode);
    }
    
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }
    
    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub
        
    }

    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

}

  業務控制service中,就是將基本的JVM鎖的service中,Lock的實現更換便可:apache

private Lock lock = new ZkLockUtil("/p1node");

  當程序運行中,全部的請求會去爭搶建立zk節點,誰建立成功,則就得到鎖資源,繼續執行業務代碼。其餘全部線程基於遞歸等待,等待zk節點的刪除,而後再去嘗試爭搶建立。達到控制併發的目的。編程

可是,這種可是有個很差的地方,也就是,當一個鎖釋放後,全部的線程都會一會兒全去爭搶,每次都是輪迴這樣哄搶的過程,會有必定的壓力,也沒必要如此。因此,下面基於zk永久節點下臨時順序節點作點改善,每一個線程節點,只須要關注前面一個節點變化便可,不須要形成哄搶事件。

4、ZooKeeper的分佈式鎖提升版

   鎖獲取釋放的工具類:

package com.cfang.zkLockUtil;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang3.StringUtils;

import com.cfang.zkClient.MyZkSerializer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ZKLockImproveUtil implements Lock{
    
    private String znode;
    private ZkClient zkClient;
    private ThreadLocal<String> currentNode = new ThreadLocal<String>(); //當前節點
    private ThreadLocal<String> beforeNode = new ThreadLocal<String>();  //前一個節點
    
    public ZKLockImproveUtil(String znode) {
        if(StringUtils.isBlank(znode)) {
            throw new IllegalArgumentException("鎖節點znode不能爲空字符串");
        }
        this.znode = znode;
        this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183");
        this.zkClient.setZkSerializer(new MyZkSerializer());
        
        try {
            if(!this.zkClient.exists(znode)) {
                this.zkClient.createPersistent(znode, true); // true是否建立層級目錄
            }
        } catch (Exception e) {
        }
    }

    @Override
    public void lock() {
        if(!tryLock()) {
            waitLock();
            lock();
        }
    }
    
    private void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                log.info("{}節點刪除,鎖釋放...", dataPath);
                latch.countDown();
            }
            
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        
        this.zkClient.subscribeDataChanges(beforeNode.get(), listener);
        
        try {
            if(this.zkClient.exists(beforeNode.get())) {
                latch.await();
            }
        } catch (Exception e) {
        }
        
        this.zkClient.unsubscribeDataChanges(beforeNode.get(), listener);
    }

    @Override
    public boolean tryLock() {
        boolean result = false;
        // 建立順序臨時節點
        if(null == currentNode.get() || !this.zkClient.exists(currentNode.get())) {
            String enode = this.zkClient.createEphemeralSequential(znode + "/", "zk-locked");
            this.currentNode.set(enode);
        }
        // 獲取znode節點下的全部子節點
        List<String> list = this.zkClient.getChildren(znode);
        Collections.sort(list);
        
        /**
         * 若是當前節點是第一個的話,則是爲獲取鎖,繼續執行
         * 不是頭結點的話,則去查詢其前面一個節點,而後準備監聽前一個節點的刪除釋放操做
         */
        
        if(currentNode.get().equals(this.znode + "/" + list.get(0))) {
            log.info("{}節點爲頭結點,得到鎖...", currentNode.get());
            result = true;
        } else {
            int currentIndex = list.indexOf(currentNode.get().substring(this.znode.length() + 1));
            String bnode = this.znode + "/" + list.get(currentIndex - 1);
            this.beforeNode.set(bnode);
        }
        return result;
    }

    @Override
    public void unlock() {
        if(null != this.currentNode) {
            this.zkClient.delete(currentNode.get());
            this.currentNode.set(null);
        }
    }
    
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }
    
    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub
    }

    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

    
}

  service中更換實現:

private Lock lock = new ZKLockImproveUtil("/pnode");

5、小結

  主要是學習測試使用,並未考慮到生產實際的問題,好比 若是業務處理中假死狀態,致使zk不釋放鎖,那麼就會致使死鎖問題(能夠對鎖節點來個有效期處理)。

  上述爲部分代碼片斷,總體工程能夠在github上獲取,地址:https://github.com/qiuhan00/zkLock

相關文章
相關標籤/搜索