Java分佈式鎖三種實現方案

方案一:數據庫樂觀鎖

樂觀鎖一般實現基於數據版本(version)的記錄機制實現的,好比有一張紅包表(t_bonus),有一個字段(left_count)記錄禮物的剩餘個數,用戶每領取一個獎品,對應的left_count減1,在併發的狀況下如何要保證left_count不爲負數,樂觀鎖的實現方式爲在紅包表上添加一個版本號字段(version),默認爲0。java

異常實現流程
-- 可能會發生的異常狀況
-- 線程1查詢,當前left_count爲1,則有記錄
select * from t_bonus where id = 10001 and left_count > 0

-- 線程2查詢,當前left_count爲1,也有記錄
select * from t_bonus where id = 10001 and left_count > 0

-- 線程1完成領取記錄,修改left_count爲0,
update t_bonus set left_count = left_count - 1 where id = 10001

-- 線程2完成領取記錄,修改left_count爲-1,產生髒數據
update t_bonus set left_count = left_count - 1 where id = 10001

 

經過樂觀鎖實現
-- 添加版本號控制字段
ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus;

-- 線程1查詢,當前left_count爲1,則有記錄,當前版本號爲1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 線程2查詢,當前left_count爲1,有記錄,當前版本號爲1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 線程1,更新完成後當前的version爲1235,update狀態爲1,更新成功
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

-- 線程2,更新因爲當前的version爲1235,udpate狀態爲0,更新失敗,再針對相關業務作異常處理
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

 

方案二:基於Redis的分佈式鎖

SETNX命令(SET if Not eXists)
語法:SETNX key value
功能:原子性操做,當且僅當 key 不存在,將 key 的值設爲 value ,並返回1;若給定的 key 已經存在,則 SETNX 不作任何動做,並返回0。
Expire命令
語法:expire(key, expireTime)
功能:key設置過時時間
GETSET命令
語法:GETSET key value
功能:將給定 key 的值設爲 value ,並返回 key 的舊值 (old value),當 key 存在但不是字符串類型時,返回一個錯誤,當key不存在時,返回nil。
GET命令
語法:GET key
功能:返回 key 所關聯的字符串值,若是 key 不存在那麼返回特殊值 nil 。
DEL命令
語法:DEL key [KEY …]
功能:刪除給定的一個或多個 key ,不存在的 key 會被忽略。node

第一種:使用redis的setnx()、expire()方法,用於分佈式鎖
  1. setnx(lockkey, 1) 若是返回0,則說明佔位失敗;若是返回1,則說明佔位成功
  2. expire()命令對lockkey設置超時時間,爲的是避免死鎖問題。
  3. 執行完業務代碼後,能夠經過delete命令刪除key。

這個方案實際上是能夠解決平常工做中的需求的,但從技術方案的探討上來講,可能還有一些能夠完善的地方。好比,若是在第一步setnx執行成功後,在expire()命令執行成功前,發生了宕機的現象,那麼就依然會出現死鎖的問題linux

第二種:使用redis的setnx()、get()、getset()方法,用於分佈式鎖,解決死鎖問題
  1. setnx(lockkey, 當前時間+過時超時時間) ,若是返回1,則獲取鎖成功;若是返回0則沒有獲取到鎖,轉向2。
  2. get(lockkey)獲取值oldExpireTime ,並將這個value值與當前的系統時間進行比較,若是小於當前系統時間,則認爲這個鎖已經超時,能夠容許別的請求從新獲取,轉向3。
  3. 計算newExpireTime=當前時間+過時超時時間,而後getset(lockkey, newExpireTime) 會返回當前lockkey的值currentExpireTime。
  4. 判斷currentExpireTime與oldExpireTime 是否相等,若是相等,說明當前getset設置成功,獲取到了鎖。若是不相等,說明這個鎖又被別的請求獲取走了,那麼當前請求能夠直接返回失敗,或者繼續重試。
  5. 在獲取到鎖以後,當前線程能夠開始本身的業務處理,當處理完畢後,比較本身的處理時間和對於鎖設置的超時時間,若是小於鎖設置的超時時間,則直接執行delete釋放鎖;若是大於鎖設置的超時時間,則不須要再鎖進行處理。
import cn.com.tpig.cache.redis.RedisService;
import cn.com.tpig.utils.SpringUtils;

/**
 * Created by IDEA
 * User: shma1664
 * Date: 2016-08-16 14:01
 * Desc: redis分佈式鎖
 */
public final class RedisLockUtil {

    private static final int defaultExpire = 60;

    private RedisLockUtil() {
        //
    }

    /**
     * 加鎖
     * @param key redis key
     * @param expire 過時時間,單位秒
     * @return true:加鎖成功,false,加鎖失敗
     */
    public static boolean lock(String key, int expire) {

        RedisService redisService = SpringUtils.getBean(RedisService.class);
        long status = redisService.setnx(key, "1");

        if(status == 1) {
            redisService.expire(key, expire);
            return true;
        }

        return false;
    }

    public static boolean lock(String key) {
        return lock2(key, defaultExpire);
    }

    /**
     * 加鎖
     * @param key redis key
     * @param expire 過時時間,單位秒
     * @return true:加鎖成功,false,加鎖失敗
     */
    public static boolean lock2(String key, int expire) {

        RedisService redisService = SpringUtils.getBean(RedisService.class);

        long value = System.currentTimeMillis() + expire;
        long status = redisService.setnx(key, String.valueOf(value));

        if(status == 1) {
            return true;
        }
        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
        if(oldExpireTime < System.currentTimeMillis()) {
            //超時
            long newExpireTime = System.currentTimeMillis() + expire;
            long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));
            if(currentExpireTime == oldExpireTime) {
                return true;
            }
        }
        return false;
    }

    public static void unLock1(String key) {
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        redisService.del(key);
    }

    public static void unLock2(String key) {    
        RedisService redisService = SpringUtils.getBean(RedisService.class);    
        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));   
        if(oldExpireTime > System.currentTimeMillis()) {        
            redisService.del(key);    
        }
   }

}

public void drawRedPacket(long userId) {
    String key = "draw.redpacket.userid:" + userId;

    boolean lock = RedisLockUtil.lock2(key, 60);
    if(lock) {
        try {
            //領取操做
        } finally {
            //釋放鎖
            RedisLockUtil.unLock(key);
        }
    } else {
        new RuntimeException("重複領取獎勵");
    }
}

 

Spring AOP基於註解方式和SpEL實現開箱即用的redis分佈式鎖策略
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * RUNTIME
 * 定義註解
 * 編譯器將把註釋記錄在類文件中,在運行時 VM 將保留註釋,所以能夠反射性地讀取。
 * @author shma1664
 *
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisLockable {


    String[] key() default "";

    long expiration() default 60;
}
import javax.annotation.Resource;

import java.lang.reflect.Method;

import com.autohome.api.dealer.util.cache.RedisClient;
import com.google.common.base.Joiner;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;

/**
 * Created by IDEA
 * User: mashaohua
 * Date: 2016-09-28 18:08
 * Desc:
 */
@Aspect
@Component
public class RedisLockAop {

    @Resource
    private RedisClient redisClient;

    @Pointcut("execution(* com.autohome.api.dealer.tuan.service.*.*(..))")
    public void pointcut(){}

    @Around("pointcut()")
    public Object doAround(ProceedingJoinPoint point) throws Throwable{
        Signature signature = point.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        Method method = methodSignature.getMethod();
        String targetName = point.getTarget().getClass().getName();
        String methodName = point.getSignature().getName();
        Object[] arguments = point.getArgs();

        if (method != null && method.isAnnotationPresent(RedisLockable.class)) {
            RedisLockable redisLock = method.getAnnotation(RedisLockable.class);
            long expire = redisLock.expiration();
            String redisKey = getLockKey(targetName, methodName, redisLock.key(), arguments);
            boolean isLock = RedisLockUtil.lock2(redisKey, expire);
            if(!isLock) {
                try {
                    return point.proceed();
                } finally {
                    unLock2(redisKey);
                }
            } else {
                throw new RuntimeException("您的操做太頻繁,請稍後再試");
            }
        }

        return point.proceed();
    }

    private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) {

        StringBuilder sb = new StringBuilder();
        sb.append("lock.").append(targetName).append(".").append(methodName);

        if(keys != null) {
            String keyStr = Joiner.on(".").skipNulls().join(keys);
            String[] parameters = ReflectParamNames.getNames(targetName, methodName);
            ExpressionParser parser = new SpelExpressionParser();
            Expression expression = parser.parseExpression(keyStr);
            EvaluationContext context = new StandardEvaluationContext();
            int length = parameters.length;
            if (length > 0) {
                for (int i = 0; i < length; i++) {
                    context.setVariable(parameters[i], arguments[i]);
                }
            }
            String keysValue = expression.getValue(context, String.class);
            sb.append("#").append(keysValue);
        }
        return sb.toString();
    }
 
<!-- https://mvnrepository.com/artifact/javassist/javassist -->
<dependency>
    <groupId>org.javassist</groupId>
    <artifactId>javassist</artifactId>
    <version>3.18.1-GA</version>
</dependency>

 

import javassist.*;
import javassist.bytecode.CodeAttribute;
import javassist.bytecode.LocalVariableAttribute;
import javassist.bytecode.MethodInfo;
import org.apache.log4j.Logger;

/**
 * Created by IDEA
 * User: mashaohua
 * Date: 2016-09-28 18:39
 * Desc:
 */
public class ReflectParamNames {
    private static Logger log = Logger.getLogger(ReflectParamNames.class);
    private  static ClassPool pool = ClassPool.getDefault();

    static{
        ClassClassPath classPath = new ClassClassPath(ReflectParamNames.class);
        pool.insertClassPath(classPath);
    }

    public static String[] getNames(String className,String methodName) {
        CtClass cc = null;
        try {
            cc = pool.get(className);
            CtMethod cm = cc.getDeclaredMethod(methodName);
            // 使用javaassist的反射方法獲取方法的參數名
            MethodInfo methodInfo = cm.getMethodInfo();
            CodeAttribute codeAttribute = methodInfo.getCodeAttribute();
            LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag);
            if (attr == null) return new String[0];

            int begin = 0;

            String[] paramNames = new String[cm.getParameterTypes().length];
            int count = 0;
            int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1;

            for (int i = 0; i < attr.tableLength(); i++){
                //  爲何 加這個判斷,發如今windows 跟linux執行時,參數順序不一致,經過觀察,實際的參數是從this後面開始的
                if (attr.variableName(i).equals("this")){
                    begin = i;
                    break;
                }
            }

            for (int i = begin+1; i <= begin+paramNames.length; i++){
                paramNames[count] = attr.variableName(i);
                count++;
            }
            return paramNames;
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                if(cc != null) cc.detach();
            } catch (Exception e2) {
                log.error(e2.getMessage());
            }


        }
        return new String[0];
    }
}

 

在須要使用分佈式鎖的地方添加註解
/**
 * 抽獎接口
 * 添加redis分佈式鎖保證一個訂單隻有一個請求處理,防止用戶刷禮物,支持SpEL表達式
 * redisLockKey:lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId
 * @param orderId 訂單id
 * @return 抽中的獎品信息
 */
@RedisLockable(key = {"#orderId"}, expiration = 120)
@Override
public BonusConvertBean drawBonus(Integer orderId) throws BonusException{
    // 業務邏輯
}

 

第三種方案:基於Zookeeper的分佈式鎖
利用節點名稱的惟一性來實現獨佔鎖

ZooKeeper機制規定同一個目錄下只能有一個惟一的文件名,zookeeper上的一個znode看做是一把鎖,經過createznode的方式來實現。全部客戶端都去建立/lock/${lock_name}_lock節點,最終成功建立的那個客戶端也即擁有了這把鎖,建立失敗的能夠選擇監聽繼續等待,仍是放棄拋出異常實現獨佔鎖。redis

package com.shma.example.zookeeper.lock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

/**
 * Created by IDEA
 * User: mashaohua
 * Date: 2016-09-30 16:09
 * Desc:
 */
public class ZookeeperLock implements Lock, Watcher {
    private ZooKeeper zk;
    private String root = "/locks";//
    private String lockName;//競爭資源的標誌
    private String myZnode;//當前鎖
    private int sessionTimeout = 30000;
    private List<Exception> exception = new ArrayList<Exception>();

    /**
     * 建立分佈式鎖,使用前請確認config配置的zookeeper服務可用
     * @param config 127.0.0.1:2181
     * @param lockName 競爭資源標誌,lockName中不能包含單詞lock
     */
    public ZookeeperLock(String config, String lockName){
        this.lockName = lockName;
        // 建立一個與服務器的鏈接
        try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if(stat == null){
                // 建立根節點
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            exception.add(e);
        } catch (KeeperException e) {
            exception.add(e);
        } catch (InterruptedException e) {
            exception.add(e);
        }
    }

    @Override
    public void lock() {
        if(exception.size() > 0){
            throw new LockException(exception.get(0));
        }
        if(!tryLock()) {
            throw new LockException("您的操做太頻繁,請稍後再試");
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    @Override
    public boolean tryLock() {
        try {
            myZnode = zk.create(root + "/" + lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return tryLock();
    }

    @Override
    public void unlock() {
        try {
            zk.delete(myZnode, -1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        //
    }

}

ZookeeperLock lock = null;
try {
    lock = new ZookeeperLock("127.0.0.1:2182","test1");
    lock.lock();
    //業務邏輯處理
} catch (LockException e) {
    throw e;
} finally {
    if(lock != null)
        lock.unlock();
}

 

利用臨時順序節點控制時序實現

/lock已經預先存在,全部客戶端在它下面建立臨時順序編號目錄節點,和選master同樣,編號最小的得到鎖,用完刪除,依次方便。
算法思路:對於加鎖操做,可讓全部客戶端都去/lock目錄下建立臨時順序節點,若是建立的客戶端發現自身建立節點序列號是/lock/目錄下最小的節點,則得到鎖。不然,監視比本身建立節點的序列號小的節點(比本身建立的節點小的最大節點),進入等待。
對於解鎖操做,只須要將自身建立的節點刪除便可。算法

package com.shma.example.zookeeper.lock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * Created by IDEA
 * User: mashaohua
 * Date: 2016-09-30 16:09
 * Desc:
 */
public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/locks";//
    private String lockName;//競爭資源的標誌
    private String waitNode;//等待前一個鎖
    private String myZnode;//當前鎖
    private CountDownLatch latch;//計數器
    private int sessionTimeout = 30000;
    private List<Exception> exception = new ArrayList<Exception>();

    /**
     * 建立分佈式鎖,使用前請確認config配置的zookeeper服務可用
     * @param config 127.0.0.1:2181
     * @param lockName 競爭資源標誌,lockName中不能包含單詞lock
     */
    public DistributedLock(String config, String lockName){
        this.lockName = lockName;
        // 建立一個與服務器的鏈接
        try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if(stat == null){
                // 建立根節點
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            exception.add(e);
        } catch (KeeperException e) {
            exception.add(e);
        } catch (InterruptedException e) {
            exception.add(e);
        }
    }

    /**
     * zookeeper節點的監視器
     */
    public void process(WatchedEvent event) {
        if(this.latch != null) {
            this.latch.countDown();
        }
    }

    public void lock() {
        if(exception.size() > 0){
            throw new LockException(exception.get(0));
        }
        try {
            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);//等待鎖
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \\u000B");
            //建立臨時子節點
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            //取出全部子節點
            List<String> subNodes = zk.getChildren(root, false);
            //取出全部lockName的鎖
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);
            System.out.println(myZnode + "==" + lockObjNodes.get(0));
            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //若是是最小的節點,則表示取得鎖
                return true;
            }
            //若是不是最小的節點,找到比本身小1的節點
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        //判斷比本身小一個數的節點是否存在,若是不存在則無需等待鎖,同時註冊監聽
        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }

    public void unlock() {
        try {
            System.out.println("unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    public Condition newCondition() {
        return null;
    }

    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }

}
相關文章
相關標籤/搜索