無鎖即無障礙的運行, 全部線程均可以到達臨界區, 接近於無等待.java
無鎖採用CAS(compare and swap)算法來處理線程衝突, 其原理以下算法
CAS包含3個參數CAS(V,E,N).V表示要更新的變量, E表示預期值, N表示新值.數據庫
僅當V值等於E值時, 纔會將V的值設爲N, 若是V值和E值不一樣, 則說明已經有其餘線程作了更新, 則當前線程什麼數組
都不作. 最後, CAS返回當前V的真實值. CAS操做是抱着樂觀的態度進行的, 它老是認爲本身能夠成功完成操做.安全
當多個線程同時使用CAS操做一個變量時, 只有一個會勝出, 併成功更新, 其他均會失敗.失敗的線程不會被掛起,多線程
僅是被告知失敗, 而且容許再次嘗試, 固然也容許失敗的線程放棄操做.基於這樣的原理, CAS操做即時沒有鎖,dom
也能夠發現其餘線程對當前線程的干擾, 並進行恰當的處理.ide
CPU指令函數
另外, 雖然上述步驟繁多, 實際上CAS整一個操做過程是一個原子操做, 它是由一條CPU指令完成的,源碼分析
從指令層保證操做可靠, 不會被多線程干擾.
無鎖與volatile
無鎖能夠經過cas來保證原子性與線程安全, 他與volatile什麼區別呢?
當給變量加了volatile關鍵字, 表示該變量對全部線程可見, 但不保證原子性.
以volatile i, i++爲例, 分爲如下四步:
其中前三步是線程不安全的, 可能其餘線程會對i進行讀寫.
所以任何依賴於以前值的操做, 如i++, i = i *10使用volatile都不安全.
而諸如get/set, boolean這類可使用volatile.
主要接口
1 // 取得當前值 2 public final int get() 3 // 設置當前值 4 public final void set(int newValue) 5 // 設置新值,並返回舊值 6 public final int getAndSet(int newValue) 7 // 若是當前值爲expect,則設置爲u 8 public final boolean compareAndSet(int expect, int u) 9 // 當前值加1,返回舊值 10 public final int getAndIncrement() 11 // 當前值減1,返回舊值 12 public final int getAndDecrement() 13 // 當前值增長delta,返回舊值 14 public final int getAndAdd(int delta) 15 // 當前值加1,返回新值 16 public final int incrementAndGet() 17 // 當前值減1,返回新值 18 public final int decrementAndGet() 19 // 當前值增長delta,返回新值 20 public final int addAndGet(int delta)
源碼實現
1 // 封裝了一個int對其加減 2 private volatile int value; 3 ....... 4 public final boolean compareAndSet(int expect, int update) { 5 // 經過unsafe 基於CPU的CAS指令來實現, 能夠認爲無阻塞. 6 return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 7 } 8 ....... 9 public final int getAndIncrement() { 10 for (;;) { 11 // 當前值 12 int current = get(); 13 // 預期值 14 int next = current + 1; 15 if (compareAndSet(current, next)) { 16 // 若是加成功了, 則返回當前值 17 return current; 18 } 19 // 若是加失敗了, 說明其餘線程已經修改了數據, 與指望不相符, 20 // 則繼續無限循環, 直到成功. 這種樂觀鎖, 理論上只要等兩三個時鐘週期就能夠設值成功 21 // 相比於直接經過synchronized獨佔鎖的方式操做int, 要大大節約等待時間. 22 } 23 }
Demo
使用10個線程打印0-10000, 最終獲得結果10w.
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 public class AtomicIntegerDemo { 4 static AtomicInteger i = new AtomicInteger(); 5 6 public static class AddThread implements Runnable { 7 public void run() { 8 for (int k = 0; k < 10000; k++) { 9 i.incrementAndGet(); 10 } 11 } 12 } 13 14 public static void main(String[] args) throws InterruptedException { 15 Thread[] ts = new Thread[10]; 16 for (int k = 0; k < 10; k++) { 17 ts[k] = new Thread(new AddThread()); 18 } 19 for (int k = 0; k < 10; k++) { 20 ts[k].start(); 21 } 22 for (int k = 0; k < 10; k++) { 23 ts[k].join(); 24 } 25 System.out.println(i); 26 } 27 }
Unsafe類是在sun.misc包下, 能夠用於一些非安全的操做,好比:
根據偏移量設置值, 線程park(), 底層的CAS操做等等.
1 // 獲取類實例中變量的偏移量 2 valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value")); 3 // 基於偏移量對值進行操做 4 unsafe.compareAndSwapInt(this, valueOffset, expect, update);
主要接口
1 // 得到給定對象偏移量上的int值 2 public native int getInt(Object o, long offset); 3 // 設置給定對象偏移量上的int值 4 public native void putInt(Object o, long offset, int x); 5 // 得到字段在對象中的偏移量 6 public native long objectFieldOffset(Field f); 7 // 設置給定對象的int值,使用volatile語義 8 public native void putIntVolatile(Object o, long offset, int x); 9 // 得到給定對象對象的int值,使用volatile語義 10 public native int getIntVolatile(Object o, long offset); 11 // 和putIntVolatile()同樣,可是它要求被操做字段就是volatile類型的 12 public native void putOrderedInt(Object o, long offset, int x);
與AtomicInteger相似, 只是裏面封裝了一個對象, 而不是int, 對引用進行修改
主要接口
1 get() 2 set(V) 3 compareAndSet() 4 getAndSet(V)
Demo
使用10個線程, 同時嘗試修改AtomicReference中的String, 最終只有一個線程能夠成功.
1 import java.util.concurrent.atomic.AtomicReference; 2 3 public class AtomicReferenceTest { 4 public final static AtomicReference<String> attxnicStr = new AtomicReference<String>("abc"); 5 6 public static void main(String[] args) { 7 for (int i = 0; i < 10; i++) { 8 new Thread() { 9 public void run() { 10 try { 11 Thread.sleep(Math.abs((int) (Math.random() * 100))); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 if (attxnicStr.compareAndSet("abc", "def")) { 16 System.out.println("Thread:" + Thread.currentThread().getId() + " change value to " + attxnicStr.get()); 17 } else { 18 System.out.println("Thread:" + Thread.currentThread().getId() + " change failed!"); 19 } 20 } 21 }.start(); 22 } 23 } 24 }
也是封裝了一個引用, 主要解決ABA問題.
ABA問題
線程一準備用CAS將變量的值由A替換爲B, 在此以前線程二將變量的值由A替換爲C, 線程三又將C替換爲A, 而後線程一執行CAS時發現變量的值仍然爲A, 因此線程一CAS成功.
主要接口
1 // 比較設置 參數依次爲:指望值 寫入新值 指望時間戳 新時間戳 2 public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) 3 // 得到當前對象引用 4 public V getReference() 5 // 得到當前時間戳 6 public int getStamp() 7 // 設置當前對象引用和時間戳 8 public void set(V newReference, int newStamp)
源碼分析
1 // 內部封裝了一個Pair對象, 每次對對象操做的時候, stamp + 1 2 private static class Pair<T> { 3 final T reference; 4 final int stamp; 5 private Pair(T reference, int stamp) { 6 this.reference = reference; 7 this.stamp = stamp; 8 } 9 static <T> Pair<T> of(T reference, int stamp) { 10 return new Pair<T>(reference, stamp); 11 } 12 } 13 14 private volatile Pair<V> pair; 15 16 // 進行cas操做的時候, 會對比stamp的值 17 public boolean compareAndSet(V expectedReference, 18 V newReference, 19 int expectedStamp, 20 int newStamp) { 21 Pair<V> current = pair; 22 return 23 expectedReference == current.reference && 24 expectedStamp == current.stamp && 25 ((newReference == current.reference && 26 newStamp == current.stamp) || 27 casPair(current, Pair.of(newReference, newStamp))); 28 }
Demo
後臺使用多個線程對用戶充值, 要求只能充值一次
1 public class AtomicStampedReferenceDemo { 2 static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0); 3 public staticvoid main(String[] args) { 4 //模擬多個線程同時更新後臺數據庫,爲用戶充值 5 for(int i = 0 ; i < 3 ; i++) { 6 final int timestamp=money.getStamp(); 7 newThread() { 8 public void run() { 9 while(true){ 10 while(true){ 11 Integerm=money.getReference(); 12 if(m<20){ 13 if(money.compareAndSet(m,m+20,timestamp,timestamp+1)){ 14 System.out.println("餘額小於20元,充值成功,餘額:"+money.getReference()+"元"); 15 break; 16 } 17 }else{ 18 //System.out.println("餘額大於20元,無需充值"); 19 break ; 20 } 21 } 22 } 23 } 24 }.start(); 25 } 26 27 //用戶消費線程,模擬消費行爲 28 new Thread() { 29 publicvoid run() { 30 for(int i=0;i<100;i++){ 31 while(true){ 32 int timestamp=money.getStamp(); 33 Integer m=money.getReference(); 34 if(m>10){ 35 System.out.println("大於10元"); 36 if(money.compareAndSet(m, m-10,timestamp,timestamp+1)){ 37 System.out.println("成功消費10元,餘額:"+money.getReference()); 38 break; 39 } 40 }else{ 41 System.out.println("沒有足夠的金額"); 42 break; 43 } 44 } 45 try {Thread.sleep(100);} catch (InterruptedException e) {} 46 } 47 } 48 }.start(); 49 } 50 }
支持無鎖的數組
主要接口
1 // 得到數組第i個下標的元素 2 public final int get(int i) 3 // 得到數組的長度 4 public final int length() 5 // 將數組第i個下標設置爲newValue,並返回舊的值 6 public final int getAndSet(int i, int newValue) 7 // 進行CAS操做,若是第i個下標的元素等於expect,則設置爲update,設置成功返回true 8 public final boolean compareAndSet(int i, int expect, int update) 9 // 將第i個下標的元素加1 10 public final int getAndIncrement(int i) 11 // 將第i個下標的元素減1 12 public final int getAndDecrement(int i) 13 // 將第i個下標的元素增長delta(delta能夠是負數) 14 public final int getAndAdd(int i, int delta)
源碼分析
1 // 數組自己基地址 2 private static final int base = unsafe.arrayBaseOffset(int[].class); 3 4 // 封裝了一個數組 5 private final int[] array; 6 7 static { 8 // 數組中對象的寬度, int類型, 4個字節, scale = 4; 9 int scale = unsafe.arrayIndexScale(int[].class); 10 if ((scale & (scale - 1)) != 0) 11 throw new Error("data type scale not a power of two"); 12 // 前導0 : 一個數字轉爲二進制後, 他前面0的個數 13 // 對於4來說, 他就是00000000 00000000 00000000 00000100, 他的前導0 就是29 14 // 因此shift = 2 15 shift = 31 - Integer.numberOfLeadingZeros(scale); 16 } 17 18 // 獲取第i個元素 19 public final int get(int i) { 20 return getRaw(checkedByteOffset(i)); 21 } 22 23 // 第i個元素, 在數組中的偏移量是多少 24 private long checkedByteOffset(int i) { 25 if (i < 0 || i >= array.length) 26 throw new IndexOutOfBoundsException("index " + i); 27 28 return byteOffset(i); 29 } 30 31 // base : 數組基地址, i << shift, 其實就是i * 4, 由於這邊是int array. 32 private static long byteOffset(int i) { 33 // i * 4 + base 34 return ((long) i << shift) + base; 35 } 36 37 // 根據偏移量從數組中獲取數據 38 private int getRaw(long offset) { 39 return unsafe.getIntVolatile(array, offset); 40 }
Demo
1 import java.util.concurrent.atomic.AtomicIntegerArray; 2 3 public class AtomicArrayDemo { 4 static AtomicIntegerArray arr = new AtomicIntegerArray(10); 5 6 public static class AddThread implements Runnable { 7 public void run() { 8 for (int k = 0; k < 10000; k++) { 9 arr.incrementAndGet(k % arr.length()); 10 } 11 } 12 } 13 14 public static void main(String[] args) throws InterruptedException { 15 Thread[] ts = new Thread[10]; 16 for (int k = 0; k < 10; k++) { 17 ts[k] = new Thread(new AddThread()); 18 } 19 for (int k = 0; k < 10; k++) { 20 ts[k].start(); 21 } 22 for (int k = 0; k < 10; k++) { 23 ts[k].join(); 24 } 25 System.out.println(arr); 26 } 27 }
讓普通變量也享受原子操做
主要接口
1 AtomicIntegerFieldUpdater.newUpdater() 2 incrementAndGet()
1 import java.util.concurrent.atomic.AtomicInteger; 2 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; 3 4 public class AtomicIntegerFieldUpdaterDemo { 5 public static class Candidate { 6 int id; 7 // 若是直接把int改爲atomicinteger, 可能對代碼破壞比較大 8 // 所以使用AtomicIntegerFieldUpdater對score進行封裝 9 volatile int score; 10 } 11 12 // 經過反射實現 13 public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score"); 14 // 檢查Updater是否工做正確, allScore的結果應該跟score一致 15 public static AtomicInteger allScore = new AtomicInteger(0); 16 17 public static void main(String[] args) throws InterruptedException { 18 final Candidate stu = new Candidate(); 19 Thread[] t = new Thread[10000]; 20 for (int i = 0; i < 10000; i++) { 21 t[i] = new Thread() { 22 public void run() { 23 if (Math.random() > 0.4) { 24 scoreUpdater.incrementAndGet(stu); 25 allScore.incrementAndGet(); 26 } 27 } 28 }; 29 t[i].start(); 30 } 31 for (int i = 0; i < 10000; i++) { 32 t[i].join(); 33 } 34 35 System.out.println("score=" + stu.score); 36 System.out.println("allScore=" + allScore); 37 } 38 }
jdk中Vector是加鎖的, 網上找的一個無鎖Vector LockFreeVector, 給他添加了源碼中文註釋.
主要關注push_back, 添加元素的函數
1 import java.util.AbstractList; 2 import java.util.concurrent.atomic.AtomicReference; 3 import java.util.concurrent.atomic.AtomicReferenceArray; 4 5 /** 6 * It is a thread safe and lock-free vector. 7 * This class implement algorithm from:<br> 8 * 9 * Lock-free Dynamically Resizable Arrays <br> 10 * 11 * @param <E> type of element in the vector 12 * 13 */ 14 public class LockFreeVector<E> extends AbstractList<E> { 15 private static final boolean debug = false; 16 /** 17 * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i]) 18 */ 19 private static final int FIRST_BUCKET_SIZE = 8; 20 21 /** 22 * number of buckets. 30 will allow 8*(2^30-1) elements 23 */ 24 private static final int N_BUCKET = 30; 25 26 /** 27 * We will have at most N_BUCKET number of buckets. And we have 28 * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1) 29 * 30 * 爲何AtomicReferenceArray裏再套一個AtomicReferenceArray呢, 相似一個籃子(buckets)裏放了不少籃子 31 * 爲了在容量擴展時但願儘量少的改動原有數據, 所以把一維數組擴展成二維數組. 32 * 該二維數組並不是均衡的分佈. 可能第一個數組8個元素, 第二個數組16個元素, 第三個數組32個...... 33 */ 34 private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets; 35 36 /** 37 * @param <E> 38 */ 39 static class WriteDescriptor<E> { 40 public E oldV; 41 public E newV; 42 public AtomicReferenceArray<E> addr; 43 public int addr_ind; 44 45 /** 46 * Creating a new descriptor. 47 * 48 * @param addr Operation address 對哪一個數組進行寫 49 * @param addr_ind Index of address 指定index 50 * @param oldV old operand 51 * @param newV new operand 52 */ 53 public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind, 54 E oldV, E newV) { 55 this.addr = addr; 56 this.addr_ind = addr_ind; 57 this.oldV = oldV; 58 this.newV = newV; 59 } 60 61 /** 62 * set newV. 63 */ 64 public void doIt() { 65 // 這邊失敗後重試的邏輯在另外的代碼裏. 66 addr.compareAndSet(addr_ind, oldV, newV); 67 } 68 } 69 70 /** 71 * @param <E> 72 */ 73 static class Descriptor<E> { 74 public int size; 75 volatile WriteDescriptor<E> writeop; 76 77 /** 78 * Create a new descriptor. 79 * 80 * @param size Size of the vector 81 * @param writeop Executor write operation 82 */ 83 public Descriptor(int size, WriteDescriptor<E> writeop) { 84 this.size = size; 85 this.writeop = writeop; 86 } 87 88 /** 89 * 90 */ 91 public void completeWrite() { 92 WriteDescriptor<E> tmpOp = writeop; 93 if (tmpOp != null) { 94 tmpOp.doIt(); 95 writeop = null; // this is safe since all write to writeop use 96 // null as r_value. 97 } 98 } 99 } 100 101 private AtomicReference<Descriptor<E>> descriptor; 102 private static final int zeroNumFirst = Integer 103 .numberOfLeadingZeros(FIRST_BUCKET_SIZE); 104 105 /** 106 * Constructor. 107 */ 108 public LockFreeVector() { 109 buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET); 110 buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE)); 111 descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0, 112 null)); 113 } 114 115 /** 116 * add e at the end of vector. 117 * 把元素e加到vector中 118 * 119 * @param e 120 * element added 121 */ 122 public void push_back(E e) { 123 Descriptor<E> desc; 124 Descriptor<E> newd; 125 do { 126 desc = descriptor.get(); 127 desc.completeWrite(); 128 // desc.size Vector 自己的大小 129 // FIRST_BUCKET_SIZE 第一個一維數組的大小 130 int pos = desc.size + FIRST_BUCKET_SIZE; 131 // 取出pos 的前導0 132 int zeroNumPos = Integer.numberOfLeadingZeros(pos); 133 // zeroNumFirst 爲FIRST_BUCKET_SIZE 的前導0 134 // bucketInd 數據應該放到哪個一維數組(籃子)裏的 135 int bucketInd = zeroNumFirst - zeroNumPos; 136 // 00000000 00000000 00000000 00001000 第一個籃子滿 8 137 // 00000000 00000000 00000000 00011000 第二個籃子滿 8 + 16 138 // 00000000 00000000 00000000 00111000 第三個籃子滿 8 + 16 + 32 139 // ... bucketInd其實經過前導0相減, 就是爲了得出來當前第幾個籃子是空的. 140 141 // 判斷這個一維數組是否已經啓用, 多是第一次初始化 142 if (buckets.get(bucketInd) == null) { 143 //newLen 一維數組的長度, 取前一個數組長度 * 2 144 int newLen = 2 * buckets.get(bucketInd - 1).length(); 145 // 設置失敗也不要緊, 只要有人初始化成功就行 146 buckets.compareAndSet(bucketInd, null, 147 new AtomicReferenceArray<E>(newLen)); 148 } 149 150 // 在這個一位數組中,我在哪一個位置 151 // 0x80000000是 10000000 00000000 00000000 00000000 152 // 這句話就是把上述111000, 第一個1變成了0, 獲得011000, 即新值的位置. 153 int idx = (0x80000000>>>zeroNumPos) ^ pos; 154 // 經過bucketInd與idx來肯定元素在二維數組中的位置 155 // 指望寫入的時候, 該位置值是null, 若是非null, 說明其餘線程已經寫了, 則繼續循環. 156 newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>( 157 buckets.get(bucketInd), idx, null, e)); 158 // 循環cas設值 159 } while (!descriptor.compareAndSet(desc, newd)); 160 descriptor.get().completeWrite(); 161 } 162 163 /** 164 * Remove the last element in the vector. 165 * 166 * @return element removed 167 */ 168 public E pop_back() { 169 Descriptor<E> desc; 170 Descriptor<E> newd; 171 E elem; 172 do { 173 desc = descriptor.get(); 174 desc.completeWrite(); 175 176 int pos = desc.size + FIRST_BUCKET_SIZE - 1; 177 int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 178 - Integer.numberOfLeadingZeros(pos); 179 int idx = Integer.highestOneBit(pos) ^ pos; 180 elem = buckets.get(bucketInd).get(idx); 181 newd = new Descriptor<E>(desc.size - 1, null); 182 } while (!descriptor.compareAndSet(desc, newd)); 183 184 return elem; 185 } 186 187 /** 188 * Get element with the index. 189 * 190 * @param index 191 * index 192 * @return element with the index 193 */ 194 @Override 195 public E get(int index) { 196 int pos = index + FIRST_BUCKET_SIZE; 197 int zeroNumPos = Integer.numberOfLeadingZeros(pos); 198 int bucketInd = zeroNumFirst - zeroNumPos; 199 int idx = (0x80000000>>>zeroNumPos) ^ pos; 200 return buckets.get(bucketInd).get(idx); 201 } 202 203 /** 204 * Set the element with index to e. 205 * 206 * @param index 207 * index of element to be reset 208 * @param e 209 * element to set 210 */ 211 /** 212 * {@inheritDoc} 213 */ 214 public E set(int index, E e) { 215 int pos = index + FIRST_BUCKET_SIZE; 216 int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 217 - Integer.numberOfLeadingZeros(pos); 218 int idx = Integer.highestOneBit(pos) ^ pos; 219 AtomicReferenceArray<E> bucket = buckets.get(bucketInd); 220 while (true) { 221 E oldV = bucket.get(idx); 222 if (bucket.compareAndSet(idx, oldV, e)) 223 return oldV; 224 } 225 } 226 227 /** 228 * reserve more space. 229 * 230 * @param newSize 231 * new size be reserved 232 */ 233 public void reserve(int newSize) { 234 int size = descriptor.get().size; 235 int pos = size + FIRST_BUCKET_SIZE - 1; 236 int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 237 - Integer.numberOfLeadingZeros(pos); 238 if (i < 1) 239 i = 1; 240 241 int initialSize = buckets.get(i - 1).length(); 242 while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 243 - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) { 244 i++; 245 initialSize *= FIRST_BUCKET_SIZE; 246 buckets.compareAndSet(i, null, new AtomicReferenceArray<E>( 247 initialSize)); 248 } 249 } 250 251 /** 252 * size of vector. 253 * 254 * @return size of vector 255 */ 256 public int size() { 257 return descriptor.get().size; 258 } 259 260 /** 261 * {@inheritDoc} 262 */ 263 @Override 264 public boolean add(E object) { 265 push_back(object); 266 return true; 267 } 268 }