全部的鎖都是悲觀的,他們老是假設每一次的臨界區操做會產生衝突,若是有多個線程同時須要訪問臨界區資源,就寧肯犧牲性能讓線程進行等待,因此說鎖會阻塞線程執行.而無鎖是一種樂觀的策略,它會假設對資源的訪問是沒有衝突的,全部的線程均可以在不停頓的狀態下繼續執行.無鎖的策略是使用一種叫作比較交換的技術(CAS)來鑑別線程衝突,一旦檢測到衝突產生,就重試當前操做,直到沒有衝突爲止.算法
CAS算法包含三個參數(V,E,N),V表示要更新的變量,E表示預期值,N表示新值.僅當V值等於E值時(意思是說其餘線程沒有更新V值),纔會將V值設爲N.若是V值和E值不一樣,則說明已經有其餘線程作了更新(V值),則當前線程什麼都不作.最後,CAS返回當前V的真實值.數組
最簡單的無鎖安全整數:AtomicInteger安全
public class AtomicIntegerDemo { static AtomicInteger i = new AtomicInteger(); public static class AddThread implements Runnable { @Override public void run() { for (int k = 0;k < 10000;k++) { i.incrementAndGet(); } } } public static void main(String[] args) throws InterruptedException { Thread[] ts = new Thread[10]; for (int k = 0;k < 10;k++) { ts[k] = new Thread(new AddThread()); ts[k].start(); ts[k].join(); } System.out.println(i); } }
運行結果:多線程
100000dom
由於jdk 8的incrementAndGet()已經牽涉到底層Unsafe類,它有大量的native標識,跟C語言掛鉤的,這個咱們先不說.咱們本身來用無鎖的對象引用AtomicReference來模擬實現一個這個過程.ide
public class AtomicReferenceInteger { //i即V值(V,E,N) static AtomicReference<Integer> i = new AtomicReference<>(0); public static class AddThread implements Runnable { @Override public void run() { for (int k = 0;k < 10000;k++) { while (true) { //m即E值 Integer m = i.get(); //++m即N值,比較m跟i的值是否相等,若是相等,就把++m寫入i,若是i值被其餘線程修改,則繼續循環 if (i.compareAndSet(m,++m)) { break; } } } } } public static void main(String[] args) throws InterruptedException { Thread[] ts = new Thread[10]; for (int k = 0;k < 10;k++) { ts[k] = new Thread(new AddThread()); ts[k].start(); ts[k].join(); } System.out.println(i); } }
運行結果:性能
100000this
以上能夠看出spa
while (true) { //m即E值 Integer m = i.get(); //++m即N值,比較m跟i的值是否相等,若是相等,就把++m寫入i,若是i值被其餘線程修改,則繼續循環 if (i.compareAndSet(m,++m)) { break; } }
即模擬實現了incrementAndGet();線程
帶有時間戳的對象引用:AtomicStampedReference
使用帶時間戳的對象引用時,對象值和時間戳都必須知足指望值,寫入纔會成功.所以,即便對象值被反覆讀寫,寫回原值,只要時間戳發生變化,就能防止不恰當的寫入.
public class AtomicReferenceAcount { public static void main(String[] args) { // AtomicReference<Integer> money = new AtomicReference<>(); AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19,0); // money.set(19); for (int i = 0;i < 10000;i++) { final int timestamp = money.getStamp(); new Thread(new Runnable() { @Override public void run() { while (true) { // Integer m = money.get(); Integer m = money.getReference(); if (m < 20) { //比較money跟m的值相等,才更新money爲m+20,若是不等則從新來一遍,這裏money的值可能會被其餘線程修改 //當有其餘線程改變了時間戳timestamp的時候,總體沒法寫入 if (money.compareAndSet(m, m + 20, timestamp, timestamp )) { System.out.println("餘額小於20元,充值成功,餘額:" + money.getReference() + "元"); break; } } else { break; } } } }).start(); } Runnable r2 = new Runnable() { @Override public void run() { for (int i = 0;i < 10000;i++) { while (true) { int timestamp = money.getStamp(); Integer m = money.getReference(); if (m > 10) { System.out.println("大於10元"); if (money.compareAndSet(m,m - 10,timestamp,timestamp)) { System.out.println("成功消費10元,餘額:" + money.getReference()); break; } }else { System.out.println("沒有足夠金額"); break; } } // try { // Thread.sleep(100); // }catch (InterruptedException e) { // // } } } }; new Thread(r2).start(); } }
咱們先讓時間戳永遠不變,運行結果(部分選取)
餘額小於20元,充值成功,餘額:39元
大於10元
成功消費10元,餘額:29
大於10元
成功消費10元,餘額:19
大於10元
成功消費10元,餘額:9
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
大於10元
成功消費10元,餘額:19
大於10元
成功消費10元,餘額:9
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
.
.
沒有足夠金額
沒有足夠金額
大於10元
成功消費10元,餘額:19
大於10元
成功消費10元,餘額:29
大於10元
成功消費10元,餘額:19
大於10元
成功消費10元,餘額:9
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
大於10元
成功消費10元,餘額:19
餘額小於20元,充值成功,餘額:39元
餘額小於20元,充值成功,餘額:29元
大於10元
餘額小於20元,充值成功,餘額:29元
餘額小於20元,充值成功,餘額:39元
餘額小於20元,充值成功,餘額:29元
成功消費10元,餘額:29
大於10元
成功消費10元,餘額:19
大於10元
成功消費10元,餘額:9
根據結果咱們會看到,兩邊的線程會不斷的讀取,寫入.
如今咱們把時間戳改變+1
public class AtomicReferenceAcount { public static void main(String[] args) { // AtomicReference<Integer> money = new AtomicReference<>(); AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19,0); // money.set(19); for (int i = 0;i < 10000;i++) { final int timestamp = money.getStamp(); new Thread(new Runnable() { @Override public void run() { while (true) { // Integer m = money.get(); Integer m = money.getReference(); if (m < 20) { //比較money跟m的值相等,才更新money爲m+20,若是不等則從新來一遍,這裏money的值可能會被其餘線程修改 //當有其餘線程改變了時間戳timestamp的時候,總體沒法寫入 if (money.compareAndSet(m, m + 20, timestamp, timestamp + 1 )) { System.out.println("餘額小於20元,充值成功,餘額:" + money.getReference() + "元"); break; } } else { break; } } } }).start(); } Runnable r2 = new Runnable() { @Override public void run() { for (int i = 0;i < 10000;i++) { while (true) { int timestamp = money.getStamp(); Integer m = money.getReference(); if (m > 10) { System.out.println("大於10元"); if (money.compareAndSet(m,m - 10,timestamp,timestamp + 1)) { System.out.println("成功消費10元,餘額:" + money.getReference()); break; } }else { System.out.println("沒有足夠金額"); break; } } // try { // Thread.sleep(100); // }catch (InterruptedException e) { // // } } } }; new Thread(r2).start(); } }
運行結果:
餘額小於20元,充值成功,餘額:39元
大於10元
成功消費10元,餘額:29
大於10元
成功消費10元,餘額:19
大於10元
成功消費10元,餘額:9
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
沒有足夠金額
.
.
時間戳+1後,不管運行多少次都不會出現重複充值的現象了。
數組的無鎖:AtomicIntegerArray
public class AtomicIntegerArrayDemo { static AtomicIntegerArray arr = new AtomicIntegerArray(10); public static class AddThread implements Runnable { @Override public void run() { //數組內的全部元素各加1000次1 for (int k = 0;k < 10000;k++) { arr.getAndIncrement(k % arr.length()); } } } public static void main(String[] args) throws InterruptedException { Thread[] ts = new Thread[10]; //10個線程來執行 for (int k = 0;k < 10;k++) { ts[k] = new Thread(new AddThread()); ts[k].start(); ts[k].join(); } System.out.println(arr); } }
運行結果:
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
這裏arr.getAndIncrement(int i)就是對第i個下標的元素加1
普通變量享受原子操做:AtomicIntegerFieldUpdater
public class AtomicIntegerFieldUpdateDemo { public static class Candidate { int id; volatile int score; } public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score"); public static AtomicInteger allScore = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { final Candidate stu = new Candidate(); Thread[] t = new Thread[10000]; for (int i = 0;i < 10000;i++) { t[i] = new Thread(new Runnable() { @Override public void run() { if (Math.random() > 0.4) { scoreUpdater.incrementAndGet(stu); allScore.incrementAndGet(); } } }); t[i].start(); t[i].join(); } System.out.println("score=" + stu.score); System.out.println("allScore=" + allScore); } }
運行結果:
score=5952
allScore=5952
不管運行多少次,咱們均可以看到score跟allScore相等,說明普通變量int score進行了原子操做(注意int score必須聲明爲volatile,多線程可見,且不能爲私有類型)。
無鎖Vector實現
模仿Vector機制來完成一個無鎖線程安全的List集合(源碼來自amino)
public class LockFreeVector<E> extends AbstractList<E> { private static final boolean debug = false; /** * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i]) * 第一個數組大小 */ private static final int FIRST_BUCKET_SIZE = 8; /** * number of buckets. 30 will allow 8*(2^30-1) elements * 全部數組的個數 */ private static final int N_BUCKET = 30; /** * We will have at most N_BUCKET number of buckets. And we have * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1) * 存放數據的二維數組,方便動態擴展 */ private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets; /** * @author ganzhi * 寫入類 * @param <E> */ static class WriteDescriptor<E> { //指望值,即E(V,E,N) public E oldV; //寫入的新值,即N public E newV; //要修改的原子數組,即V public AtomicReferenceArray<E> addr; //要修改的數組的索引位置 public int addr_ind; /** * Creating a new descriptor. * * @param addr Operation address 要寫入的數組 * @param addr_ind Index of address 要寫入的數組索引 * @param oldV old operand 預期值 * @param newV new operand 新值 */ public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind, E oldV, E newV) { this.addr = addr; this.addr_ind = addr_ind; this.oldV = oldV; this.newV = newV; } /** * set newV. * 給原子數組進行原子操做賦值 */ public void doIt() { addr.compareAndSet(addr_ind, oldV, newV); } } /** * @author ganzhi * 爲了更有序的讀寫數組,使用CAS操做寫入新數據 * 寫入器 * @param <E> */ static class Descriptor<E> { //整個Vector長度(非數組長度,是幾個數組加起來的長度) public int size; //寫入對象,對全部線程可見 volatile WriteDescriptor<E> writeop; /** * Create a new descriptor. * * @param size Size of the vector * @param writeop Executor write operation */ public Descriptor(int size, WriteDescriptor<E> writeop) { this.size = size; this.writeop = writeop; } /** * 完成寫入,doIt()方法纔是真正對原子數組的寫入 */ public void completeWrite() { WriteDescriptor<E> tmpOp = writeop; if (tmpOp != null) { tmpOp.doIt(); writeop = null; // this is safe since all write to writeop use // null as r_value. } } } /** * 當前線程寫入器的原子引用 */ private AtomicReference<Descriptor<E>> descriptor; private static final int zeroNumFirst = Integer .numberOfLeadingZeros(FIRST_BUCKET_SIZE);; /** * Constructor. */ public LockFreeVector() { //初始化一個能夠存放30個原子數組的數組,即二維數組 buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET); //給第0位初始化一個8位長的原子數組 buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE)); //初始化一個原子引用的Descriptor對象,無長度,無內容 descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0, null)); } /** * add e at the end of vector. * 最核心功能,將元素壓入Vector最後一個位置 * @param e * element added */ public void push_back(E e) { //預期寫入器 Descriptor<E> desc; //新值寫入器 Descriptor<E> newd; do { //獲取當前線程寫入器給預期寫入器 desc = descriptor.get(); //若是有其餘線程在循環跳出後修改了當前線程寫入器,則完成一次寫入,預防措施 desc.completeWrite(); //判斷將數據插入到Vector的哪個數組中,Vector總共有30個數組 //數組的長度,第一個是8,第二個是16,第三個是32。。。 int pos = desc.size + FIRST_BUCKET_SIZE; int zeroNumPos = Integer.numberOfLeadingZeros(pos); //取得第幾個數組 int bucketInd = zeroNumFirst - zeroNumPos; //若是這個數組爲空 if (buckets.get(bucketInd) == null) { //取得上一個數組的長度*2 int newLen = 2 * buckets.get(bucketInd - 1).length(); if (debug) System.out.println("New Length is:" + newLen); //原子性增長新數組,若是這個數組爲空,則建立一個新長度的數組,長度是上一個數組的2倍 //若是不爲空則等待 buckets.compareAndSet(bucketInd, null, new AtomicReferenceArray<E>(newLen)); } //取得元素在目標數組中的索引位 int idx = (0x80000000>>>zeroNumPos) ^ pos; //建立一個新的寫入對象,包含目標數組buckets.get(bucketInd),待插入索引位idx,目標是否爲空,插入對象e newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>( buckets.get(bucketInd), idx, null, e)); //若是當前線程寫入器與預期寫入器不等,則從新循環,若是相等則將新值寫入器賦給當前線程寫入器 } while (!descriptor.compareAndSet(desc, newd)); //獲取到新寫入器的當前線程寫入器完成寫入 descriptor.get().completeWrite(); } /** * Remove the last element in the vector. * * @return element removed */ public E pop_back() { Descriptor<E> desc; Descriptor<E> newd; E elem; do { desc = descriptor.get(); desc.completeWrite(); int pos = desc.size + FIRST_BUCKET_SIZE - 1; int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos); int idx = Integer.highestOneBit(pos) ^ pos; elem = buckets.get(bucketInd).get(idx); newd = new Descriptor<E>(desc.size - 1, null); } while (!descriptor.compareAndSet(desc, newd)); return elem; } /** * Get element with the index. * * @param index * index * @return element with the index */ @Override public E get(int index) { int pos = index + FIRST_BUCKET_SIZE; int zeroNumPos = Integer.numberOfLeadingZeros(pos); //獲取第幾個數組 int bucketInd = zeroNumFirst - zeroNumPos; //獲取該數組的索引位 int idx = (0x80000000>>>zeroNumPos) ^ pos; return buckets.get(bucketInd).get(idx); } /** * Set the element with index to e. * * @param index * index of element to be reset * @param e * element to set */ /** * {@inheritDoc} */ public E set(int index, E e) { int pos = index + FIRST_BUCKET_SIZE; int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos); int idx = Integer.highestOneBit(pos) ^ pos; AtomicReferenceArray<E> bucket = buckets.get(bucketInd); while (true) { E oldV = bucket.get(idx); if (bucket.compareAndSet(idx, oldV, e)) return oldV; } } /** * reserve more space. * * @param newSize * new size be reserved */ public void reserve(int newSize) { int size = descriptor.get().size; int pos = size + FIRST_BUCKET_SIZE - 1; int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos); if (i < 1) i = 1; int initialSize = buckets.get(i - 1).length(); while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) { i++; initialSize *= FIRST_BUCKET_SIZE; buckets.compareAndSet(i, null, new AtomicReferenceArray<E>( initialSize)); } } /** * size of vector. * * @return size of vector */ public int size() { return descriptor.get().size; } /** * {@inheritDoc} */ @Override public boolean add(E object) { push_back(object); return true; } }
這裏咱們重點對push_back(E e)方法(將對象添加到Vector的最末尾),get(int index)方法(取出第幾個)進行了中文標註,其間Vector是一個二維數組,當第一個數組存滿後,擴展到第二個數組,每一個數組的長度都是乘2擴展的。