前面已經介紹過無鎖:
– 無障礙
無障礙是一種最弱的非阻塞調度
自由出入臨界區
無競爭時,有限步內完成操做
有競爭時,回滾數據有競爭時,回滾數據
好進很差出,很容易進去,可是進去發現不少線程競爭相同的資源的時候,會須要回滾數據,好比要讀取xy,已經讀過了x,讀到y的時候發如今競爭,會從x從新讀。java
– 無鎖
是無障礙的
保證有一個線程能夠勝出
while (!atomicVar.compareAndSet(localVar, localVar+1)) {
localVar = atomicVar.get();
}
由於無障礙中,若是存在不斷的競爭,將會全部的都出不來,因此無鎖就須要每次競爭都能勝出一個,這樣保證程序可以順暢的執行下去。web
CAS算法的過程是這樣:它包含3個參數CAS(V,E,N)。V表示要更新的變量,E表示預期值,N表示新值。僅當V 值等於E值時,纔會將V的值設爲N,若是V值和E值不一樣,則說明已經有其餘線程作了更新,則當前線程什麼 都不作。最後,CAS返回當前V的真實值。CAS操做是抱着樂觀的態度進行的,它老是認爲本身能夠成功完成 操做。當多個線程同時使用CAS操做一個變量時,只有一個會勝出,併成功更新,其他均會失敗。失敗的線程 不會被掛起,僅是被告知失敗,而且容許再次嘗試,固然也容許失敗的線程放棄操做。基於這樣的原理,CAS 操做即時沒有鎖,也能夠發現其餘線程對當前線程的干擾,並進行恰當的處理。面試
CAS是一個原子操做,是由一條cpu指令完成的。算法
java中提供了不少無所類的使用,若是一個線程被掛起,將會消耗八萬個時光周期,可是若是是無鎖的,最多隻是循環,也就只會消耗幾個時光周期,因此無鎖的方式比阻塞的方式要好不少。express
cmpxchg /* accumulator = AL, AX, or EAX, depending on whether a byte, word, or doubleword comparison is being performed */ if(accumulator == Destination) { ZF = 1; //判斷是否和指望值相等,相等的話就給一個轉換標誌。同時進行轉換。 Destination = Source; } else { ZF = 0; //不相等的話就給一個不轉換的標誌。同時不轉換。 accumulator = Destination; }
這是一個原子操做是安全的。apache
Number數組
public final int get()//取得當前值
public final void set(int newValue)//設置當前值
public final int getAndSet(int newValue)//設置新值,並返回舊值
public final boolean compareAndSet(int expect, int u)//若是當前值爲expect,則設置爲u
public final int getAndIncrement() //當前值加1,返回舊值
public final int getAndDecrement()//當前值減1,返回舊值
public final int getAndAdd(int delta)//當前值增長delta,返回舊值
public final int incrementAndGet() //當前值加1,返回新值
public final int decrementAndGet() //當前值減1,返回新值
public final int addAndGet(int delta)//當前值增長delta,返回新值安全
compareAndSet方法多線程
//expect指望值,update更新的新值,成功返回true,失敗返回false public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } //unsafe是不安全的,java將指針進行了屏蔽封裝,而unsafe會提供相似指針的操做,對這個類的偏移量上的指望值
偏移量valueOffset哪裏來的?併發
valueOffset = unsafe.objectFieldOffset
getAndIncrement方法
//返回當前值,而且加一 public final int getAndIncrement() { for(;;){ int current = get(); int next = current + 1; if(compareAndSet(current,next)) return current; } }
get是獲得當前這個類的private volatile int value;這個值加一,而後compareAndSet,若是當前的值和指望值相等的時候返回當前的這個值,不然,繼續循環,和無鎖的機制是同樣的。若是在判斷以前有其餘的線程拿到了current值,在下面的if將會失敗
import java.util.concurrent.atomic.AtomicInteger; public class Test { static AtomicInteger i = new AtomicInteger(); public static class AddThread implements Runnable{ public void run(){ for (int j = 0; j < 10000; j++) { i.incrementAndGet(); } } } public static void main(String[] args) throws InterruptedException { Thread[] ts = new Thread[10]; for (int k = 0; k < 10; k++) { ts[k] = new Thread(new AddThread()); } for (int k = 0; k < 10; k++) { ts[k].start(); } for (int k = 0; k < 10; k++) { ts[k].join(); } System.out.println(i); } }
非安全的操做,好比: 根據偏移量設置值 、park() 、底層的CAS操做
非公開API,在不一樣版本的JDK中, 可能有較大差別
static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } }
拿到value這個字段在本類(至關於c中的首地址)中的偏移量是多少。
//得到給定對象偏移量上的int值
public native int getInt(Object o, long offset); //設置給定對象偏移量上的int值
public native void putInt(Object o, long offset, int x); //得到字段在對象中的偏移量
public native long objectFieldOffset(Field f); //設置給定對象的int值,使用volatile語義
public native void putIntVolatile(Object o, long offset, int x); //得到給定對象對象的int值,使用volatile語義
public native int getIntVolatile(Object o, long offset); //和putIntVolatile()同樣,可是它要求被操做字段就是volatile類型的 public native void putOrderedInt(Object o, long offset, int x);
對象的引用
對引用進行修改 是一個模板類,抽象化了數據類型
get()
set(V) compareAndSet() getAndSet(V)
大部分的方法和compareAndSet差很少。
例子:
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class Test { public final static AtomicReference<String> atomicStr = new AtomicReference<String>("abc"); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { final int num = i; new Thread() { public void run() { try { Thread.sleep(Math.abs((int)Math.random()*100)); } catch (InterruptedException e) { e.printStackTrace(); } if (atomicStr.compareAndSet(("abc"), "def")) { System.out.println("Thread"+Thread.currentThread().getId()+"change value to"+atomicStr.compareAndSet(("abc"), "def")); }else{ System.out.println("Thread"+Thread.currentThread().getId()+"FALED"); } } }.start(); } } }
和對象的引用差很少
stamped表示時間戳、惟一性等
ABA問題
如圖,一個值爲A,期初一個線程1拿到它,而後作相關的操做,這個時候線程2拿到這個A而且改成了B,線程3拿到了B改成了A,這個時候線程1將拿到的A和講過了線程2 線程3後的A進行比較,相同的話A就改成C。
發現是沒問題的,由於A仍是變爲了A,若是是加法,沒問題,但是若是比較關注中間的過程呢?好比網吧充錢,沒錢機器就自動衝,但是隻能自動充值一次,這個時候第二次仍是到了臨界點呢??
這個時候就須要給一個時間戳或者惟一的標識。每次改變的時候都傳遞一個包含A和時間戳的對象,每次在比價A的同時比較時間戳。
源碼解析:
//將原來的value作了一層封裝分爲了reference和stamp private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } //compareAndSet的參數也由原來的兩個,變爲了四個,包括指望值和新的值 public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return //當兩個都相等的時候纔有機會向下執行 expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); } //jdk中常常會有函數叫作cas...來更新列表的頭部、尾部等操做 //pairOffset就是 private boolean casPair(Pair<V> cmp, Pair<V> val) { return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val); }
支持無鎖的數組
//得到數組第i個下標的元素
public final int get(int i)
//得到數組的長度
public final int length()
//將數組第i個下標設置爲newValue,並返回舊的值
public final int getAndSet(int i, int newValue) //進行CAS操做,若是第i個下標的元素等於expect,則設置爲update,設置成功返回true public final boolean compareAndSet(int i, int expect, int update)
//將第i個下標的元素加1
public final int getAndIncrement(int i) //將第i個下標的元素減1
public final int getAndDecrement(int i) //將第i個下標的元素增長delta(delta能夠是負數) public final int getAndAdd(int i, int delta)
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicReference; public class Test { static AtomicIntegerArray arr = new AtomicIntegerArray(10); public static class AddThread implements Runnable { public void run() { for (int i = 0; i < 10000; i++) { arr.getAndIncrement(i % arr.length()); } } } public static void main(String[] args) throws InterruptedException { Thread[] ts = new Thread[10]; for (int i = 0; i < 10; i++) { ts[i] = new Thread((new AddThread())); } for (int i = 0; i < 10; i++) { ts[i].start(); } for (int i = 0; i < 10; i++) { ts[i].join(); } System.out.println(arr); } }
若是是不安全的話,不會每一個都是1000,應該比這個小。
private static final int base = unsafe.arrayBaseOffset(int[].class); public final int get(int i) { return getRaw(checkedByteOffset(i)); } private int getRaw(long offset) { //數組所在的基地址開始取offset的偏移量 return unsafe.getIntVolatile(array, offset); } //返回第i個元素在數組中的偏移量是多少 private long checkedByteOffset(int i) { if (i < 0 || i >= array.length) throw new IndexOutOfBoundsException("index " + i); return byteOffset(i); } private static long byteOffset(int i) { //i偏移了shift的數值 return ((long) i << shift) + base; //經過下面的計算 i左移兩位(二進制,即在末尾加兩個零 //,若是是十進制就是乘以4) } static { //數組中每一個元素有多寬,int就是4(每一個int是4個byte) int scale = unsafe.arrayIndexScale(int[].class); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); //shift的值 //numberOfLeadingZeros前導零,一個數字化成二進制 //前面的零的個數 //4------> 00000....100 32-3=29 //4的前導零就是29 shift = 31 - Integer.numberOfLeadingZeros(scale); //因此shift=2 }
讓普通變量也享受原子操做
但願擁有原子操做,可是不改變原原子的類型。
使用盡可能少的代碼、在不改變原來類型的基礎上、作出較少的改變來實現原子操做。
AtomicIntegerFieldUpdate.newUpdate()
incrementAndGet()
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; public class Test { public static class Candidate{ int id; volatile int score; } public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score"); public static AtomicInteger allScore = new AtomicInteger(); public static void main(String[] args) throws InterruptedException { final Candidate stu = new Candidate(); Thread[] threads = new Thread[10000]; for (int i = 0; i <10000 ; i++) { threads[i] = new Thread(){ public void run(){ if (Math.random()>0.4){ scoreUpdater.incrementAndGet(stu); allScore.incrementAndGet(); } } }; threads[i].start(); } for (int i = 0; i <10000 ; i++) { threads[i].join(); } System.out.println("sore="+stu.score); System.out.println("allScore="+allScore); } }
經過AtomicInteger來驗證,發現是相同的安全的結果,主要就是上面提到的兩個方法,經過反射實現的。
jdk中的vector是有鎖的。
源碼解析:
/** * Appends the specified element to the end of this Vector. * * @param e element to be appended to this Vector * @return {@code true} (as specified by {@link Collection#add}) * @since 1.2 */ public synchronized boolean add(E e) { //記錄vector被修改的次數 modCount++; // vector底層是數組 // 判斷時候越界,若是越界了就進行擴展,擴展代碼在下面 ensureCapacityHelper(elementCount + 1); //不越界就將e加在後面 elementData[elementCount++] = e; return true; } // 是一個同步的方法,每次只有一個線程能進行add操做,全部的元素都保存在elementData 中 private void ensureCapacityHelper(int minCapacity) { // overflow-conscious code if (minCapacity - elementData.length > 0) grow(minCapacity); } private void grow(int minCapacity) { // overflow-conscious code int oldCapacity = elementData.length; //擴展增量(擴容)capacityIncrement能夠本身制定,若是本身不指定的話就 //是默認的oldCapacity+oldCapacity------>newCapacity也就是 //乘以二 int newCapacity = oldCapacity + ((capacityIncrement > 0) ? capacityIncrement : oldCapacity); if (newCapacity - minCapacity < 0) newCapacity = minCapacity; if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity); //把老元素放到新的元素中去 elementData = Arrays.copyOf(elementData, newCapacity); } //建立了一個新的數組,把原來的數組複製過去。 public static <T,U> T[] copyOf(U[] original, int newLength, Class<? extends T[]> newType) { @SuppressWarnings("unchecked") T[] copy = ((Object)newType == (Object)Object[].class) ? (T[]) new Object[newLength] : (T[]) Array.newInstance(newType.getComponentType(), newLength); System.arraycopy(original, 0, copy, 0, Math.min(original.length, newLength)); return copy; }
注意看裏面的註釋很重要:
提取出來幾個
vector底層是數組
判斷時候越界,若是越界了就進行擴展,
add是一個同步的方法(有鎖的),每次只有一個線程能進行add操做
擴展增量(擴容)capacityIncrement能夠本身制定,若是本身不指定的話就
是默認的oldCapacity+oldCapacity------>newCapacity也就是
乘以二(有次面試我被問過)!!!因此當擴容的數字很大的時候,建議給個擴容的大小
把老元素放到新的元素中去
建立了一個新的數組,把原來的數組複製過去。
這裏只將源碼貼出來
和普通的vector實現的區別是原來的vector是一維數組,而這裏的數組是二維的數組,爲何用二維數組?二維數組中的第一個容量是n,第二個就是2n,第三個就是4n。。。。以此類推,就像是一個個不一樣容量的籃子。
FIRST_BUCKET_SIZE給定第一個籃子的大小
N_BUCKET有多少個籃子
private final AtomicReferenceArray<AtomicReferenceArray> buckets;
經過AtomicReferenceArray的二重數組來封裝這些籃子。AtomicReferenceArray和AtomicArray差很少只是將array的值換成了對象。
有個分析的很不錯的博文:
http://reimuwang.org/2018/05/17/Java 併發-無鎖的Vector實現/
/* * Copyright (c) 2007 IBM Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package main.java.org.amino.ds.lockfree; import java.util.AbstractList; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceArray; /** * It is a thread safe and lock-free vector. * This class implement algorithm from:<br> * * Lock-free Dynamically Resizable Arrays <br> * * Damian Dechev, Peter Pirkelbauer, and Bjarne Stroustrup<br> * Texas A&M University College Station, TX 77843-3112<br> * {dechev, peter.pirkelbauer}@tamu.edu, bs@cs.tamu.edu * * * @author Zhi Gan * * @param <E> type of element in the vector * */ public class LockFreeVector<E> extends AbstractList<E> { private static final boolean debug = false; /** * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i]) */ private static final int FIRST_BUCKET_SIZE = 8; /** * number of buckets. 30 will allow 8*(2^30-1) elements */ private static final int N_BUCKET = 30; /** * We will have at most N_BUCKET number of buckets. And we have * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1) */ private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets; /** * @author ganzhi * * @param <E> */ static class WriteDescriptor<E> { public E oldV; public E newV; public AtomicReferenceArray<E> addr; public int addr_ind; /** * Creating a new descriptor. * * @param addr Operation address * @param addr_ind Index of address * @param oldV old operand * @param newV new operand */ public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind, E oldV, E newV) { this.addr = addr; this.addr_ind = addr_ind; this.oldV = oldV; this.newV = newV; } /** * set newV. */ public void doIt() { addr.compareAndSet(addr_ind, oldV, newV); } } /** * @author ganzhi * * @param <E> */ static class Descriptor<E> { public int size; volatile WriteDescriptor<E> writeop; /** * Create a new descriptor. * * @param size Size of the vector * @param writeop Executor write operation */ public Descriptor(int size, WriteDescriptor<E> writeop) { this.size = size; this.writeop = writeop; } /** * */ public void completeWrite() { WriteDescriptor<E> tmpOp = writeop; if (tmpOp != null) { tmpOp.doIt(); writeop = null; // this is safe since all write to writeop use // null as r_value. } } } private AtomicReference<Descriptor<E>> descriptor; private static final int zeroNumFirst = Integer .numberOfLeadingZeros(FIRST_BUCKET_SIZE);; /** * Constructor. */ public LockFreeVector() { buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET); buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE)); descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0, null)); } /** * add e at the end of vector. * * @param e * element added */ public void push_back(E e) { Descriptor<E> desc; Descriptor<E> newd; do { desc = descriptor.get(); desc.completeWrite(); //desc.size Vector 自己的大小 //FIRST_BUCKET_SIZE 第一個一位數組的大小 int pos = desc.size + FIRST_BUCKET_SIZE; int zeroNumPos = Integer.numberOfLeadingZeros(pos); // 取出pos 的前導領 //zeroNumFirst 爲FIRST_BUCKET_SIZE 的前導領 int bucketInd = zeroNumFirst - zeroNumPos; //哪一個一位數組 //判斷這個一維數組是否已經啓用 if (buckets.get(bucketInd) == null) { //newLen 一維數組的長度 int newLen = 2 * buckets.get(bucketInd - 1).length(); if (debug) System.out.println("New Length is:" + newLen); buckets.compareAndSet(bucketInd, null, new AtomicReferenceArray<E>(newLen)); } int idx = (0x80000000>>>zeroNumPos) ^ pos; //在這個一位數組中,我在哪一個位置 newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>( buckets.get(bucketInd), idx, null, e)); } while (!descriptor.compareAndSet(desc, newd)); descriptor.get().completeWrite(); } /** * Remove the last element in the vector. * * @return element removed */ public E pop_back() { Descriptor<E> desc; Descriptor<E> newd; E elem; do { desc = descriptor.get(); desc.completeWrite(); int pos = desc.size + FIRST_BUCKET_SIZE - 1; int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos); int idx = Integer.highestOneBit(pos) ^ pos; elem = buckets.get(bucketInd).get(idx); newd = new Descriptor<E>(desc.size - 1, null); } while (!descriptor.compareAndSet(desc, newd)); return elem; } /** * Get element with the index. * * @param index * index * @return element with the index */ @Override public E get(int index) { int pos = index + FIRST_BUCKET_SIZE; int zeroNumPos = Integer.numberOfLeadingZeros(pos); int bucketInd = zeroNumFirst - zeroNumPos; int idx = (0x80000000>>>zeroNumPos) ^ pos; return buckets.get(bucketInd).get(idx); } /** * Set the element with index to e. * * @param index * index of element to be reset * @param e * element to set */ /** * {@inheritDoc} */ public E set(int index, E e) { int pos = index + FIRST_BUCKET_SIZE; int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos); int idx = Integer.highestOneBit(pos) ^ pos; AtomicReferenceArray<E> bucket = buckets.get(bucketInd); while (true) { E oldV = bucket.get(idx); if (bucket.compareAndSet(idx, oldV, e)) return oldV; } } /** * reserve more space. * * @param newSize * new size be reserved */ public void reserve(int newSize) { int size = descriptor.get().size; int pos = size + FIRST_BUCKET_SIZE - 1; int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos); if (i < 1) i = 1; int initialSize = buckets.get(i - 1).length(); while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) { i++; initialSize *= FIRST_BUCKET_SIZE; buckets.compareAndSet(i, null, new AtomicReferenceArray<E>( initialSize)); } } /** * size of vector. * * @return size of vector */ public int size() { return descriptor.get().size; } /** * {@inheritDoc} */ @Override public boolean add(E object) { push_back(object); return true; } }