分佈式鎖解決方案

下面說一下分佈式實現的幾種方式:java

1、數據庫悲觀鎖node

 所謂的悲觀鎖:顧名思義,就是很悲觀,每次去拿數據的時候都認爲別人會修改,因此每次拿數據的時候都會上鎖。這樣別人拿數據的時候就要等待直到鎖的釋放。redis

這裏是採用oracle的 select  ......  where id=1 for update 來實現分佈式鎖,建議加上nowait,或者wait 以及 of數據庫

下面是demo:apache

 

select * from table where id=1 for update nowait;//當鎖被佔用,不等待直接報錯
select * from table where id=1 for update wait 6;//當鎖被佔用,等待6s
select * from table where id=1 for update of columns nowait;//鎖定執行的列,反之則鎖全部列。

 該方案,在高併發時顯然不適用,依賴於數據庫的性能以及鎖機制,會形成鎖沒法釋放。api

 

 

2、數據庫樂觀鎖緩存

所謂的樂觀鎖:就是很樂觀,每次去拿數據的時候都認爲別人不會修改,因此不會上鎖,可是在更新的時候會判斷一下在此期間別人有沒有去更新這個數據。通常的方案都是加一個版本號字段(version),在查詢數據時將版本號帶出來,更新後將版本號+1,若是版本號一致才更新,並獲取影響行數,若是沒更新則報錯。服務器

 

3、redis的setnxsession

因爲redis是單線程工做的,因此它存取key-value 的時候是單線程工做的,而且多個線程請求redis並不存在競爭問題。因此能夠設置一個標識來做爲一把鎖,只有獲取了該鎖以後,才能對共享的資源進行操做,沒有拿到鎖的線程處於不斷去取鎖的狀態,直到等到上一個線程釋放鎖(即後一個線程能夠取到鎖)或者超過規定超時時間再也不取鎖。併發

 

import com.test.core.base.utils.ApplicationUtil;
import com.test.redis.api.RedisStringOperationService;

/**
 * redis分佈式鎖
 * @author LIPENG
 * @date 2017年9月22日 下午4:25:56
 * @version V1.0
 */
public class RedisDistributionLock {
    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
    /**
     * Lock key path.
     */
    private String lockKey;

    /**
     * 鎖超時時間,防止線程在入鎖之後,無限的執行等待
     */
    private int expireMsecs = 60 * 1000;

    /**
     * 鎖等待時間,防止線程飢餓
     */
    private int timeoutMsecs = 10 * 1000;

    private volatile boolean locked = false;

    /**
     * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs.
     *
     * @param lockKey lock key (ex. account:1, ...)
     */
    public RedisDistributionLock(String lockKey) {
        this.lockKey = lockKey + "_lock";
    }

    /**
     * Detailed constructor with default lock expiration of 60000 msecs.
     *
     */
    public RedisDistributionLock(String lockKey, int timeoutMsecs) {
        this(lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }

    /**
     * Detailed constructor.
     *
     */
    public RedisDistributionLock(String lockKey, int timeoutMsecs, int expireMsecs) {
        this(lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }

    /**
     * @return lock key
     */
    public String getLockKey() {
        return lockKey;
    }



    /**
     * 得到 lock.
     * 實現思路: 主要是使用了redis 的setnx命令,緩存了鎖.
     * reids緩存的key是鎖的key,全部的共享, value是鎖的到期時間(注意:這裏把過時時間放在value了,沒有時間上設置其超時時間)
     * 執行過程:
     * 1.經過setnx嘗試設置某個key的值,成功(當前沒有這個鎖)則返回,成功得到鎖
     * 2.鎖已經存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //鎖到期時間
            if (getRedisStringOperationService().setNX(lockKey, expiresStr)) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = getRedisStringOperationService().get(lockKey); //redis裏的時間
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判斷是否爲空,不爲空的狀況下,若是被其餘線程設置了值,則第二個條件判斷是過不去的
                // lock is expired

                String oldValueStr = getRedisStringOperationService().getAndSet(lockKey, expiresStr);
                //獲取上一個鎖到期時間,並設置如今的鎖到期時間,
                //只有一個線程才能獲取上一個線上的設置時間,由於jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止誤刪(覆蓋,由於key是相同的)了他人的鎖——這裏達不到效果,這裏值會被覆蓋,可是由於什麼相差了不多的時間,因此能夠接受

                    //[分佈式的狀況下]:如過這個時候,多個線程剛好都到了這裏,可是隻有一個線程的設置值和當前值相同,他纔有權利獲取鎖
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延遲100 毫秒,  這裏使用隨機時間可能會好一點,能夠防止飢餓進程的出現,即,當同時到達多個進程,
                只會有一個進程得到鎖,其餘的都用一樣的頻率進行嘗試,後面有來了一些進行,也以一樣的頻率申請鎖,這將可能致使前面來的鎖得不到知足.
                使用隨機的等待時間能夠必定程度上保證公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);

        }
        return false;
    }


    /**
     * 釋放鎖
     */
    public synchronized void unlock() {
        if (locked) {
        	getRedisStringOperationService().delete(lockKey);
            locked = false;
        }
    }
    
    private RedisStringOperationService getRedisStringOperationService(){
    	return	ApplicationUtil.getBean(RedisStringOperationService.class);
    }

}

 

 

調用方式:

 

RedisDistributionLock lock = new RedisDistributionLock("key", 10000, 20000);
		try {
			if (lock.lock()) {
				// 須要加鎖的代碼
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}

 

 

4、使用zookeeper

當不少進程須要訪問共享資源時,咱們能夠經過zk來實現分佈式鎖。主要步驟是: 
1.創建一個節點,假如名爲:lock 。節點類型爲持久節點(PERSISTENT) 
2.每當進程須要訪問共享資源時,會調用分佈式鎖的lock()或tryLock()方法得到鎖,這個時候會在第一步建立的lock節點下創建相應的順序子節點,節點類型爲臨時順序節點(EPHEMERAL_SEQUENTIAL),經過組成特定的名字name+lock+順序號。 
3.在創建子節點後,對lock下面的全部以name開頭的子節點進行排序,判斷剛剛創建的子節點順序號是不是最小的節點,假如是最小節點,則得到該鎖對資源進行訪問。 
4.假如不是該節點,就得到該節點的上一順序節點,並給該節點是否存在註冊監聽事件。同時在這裏阻塞。等待監聽事件的發生,得到鎖控制權。 
5.當調用完共享資源後,調用unlock()方法,關閉zk,進而能夠引起監聽事件,釋放該鎖。 
實現的分佈式鎖是嚴格的按照順序訪問的併發鎖。

 

 

package cn.wpeace.zktest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
/**
 * @author peace
 *
 */
public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/locks";//根
    private String lockName;//競爭資源的標誌
    private String waitNode;//等待前一個鎖
    private String myZnode;//當前鎖
    private CountDownLatch latch;//計數器
    private CountDownLatch connectedSignal=new CountDownLatch(1);
    private int sessionTimeout = 30000; 
    /**
     * 建立分佈式鎖,使用前請確認config配置的zookeeper服務可用
     * @param config 192.168.1.127:2181
     * @param lockName 競爭資源標誌,lockName中不能包含單詞_lock_
     */
    public DistributedLock(String config, String lockName){
        this.lockName = lockName;
        // 建立一個與服務器的鏈接
         try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            connectedSignal.await();
            Stat stat = zk.exists(root, false);//此去不執行 Watcher
            if(stat == null){
                // 建立根節點
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
            }
        } catch (IOException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }
    /**
     * zookeeper節點的監視器
     */
    public void process(WatchedEvent event) {
        //創建鏈接用
        if(event.getState()==KeeperState.SyncConnected){
            connectedSignal.countDown();
            return;
        }
        //其餘線程放棄鎖的標誌
        if(this.latch != null) {  
            this.latch.countDown();  
        }
    }

    public void lock() {   
        try {
            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);//等待鎖
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        } 
    }
    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \\u000B");
            //建立臨時子節點
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            //取出全部子節點
            List<String> subNodes = zk.getChildren(root, false);
            //取出全部lockName的鎖
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);

            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //若是是最小的節點,則表示取得鎖
                System.out.println(myZnode + "==" + lockObjNodes.get(0));
                return true;
            }
            //若是不是最小的節點,找到比本身小1的節點
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//找到前一個子節點
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);//同時註冊監聽。
        //判斷比本身小一個數的節點是否存在,若是不存在則無需等待鎖,同時註冊監聽
        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);//等待,這裏應該一直等待其餘線程釋放鎖
            this.latch = null;
        }
        return true;
    }
    public void unlock() {
        try {
            System.out.println("unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }
    public Condition newCondition() {
        return null;
    }

    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

 

 

 調用方法:

DistributedLock lock   = new DistributedLock("192.168.1.127:2181","lock");
 lock.lock();
 //共享資源
 if(lock != null)
  lock.unlock();
相關文章
相關標籤/搜索