下面說一下分佈式實現的幾種方式:java
1、數據庫悲觀鎖node
所謂的悲觀鎖:顧名思義,就是很悲觀,每次去拿數據的時候都認爲別人會修改,因此每次拿數據的時候都會上鎖。這樣別人拿數據的時候就要等待直到鎖的釋放。redis
這裏是採用oracle的 select ...... where id=1 for update 來實現分佈式鎖,建議加上nowait,或者wait 以及 of數據庫
下面是demo:apache
select * from table where id=1 for update nowait;//當鎖被佔用,不等待直接報錯 select * from table where id=1 for update wait 6;//當鎖被佔用,等待6s select * from table where id=1 for update of columns nowait;//鎖定執行的列,反之則鎖全部列。
該方案,在高併發時顯然不適用,依賴於數據庫的性能以及鎖機制,會形成鎖沒法釋放。api
2、數據庫樂觀鎖緩存
所謂的樂觀鎖:就是很樂觀,每次去拿數據的時候都認爲別人不會修改,因此不會上鎖,可是在更新的時候會判斷一下在此期間別人有沒有去更新這個數據。通常的方案都是加一個版本號字段(version),在查詢數據時將版本號帶出來,更新後將版本號+1,若是版本號一致才更新,並獲取影響行數,若是沒更新則報錯。服務器
3、redis的setnxsession
因爲redis是單線程工做的,因此它存取key-value 的時候是單線程工做的,而且多個線程請求redis並不存在競爭問題。因此能夠設置一個標識來做爲一把鎖,只有獲取了該鎖以後,才能對共享的資源進行操做,沒有拿到鎖的線程處於不斷去取鎖的狀態,直到等到上一個線程釋放鎖(即後一個線程能夠取到鎖)或者超過規定超時時間再也不取鎖。併發
import com.test.core.base.utils.ApplicationUtil; import com.test.redis.api.RedisStringOperationService; /** * redis分佈式鎖 * @author LIPENG * @date 2017年9月22日 下午4:25:56 * @version V1.0 */ public class RedisDistributionLock { private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100; /** * Lock key path. */ private String lockKey; /** * 鎖超時時間,防止線程在入鎖之後,無限的執行等待 */ private int expireMsecs = 60 * 1000; /** * 鎖等待時間,防止線程飢餓 */ private int timeoutMsecs = 10 * 1000; private volatile boolean locked = false; /** * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs. * * @param lockKey lock key (ex. account:1, ...) */ public RedisDistributionLock(String lockKey) { this.lockKey = lockKey + "_lock"; } /** * Detailed constructor with default lock expiration of 60000 msecs. * */ public RedisDistributionLock(String lockKey, int timeoutMsecs) { this(lockKey); this.timeoutMsecs = timeoutMsecs; } /** * Detailed constructor. * */ public RedisDistributionLock(String lockKey, int timeoutMsecs, int expireMsecs) { this(lockKey, timeoutMsecs); this.expireMsecs = expireMsecs; } /** * @return lock key */ public String getLockKey() { return lockKey; } /** * 得到 lock. * 實現思路: 主要是使用了redis 的setnx命令,緩存了鎖. * reids緩存的key是鎖的key,全部的共享, value是鎖的到期時間(注意:這裏把過時時間放在value了,沒有時間上設置其超時時間) * 執行過程: * 1.經過setnx嘗試設置某個key的值,成功(當前沒有這個鎖)則返回,成功得到鎖 * 2.鎖已經存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設置新的值 * * @return true if lock is acquired, false acquire timeouted * @throws InterruptedException in case of thread interruption */ public synchronized boolean lock() throws InterruptedException { int timeout = timeoutMsecs; while (timeout >= 0) { long expires = System.currentTimeMillis() + expireMsecs + 1; String expiresStr = String.valueOf(expires); //鎖到期時間 if (getRedisStringOperationService().setNX(lockKey, expiresStr)) { // lock acquired locked = true; return true; } String currentValueStr = getRedisStringOperationService().get(lockKey); //redis裏的時間 if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { //判斷是否爲空,不爲空的狀況下,若是被其餘線程設置了值,則第二個條件判斷是過不去的 // lock is expired String oldValueStr = getRedisStringOperationService().getAndSet(lockKey, expiresStr); //獲取上一個鎖到期時間,並設置如今的鎖到期時間, //只有一個線程才能獲取上一個線上的設置時間,由於jedis.getSet是同步的 if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { //防止誤刪(覆蓋,由於key是相同的)了他人的鎖——這裏達不到效果,這裏值會被覆蓋,可是由於什麼相差了不多的時間,因此能夠接受 //[分佈式的狀況下]:如過這個時候,多個線程剛好都到了這裏,可是隻有一個線程的設置值和當前值相同,他纔有權利獲取鎖 // lock acquired locked = true; return true; } } timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS; /* 延遲100 毫秒, 這裏使用隨機時間可能會好一點,能夠防止飢餓進程的出現,即,當同時到達多個進程, 只會有一個進程得到鎖,其餘的都用一樣的頻率進行嘗試,後面有來了一些進行,也以一樣的頻率申請鎖,這將可能致使前面來的鎖得不到知足. 使用隨機的等待時間能夠必定程度上保證公平性 */ Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS); } return false; } /** * 釋放鎖 */ public synchronized void unlock() { if (locked) { getRedisStringOperationService().delete(lockKey); locked = false; } } private RedisStringOperationService getRedisStringOperationService(){ return ApplicationUtil.getBean(RedisStringOperationService.class); } }
調用方式:
RedisDistributionLock lock = new RedisDistributionLock("key", 10000, 20000); try { if (lock.lock()) { // 須要加鎖的代碼 } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); }
4、使用zookeeper
當不少進程須要訪問共享資源時,咱們能夠經過zk來實現分佈式鎖。主要步驟是:
1.創建一個節點,假如名爲:lock 。節點類型爲持久節點(PERSISTENT)
2.每當進程須要訪問共享資源時,會調用分佈式鎖的lock()或tryLock()方法得到鎖,這個時候會在第一步建立的lock節點下創建相應的順序子節點,節點類型爲臨時順序節點(EPHEMERAL_SEQUENTIAL),經過組成特定的名字name+lock+順序號。
3.在創建子節點後,對lock下面的全部以name開頭的子節點進行排序,判斷剛剛創建的子節點順序號是不是最小的節點,假如是最小節點,則得到該鎖對資源進行訪問。
4.假如不是該節點,就得到該節點的上一順序節點,並給該節點是否存在註冊監聽事件。同時在這裏阻塞。等待監聽事件的發生,得到鎖控制權。
5.當調用完共享資源後,調用unlock()方法,關閉zk,進而能夠引起監聽事件,釋放該鎖。
實現的分佈式鎖是嚴格的按照順序訪問的併發鎖。
package cn.wpeace.zktest; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; /** * @author peace * */ public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks";//根 private String lockName;//競爭資源的標誌 private String waitNode;//等待前一個鎖 private String myZnode;//當前鎖 private CountDownLatch latch;//計數器 private CountDownLatch connectedSignal=new CountDownLatch(1); private int sessionTimeout = 30000; /** * 建立分佈式鎖,使用前請確認config配置的zookeeper服務可用 * @param config 192.168.1.127:2181 * @param lockName 競爭資源標誌,lockName中不能包含單詞_lock_ */ public DistributedLock(String config, String lockName){ this.lockName = lockName; // 建立一個與服務器的鏈接 try { zk = new ZooKeeper(config, sessionTimeout, this); connectedSignal.await(); Stat stat = zk.exists(root, false);//此去不執行 Watcher if(stat == null){ // 建立根節點 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { throw new LockException(e); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } /** * zookeeper節點的監視器 */ public void process(WatchedEvent event) { //創建鏈接用 if(event.getState()==KeeperState.SyncConnected){ connectedSignal.countDown(); return; } //其餘線程放棄鎖的標誌 if(this.latch != null) { this.latch.countDown(); } } public void lock() { try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, sessionTimeout);//等待鎖 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //建立臨時子節點 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode + " is created "); //取出全部子節點 List<String> subNodes = zk.getChildren(root, false); //取出全部lockName的鎖 List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //若是是最小的節點,則表示取得鎖 System.out.println(myZnode + "==" + lockObjNodes.get(0)); return true; } //若是不是最小的節點,找到比本身小1的節點 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//找到前一個子節點 } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true);//同時註冊監聽。 //判斷比本身小一個數的節點是否存在,若是不存在則無需等待鎖,同時註冊監聽 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS);//等待,這裏應該一直等待其餘線程釋放鎖 this.latch = null; } return true; } public void unlock() { try { System.out.println("unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } }
調用方法:
DistributedLock lock = new DistributedLock("192.168.1.127:2181","lock"); lock.lock(); //共享資源 if(lock != null) lock.unlock();