你們都知道加鎖是用來在併發狀況防止同一個資源被多方搶佔的有效手段,加鎖其實就是同步互斥(或稱獨佔)也行,即:同一時間不論有多少併發請求,只有一個能處理,其他要麼排隊等待,要麼放棄執行。關於鎖的實現網上大把的例子,我這裏只是梳理與總結一下,以便參考方便。html
同步互斥按做用範圍可分爲:java
下面分別經過代碼示例來演示常見的線程間同步互斥的實現方法:redis
/** * 線程間同步(synchronized同步互斥鎖,開啓多個線程,若某個線程得到鎖,則其他線程所有阻塞等待排隊,當釋放鎖後,則下一個繼續得到鎖,其他線程仍等待) */ private void testSynchronized() { ExecutorService executorService = Executors.newFixedThreadPool(2); Runnable runnable = () -> { long threadId = Thread.currentThread().getId(); for (int i = 0; i <= 100; i++) { synchronized (lockObj) { if (count > 0) { try { Thread.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("threadId:%s,number:%d --count:%d %n", threadId, i, --count); } } //若未加鎖,則可能會出現負數,即併發問題 // if (count>0){ // try { // Thread.sleep(200L); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.printf("threadId:%s,number:%d --count:%d %n",threadId,i,--count); // } } }; executorService.execute(runnable); executorService.execute(runnable); System.out.printf("lasted count:%d", count); }
/** * 線程間同步(synchronized同步互斥鎖+通知等待模式,開啓多個線程,當得到鎖後,則可經過Object.notify()方法發出通知,通知其它等待鎖或wait狀況下恢復繼續執行,示例演示的是生產與消費互相等待) */ private void testWaitAndNotify() { count = 0; ExecutorService executorService = Executors.newFixedThreadPool(2); Runnable productRunnable = () -> { long threadId = Thread.currentThread().getId(); for (int i = 0; i <= 50; i++) { synchronized (lockObj) { //獲取鎖 try { Thread.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("threadId:%s,number:%d --生產後 count:%d %n", threadId, i, ++count); lockObj.notify();//發出通知 try { System.out.printf("threadId:%s,number:%d,等待生產%n", threadId, i); if (i == 50) break; lockObj.wait();//等待通知,阻塞當前線程 System.out.printf("threadId:%s,number:%d,收到通知,準備生產%n", threadId, i); } catch (InterruptedException e) { e.printStackTrace(); } } } count = -1; System.out.printf("threadId:%s,已生產完了。%n", threadId); }; Runnable consumeRunnable = () -> { long threadId = Thread.currentThread().getId(); for (int i = 0; i <= 200; i++) { synchronized (lockObj) { //獲取鎖 if (count > 0) { try { Thread.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("threadId:%s,number:%d --消費後 count:%d %n", threadId, i, --count); lockObj.notify(); //發出通知 } else { try { System.out.printf("threadId:%s,number:%d,等待消費%n", threadId, i); if (count == -1) break; lockObj.wait();//等待通知,阻塞當前線程 System.out.printf("threadId:%s,number:%d,收到通知,準備消費%n", threadId, i); } catch (InterruptedException e) { e.printStackTrace(); } } } } System.out.printf("threadId:%s,已消費完了。%n", threadId); }; executorService.execute(consumeRunnable); executorService.execute(productRunnable); }
/** * 線程間同步(條件鎖ReentrantLock、Condition,開啓多個線程,當lock()獲取鎖後,則能夠經過Lock的條件實例方法signal發送信號,通知其它等待鎖或await狀況下恢復繼續執行,示例演示的是生產與消費互相等待) */ private void testLock() { final Lock lock = new ReentrantLock(); final Condition lockCond = lock.newCondition(); count = 0; ExecutorService executorService = Executors.newFixedThreadPool(2); Runnable productRunnable = () -> { long threadId = Thread.currentThread().getId(); lock.lock();//先得到鎖 for (int i = 0; i <= 50; i++) { try { Thread.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("threadId:%s,number:%d --生產後 count:%d %n", threadId, i, ++count); lockCond.signal();//放出信號 try { System.out.printf("threadId:%s,number:%d,等待生產%n", threadId, i); if (i == 50) break; lockCond.await();//等待信號,阻塞當前線程 System.out.printf("threadId:%s,number:%d,收到通知,準備生產%n", threadId, i); } catch (InterruptedException e) { e.printStackTrace(); } } lock.unlock();//釋放鎖 count = -1; System.out.printf("threadId:%s,已生產完了。%n", threadId); }; Runnable consumeRunnable = () -> { long threadId = Thread.currentThread().getId(); lock.lock();//先得到鎖 for (int i = 0; i <= 200; i++) { if (count > 0) { try { Thread.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("threadId:%s,number:%d --消費後 count:%d %n", threadId, i, --count); lockCond.signal();//放出信號 } else { try { System.out.printf("threadId:%s,number:%d,等待消費%n", threadId, i); if (count == -1) break; lockCond.await();//等待信號,阻塞當前線程 System.out.printf("threadId:%s,number:%d,收到通知,準備消費%n", threadId, i); } catch (InterruptedException e) { e.printStackTrace(); } } } lock.unlock(); System.out.printf("threadId:%s,已消費完了。%n", threadId); }; executorService.execute(consumeRunnable); executorService.execute(productRunnable); }
/** * 線程間同步(Future,採用Executors.submit開啓1個或多個線程返回Future,線程後臺異步執行不阻塞主線程,當須要得到線程結果時,即:Future.get,則會等待獲取結果, * 固然可使用CompletableFuture來實現徹底的異步回調處理結果,無需任何阻塞) */ private void testFuture() { //refer:https://www.cnblogs.com/xiaoxi/p/8303574.html ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Long> task = executorService.submit(() -> { long total = 0; System.out.println("子線程for loop start..."); for (int i = 0; i <= 100; i++) { try { Thread.sleep(50L); } catch (InterruptedException e) { e.printStackTrace(); } total += i; } System.out.printf("子線程for loop end,total=%d %n", total); return total; }); //主線程處理其它邏輯,此時是與子線程在並行執行 for (int n = 0; n <= 30; n++) { System.out.printf("主線程for loop中,n=%d %n", n); } try { long result = task.get();//等待子線程結果,若是未執行完則會阻塞主線程直到子線程完成出結果 System.out.printf("主線程獲取子線程計算的結果,total=%d %n", result); } catch (Exception e) { e.printStackTrace(); } //使用CompletableFuture可異步回調獲取結果,不會阻塞主線程 CompletableFuture.supplyAsync(() -> { long total = 0; System.out.println("子線程for loop start..."); for (int i = 0; i <= 100; i++) { try { Thread.sleep(50L); } catch (InterruptedException e) { e.printStackTrace(); } total += i; } System.out.printf("threadId:%s,子線程for loop end,total=%d %n", Thread.currentThread().getId(), total); return total; }).thenAccept(result -> { //當子線程執行完成後會回調該方法 System.out.printf("threadId:%s,回調獲取子線程計算的結果,total=%d %n", Thread.currentThread().getId(), result); }); //主線程處理其它邏輯,此時是與子線程在並行執行 long threadId = Thread.currentThread().getId(); for (int n = 0; n <= 30; n++) { System.out.printf("threadId:%s,主線程for loop2中,n=%d %n", threadId, n); } System.out.printf("threadId:%s,主線程已執行完成。%n", threadId); }
/** * 線程間同步(CountDownLatch,同時運行多個線程,在CountDownLatch計數器count爲0前主線程會阻塞等待) */ private void testCountDownLatch() { ExecutorService executorService = Executors.newFixedThreadPool(3); final CountDownLatch latch = new CountDownLatch(3); Runnable runnable = () -> { long threadId = Thread.currentThread().getId(); for (int i = 0; i <= 100; i++) { try { Thread.sleep(50L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("threadId:%s,number:%d %n", threadId, i); } System.out.printf("threadId:%s,已處理完成。%n", threadId); latch.countDown();//扣減計數器-1 }; //開3個線程並行處理 for (int i = 1; i <= 3; i++) { executorService.execute(runnable); } long mainThreadId = Thread.currentThread().getId(); try { System.out.printf("threadId:%s,主線程等待中...%n", mainThreadId); latch.await();//等待所有執行完成,即計數器爲0,阻塞主線程 } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("threadId:%s,主線程確認全部子線程都處理完成,count:%d,開始執行主線程邏輯。%n", mainThreadId, latch.getCount()); System.out.printf("threadId:%s,主線程已執行完成!%n", mainThreadId); }
/** * 線程間同步(Semaphore,開啓多個線程,使用acquire獲取1個許可【可指定一次獲取多個許可】, * 若未能獲取到則等待,若已得到許可則佔用了1個可用許可總數且可進入繼續執行,待執行完成後應釋放許可) */ private void testSemaphore(){ Semaphore wcSemaphore = new Semaphore(5,true); Runnable runnable =() -> { long threadId = Thread.currentThread().getId(); System.out.printf("threadId:%s,等待進入WC,目前還有:%d空位,排隊等候人數:%d %n", threadId,wcSemaphore.availablePermits(), wcSemaphore.getQueueLength()); try { wcSemaphore.acquire(); System.out.printf("threadId:%s,進入WC,目前還有:%d空位,排隊等候人數:%d,關門 %n", threadId,wcSemaphore.availablePermits(), wcSemaphore.getQueueLength()); Thread.sleep(1000L); System.out.printf("threadId:%s,離開WC,目前還有:%d空位,排隊等候人數:%d,開門 %n", threadId,wcSemaphore.availablePermits(), wcSemaphore.getQueueLength()); wcSemaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }; ExecutorService executorService = Executors.newFixedThreadPool(5); for (int n=1;n<=10;n++){ executorService.execute(runnable); } long mainThreadId = Thread.currentThread().getId(); System.out.printf("threadId:%s,清潔阿姨等待打掃WC,目前還有:%d空位,排隊等候人數:%d %n", mainThreadId,wcSemaphore.availablePermits(),wcSemaphore.getQueueLength()); //若是還有排隊且剩餘空位未所有處理則等待 while (wcSemaphore.hasQueuedThreads() && wcSemaphore.drainPermits()!=5 && wcSemaphore.availablePermits()!=5){ try { Thread.sleep(50L); } catch (InterruptedException e) { e.printStackTrace(); } } try { wcSemaphore.acquire(5); System.out.printf("threadId:%s,清潔阿姨開始打掃WC,關上WC入口,即全部人均不可再使用,目前還有:%d空位,排隊等候人數:%d %n", mainThreadId,wcSemaphore.availablePermits(), wcSemaphore.getQueueLength()); } catch (InterruptedException e) { e.printStackTrace(); } }
/** * 進程間同步(FileLock文件鎖,同時開啓多個進程實例,若已得到鎖的實例在執行,則後面的進程實例均只能等待,固然可使用tryLock非阻塞模式) */ private void testFileLock() { File lockFile = new File(System.getProperty("user.dir") + File.separator + "app.lock"); if (!lockFile.exists()) { try { if (!lockFile.createNewFile()) { System.out.printf("建立文件失敗:" + lockFile.getAbsolutePath()); return; } } catch (IOException e) { e.printStackTrace(); } } try { FileChannel fileChannel = new FileOutputStream(lockFile).getChannel(); String jvmName = ManagementFactory.getRuntimeMXBean().getName(); System.out.printf("jvm ProcessName:%s, 準備獲取鎖 ... %n", jvmName); FileLock lock = fileChannel.lock();//獲取文件鎖 for (int i = 0; i <= 100; i++) { try { Thread.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("jvm ProcessName:%s, number:%d %n", jvmName, i); } lock.release(); fileChannel.close(); System.out.printf("jvm ProcessName:%s, 處理完成,釋放鎖 %n", jvmName); } catch (Exception e) { e.printStackTrace(); } }
---網上也有不少,我這裏把以前用C#實現的基於DB的分佈式鎖的例子(JAVA同理)拿出來【原理是:有一張T_Locking表,SType=LOCK名,SValue=LOCK惟一標識值,RecordTime=加鎖記錄時間,獲取鎖時直接根據SType=N'Lock名' and SValue=N''來更新,若被其它實例更新,則SValue不可能爲空,則獲取鎖失敗,而後進一步判斷是否出現過時鎖的狀況,若是過時則仍能夠嘗試更新獲取鎖便可】算法
/// <summary> /// 設置分佈式鎖 /// </summary> /// <returns></returns> private bool SetDistributedLockBasedDB() { bool hasLock = false; try { var sqlDapperUtil = new SqlDapperUtil(Constants.CfgKey_KYLDConnectionName); ////此處利用DB的更新排它鎖,確保併發時只有一個優先執行,而當某個執行成功後,其它後續執行會由於條件更新不知足而更新失敗,實現了多併發時只有一個能更新成功,即得到鎖。 hasLock = sqlDapperUtil.ExecuteCommand("update [dbo].[T_Locking] set SValue=@SValue,RecordTime=getdate() where SType=N'Lock名' and SValue=N'' ", new { SValue = Lock惟一標識值 }); if (!hasLock) //若是未得到鎖,還須要考慮加鎖後未被正常釋放鎖的狀況,故以下再次嘗試對加鎖超過1小時以上的進行從新更新再次得到鎖,避免無效鎖一直處於鎖定狀態 { hasLock = sqlDapperUtil.ExecuteCommand("update [dbo].[T_Locking] set SValue=@SValue,RecordTime=getdate() " + "where SType = N'Lock名' and SValue <> N'' and RecordTime < DATEADD(hh, -1, getdate())", new { SValue = Lock惟一標識值 }); } } catch (Exception ex) { logger.Error("SetDistributedLockBasedDB Error: " + ex.ToString()); } return hasLock; } /// <summary> /// 釋放分佈式鎖 /// </summary> private void ReleaseDistributedLockBasedDB() { try { var sqlDapperUtil = new SqlDapperUtil(Constants.CfgKey_KYLDConnectionName); sqlDapperUtil.ExecuteCommand("update [dbo].[T_Locking] set SValue=N'',RecordTime=getdate() where SType=N'Lock名' and SValue=@SValue", new { SValue = Lock惟一標識值 }); } catch (Exception ex) { logger.Error("ReleaseDistributedLockBasedDB Error: " + ex.ToString()); } }
實現方式1(原生實現):Redis分佈式鎖的正確實現方式(Java版)sql
實現方式2(Redlock):Redis分佈式鎖的官方算法RedLock以及Java版本實現庫Redissonc#
*有同時列出三種實現方案的文章,可參見: Java分佈式鎖三種實現方案app