在Java多線程併發的狀況下同時對一個變量進行操做會出現線程安全的問題,假如咱們如今使用20個線程對一個變量不停累加1,代碼以下:java
1 public class ThreadDemo implements Runnable { 2 private int num = 0; 3 @Override 4 public void run() { 5 try { 6 Thread.sleep(1000); 7 } catch (InterruptedException e) { 8 e.printStackTrace(); 9 } 10 num++; 11 System.out.println(Thread.currentThread().getName() + "處理成功,num = " + num); 12 } 13 14 public static void main(String[] args) { 15 ThreadDemo demo = new ThreadDemo(); 16 for (int i = 0; i < 20; i++) { 17 new Thread(demo,"線程" + i).start(); 18 } 19 } 20 }
理想狀況是累加到20,但實際運行的結果以下:算法
1 線程2處理成功,num = 3 2 線程3處理成功,num = 3 3 線程1處理成功,num = 3 4 線程0處理成功,num = 3 5 線程5處理成功,num = 4 6 線程7處理成功,num = 6 7 線程9處理成功,num = 8 8 線程4處理成功,num = 5 9 線程6處理成功,num = 4 10 線程8處理成功,num = 7 11 線程10處理成功,num = 9 12 線程11處理成功,num = 11 13 線程13處理成功,num = 12 14 線程12處理成功,num = 11 15 線程14處理成功,num = 13 16 線程15處理成功,num = 14 17 線程19處理成功,num = 18 18 線程18處理成功,num = 18 19 線程16處理成功,num = 16 20 線程17處理成功,num = 16
實際運行的結果可能有多種狀況,由於在Java多線程併發的狀況下會有這種安全問題,致使結果不許確,針對這種問題,有如下幾種解決方案數據庫
一、方案一:synchronized編程
1 public class ThreadDemo implements Runnable { 2 3 private int num = 0; 4 5 private synchronized void increase(){ 6 num++; 7 } 8 9 @Override 10 public void run() { 11 try { 12 Thread.sleep(1000); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 increase(); 17 System.out.println(Thread.currentThread().getName() + "處理成功,num = " + num); 18 } 19 20 public static void main(String[] args) { 21 ThreadDemo demo = new ThreadDemo(); 22 for (int i = 0; i < 20; i++) { 23 new Thread(demo,"線程" + i).start(); 24 } 25 } 26 }
運行結果以下:數組
1 線程0處理成功,num = 1 2 線程1處理成功,num = 2 3 線程2處理成功,num = 3 4 線程3處理成功,num = 4 5 線程4處理成功,num = 5 6 線程5處理成功,num = 6 7 線程6處理成功,num = 7 8 線程7處理成功,num = 8 9 線程12處理成功,num = 13 10 線程14處理成功,num = 15 11 線程15處理成功,num = 16 12 線程16處理成功,num = 17 13 線程10處理成功,num = 11 14 線程9處理成功,num = 10 15 線程8處理成功,num = 10 16 線程13處理成功,num = 14 17 線程11處理成功,num = 13 18 線程17處理成功,num = 18 19 線程18處理成功,num = 19 20 線程19處理成功,num = 20
這個時候,代碼就是線程安全的了,由於咱們加了synchronized,也就是讓每一個線程要進入increase()方法以前先得嘗試加鎖,同一時間只有一個線程能加鎖,其餘線程須要等待鎖。經過這樣處理,就能夠保證換個data每次都會累加1,不會出現數據錯亂的問題。可是,如此簡單的data++操做,都要加一個重磅的synchronized鎖來解決多線程併發問題,一個接一個的排隊,加鎖,處理數據,釋放鎖,下一個再進來,有點大材小用,synchronized是能夠解決更加複雜的併發編程場景和問題的。緩存
二、更高效方案:Atomic類安全
對於這種簡單的data++類的操做,java併發包下面提供了一系列的Atomic原子類,好比說AtomicInteger,能夠保證多線程併發安全的狀況下,高性能的併發更新一個數值,代碼以下:多線程
1 public class ThreadDemo implements Runnable { 2 3 private AtomicInteger num = new AtomicInteger(0); 4 5 @Override 6 public void run() { 7 try { 8 Thread.sleep(1000); 9 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } 12 num.incrementAndGet(); 13 System.out.println(Thread.currentThread().getName() + "處理成功,num = " + num); 14 } 15 16 public static void main(String[] args) { 17 ThreadDemo demo = new ThreadDemo(); 18 for (int i = 0; i < 20; i++) { 19 new Thread(demo,"線程" + i).start(); 20 } 21 } 22 }
運行結果以下:併發
線程1處理成功,num = 2 線程0處理成功,num = 2 線程2處理成功,num = 3 線程4處理成功,num = 5 線程3處理成功,num = 4 線程5處理成功,num = 6 線程6處理成功,num = 7 線程7處理成功,num = 8 線程9處理成功,num = 11 線程11處理成功,num = 12 線程13處理成功,num = 14 線程15處理成功,num = 16 線程16處理成功,num = 17 線程17處理成功,num = 18 線程10處理成功,num = 11 線程8處理成功,num = 11 線程14處理成功,num = 15 線程12處理成功,num = 13 線程18處理成功,num = 19 線程19處理成功,num = 20
多個線程能夠併發的執行AtomicInteger的incrementAndGet()方法,意思就是data的值累加1,接着返回累加後最新的值,這個代碼裏就沒有看到加鎖和釋放鎖app
1 public final int getAndAddInt(Object var1, long var2, int var4) { 2 int var5; 3 do { 4 var5 = this.getIntVolatile(var1, var2); 5 } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); 6 7 return var5; 8 }
實際上,Atomic原子類底層用的不是傳統意義的鎖機制,而是無鎖化的CAS機制,經過CAS機制保證多線程修改一個數值的安全性,有點樂觀鎖的意思,先取值,累加的時候會比較原來的值是否改變,若是改變就不會執行,會再次獲取最新值(Compare And Swap)
三、Java8對CAS的優化:LongAdder
CAS機制能夠輕量級的實現線程安全,但CAS也有一個問題就是在多線程同時更改一個變量的值的時候可能會循環屢次纔會變動成功,在高併發的狀況下這種狀況會更常見,耗費大量系統資源在死循環上面,所以Java8推出了LongAdder
1 /** 2 * Adds the given value. 3 * 4 * @param x the value to add 5 */ 6 public void add(long x) { 7 Cell[] as; long b, v; int m; Cell a; 8 if ((as = cells) != null || !casBase(b = base, b + x)) { 9 boolean uncontended = true; 10 if (as == null || (m = as.length - 1) < 0 || 11 (a = as[getProbe() & m]) == null || 12 !(uncontended = a.cas(v = a.value, v + x))) 13 longAccumulate(x, null, uncontended); 14 } 15 } 16 17 /** 18 * Handles cases of updates involving initialization, resizing, 19 * creating new Cells, and/or contention. See above for 20 * explanation. This method suffers the usual non-modularity 21 * problems of optimistic retry code, relying on rechecked sets of 22 * reads. 23 * 24 * @param x the value 25 * @param fn the update function, or null for add (this convention 26 * avoids the need for an extra field or function in LongAdder). 27 * @param wasUncontended false if CAS failed before call 28 */ 29 final void longAccumulate(long x, LongBinaryOperator fn, 30 boolean wasUncontended) { 31 int h; 32 if ((h = getProbe()) == 0) { 33 ThreadLocalRandom.current(); // force initialization 34 h = getProbe(); 35 wasUncontended = true; 36 } 37 boolean collide = false; // True if last slot nonempty 38 for (;;) { 39 Cell[] as; Cell a; int n; long v; 40 if ((as = cells) != null && (n = as.length) > 0) { 41 if ((a = as[(n - 1) & h]) == null) { 42 if (cellsBusy == 0) { // Try to attach new Cell 43 Cell r = new Cell(x); // Optimistically create 44 if (cellsBusy == 0 && casCellsBusy()) { 45 boolean created = false; 46 try { // Recheck under lock 47 Cell[] rs; int m, j; 48 if ((rs = cells) != null && 49 (m = rs.length) > 0 && 50 rs[j = (m - 1) & h] == null) { 51 rs[j] = r; 52 created = true; 53 } 54 } finally { 55 cellsBusy = 0; 56 } 57 if (created) 58 break; 59 continue; // Slot is now non-empty 60 } 61 } 62 collide = false; 63 } 64 else if (!wasUncontended) // CAS already known to fail 65 wasUncontended = true; // Continue after rehash 66 else if (a.cas(v = a.value, ((fn == null) ? v + x : 67 fn.applyAsLong(v, x)))) 68 break; 69 else if (n >= NCPU || cells != as) 70 collide = false; // At max size or stale 71 else if (!collide) 72 collide = true; 73 else if (cellsBusy == 0 && casCellsBusy()) { 74 try { 75 if (cells == as) { // Expand table unless stale 76 Cell[] rs = new Cell[n << 1]; 77 for (int i = 0; i < n; ++i) 78 rs[i] = as[i]; 79 cells = rs; 80 } 81 } finally { 82 cellsBusy = 0; 83 } 84 collide = false; 85 continue; // Retry with expanded table 86 } 87 h = advanceProbe(h); 88 } 89 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 90 boolean init = false; 91 try { // Initialize table 92 if (cells == as) { 93 Cell[] rs = new Cell[2]; 94 rs[h & 1] = new Cell(x); 95 cells = rs; 96 init = true; 97 } 98 } finally { 99 cellsBusy = 0; 100 } 101 if (init) 102 break; 103 } 104 else if (casBase(v = base, ((fn == null) ? v + x : 105 fn.applyAsLong(v, x)))) 106 break; // Fall back on using base 107 } 108 }
LongAdder就是嘗試使用分段CAS的方式來提高高併發執行CAS操做的性能,當併發更新的線程數量過多,其內部會搞一個Cell數組,每一個數組是一個數值分段,這時,讓大量的線程分別去對不一樣Cell內部的value值進行CAS累加操做,把CAS計算壓力分散到了不一樣的Cell分段數值中,這樣就能夠大幅度的下降多線程併發更新同一個數值時出現的無限循環的問題,並且他內部實現了自動分段遷移的機制,也就是若是某個Cell的value執行CAS失敗了,那麼就會自動去找另一個Cell分段內的value值進行CAS操做。
1 /** 2 * Returns the current sum. The returned value is <em>NOT</em> an 3 * atomic snapshot; invocation in the absence of concurrent 4 * updates returns an accurate result, but concurrent updates that 5 * occur while the sum is being calculated might not be 6 * incorporated. 7 * 8 * @return the sum 9 */ 10 public long sum() { 11 Cell[] as = cells; Cell a; 12 long sum = base; 13 if (as != null) { 14 for (int i = 0; i < as.length; ++i) { 15 if ((a = as[i]) != null) 16 sum += a.value; 17 } 18 } 19 return sum; 20 }
最後,從LongAdder中獲取當前累加的總值,就會把base值和全部Cell分段數值加起來返回
從LongAdder分段加鎖的實現邏輯中,咱們也能夠對於一些併發量較大,持續時間較長的不適用緩存模式的搶購類項目的樂觀鎖進行改造,假如商品有1000個庫存,那麼徹底能夠給拆成20個庫存段,能夠在數據庫的表裏建20個庫存字段,好比stock_01,stock_02,以此類推,總之,就是把你的1000件庫存給他拆開,每一個庫存段是50件庫存,好比stock_01對應50件庫存,stock_02對應50件庫存。接着,每秒1000個請求過來了,經過簡單的隨機算法,每一個請求都是隨機在20個分段庫存裏,選擇一個進行加鎖。這樣有最多20個下單請求一塊兒執行,每一個下單請求鎖了一個庫存分段,而後在業務邏輯裏面,就對數據庫的那個分段庫存進行操做便可,包括查庫存 -> 判斷庫存是否充足 -> 扣減庫存,經過這種方式提高併發量,提升用戶體驗。