無 鎖 算法 詳 解
無 鎖 的Vector 實現:
參照着JDK中的 Vector 源碼
一、Vector中的 add 方法的實現,它是一個同步方法,因此保證了每一次只能又一個值對數組 elementData 進行操做。
protected Object[] elementData; 經過數據來實現存儲
protected int elementCount; 記錄對這個Vector的操做數html
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);//這邊是作越界判斷
elementData[elementCount++] = e;
return true;
}算法
private void ensureCapacityHelper(int minCapacity) {
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);//若是沒有越界
}數組
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + ((capacityIncrement > 0) ?
capacityIncrement : oldCapacity);
//若是初始化的時候不指定增量capacityIncrement,那麼就是將oldCapacity+oldCapacity賦值給新的長度,若是指定增量那麼就是oldCapacity+capacityIncrement
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
elementData = Arrays.copyOf(elementData, newCapacity);
//最後將老的元素和新的一塊兒加入到Vector中
}
public static <T> T[] copyOf(T[] original, int newLength) {
return (T[]) copyOf(original, newLength, original.getClass());
}
值得注意的一點就是若是指定增量,那麼能夠減小空間的浪費。併發
JDK閱讀只是爲未無鎖的Vector作鋪墊ide
無鎖的Vector的實現高併發
/**性能
@date
/
public class LockFreeVector<E> extends AbstractList<E>{
private static final boolean debug = false;
private static final int FIRST_BUCKET_SIZE = 8;
//雖然這邊籃子的個數是固定的,可是絕對是夠用的由於總共的籃子數就是82^30-1
private static final int N_BUCKET = 30;
private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;
//利用二維數組來嵌套是爲了使一維的AtomicReferenceArray儘可能避免修改,在一維數組填充滿了時是不會去擴充的,
// 而是往二維數組裏面填充,這樣就避免了修改一維數組,並且高併發就是避免沒必要要的寫來影響性能
public LockFreeVector(AtomicReferenceArray<AtomicReferenceArray<E>> buckets) {
this.buckets = buckets;
}
static class WriteDescriptor<E>{
public E oldV;
public E newV;
public AtomicReferenceArray<E> addr;
public int addr_ind;this
public WriteDescriptor( AtomicReferenceArray<E> addr, int addr_ind,E oldV, E newV) { this.oldV = oldV; this.newV = newV; this.addr = addr; this.addr_ind = addr_ind; } public void doInt(){ addr.compareAndSet(addr_ind,oldV,newV); }
}
static class Descriptor<E>{
public int size;
volatile WriteDescriptor<E> writeOp;.net
public Descriptor(int size, WriteDescriptor<E> writeOp) { this.size = size; this.writeOp = writeOp; } public void completeWrite(){ WriteDescriptor<E> tmpOp = writeOp; if(tmpOp != null){ tmpOp.doInt(); writeOp=null; } }
}線程
private AtomicReference<Descriptor<E>> descriptor;
private static final int zeroNumFirst = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE);
public LockFreeVector(){
buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET);
buckets.set(0,new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE));
descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0,null));
}
public void push_back(E e){
Descriptor<E> desc;
Descriptor<E> newD;
do { desc=descriptor.get(); desc.completeWrite(); int pos = desc.size+FIRST_BUCKET_SIZE; int zeroNumPos = Integer.numberOfLeadingZeros(pos); int bucketInd = zeroNumFirst - zeroNumPos; if(buckets.get(bucketInd)==null){ int newLen = 2 * buckets.get(bucketInd - 1).length(); if (debug) System.out.println("New Length is:"+newLen); buckets.compareAndSet(bucketInd,null,new AtomicReferenceArray<E>(newLen)); } //0x80000000就是1000000... 總共32位 int idx = (0x80000000>>>zeroNumPos) ^ pos; newD = new Descriptor<>(desc.size + 1,new WriteDescriptor<E>(buckets.get(bucketInd),idx,null, e)); }while (!descriptor.compareAndSet(desc,newD)); descriptor.get().completeWrite();
}
public E pop_back(){
Descriptor<E> desc;
Descriptor<E> newD;
E elem;
do {
desc = descriptor.get();
desc.completeWrite();
int pos = desc.size + FIRST_BUCKET_SIZE - 1;
int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
int idx = Integer.highestOneBit(pos) ^ pos;
elem = buckets.get(bucketInd).get(idx);
newD = new Descriptor<E>(desc.size - 1,null);
}while (!descriptor.compareAndSet(desc,newD));
return elem;
}
@Override
public E get(int index){
int pos = index + FIRST_BUCKET_SIZE;
int zeroNumPos = Integer.numberOfLeadingZeros(pos);
int bucketInd = zeroNumFirst - zeroNumPos;
int idx = (0x80000000>>>zeroNumPos) ^ pos;
return buckets.get(bucketInd).get(idx);
}
@Override
public E set(int index,E e){
int pos = index + FIRST_BUCKET_SIZE;
int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
int idx = Integer.highestOneBit(pos) ^ pos;
AtomicReferenceArray<E> bucket = buckets.get(bucketInd);
while (true){
E oldV = bucket.get(idx);
if (bucket.compareAndSet(idx,oldV,e))
return oldV;
}
}
public void reserve(int newSize){
int size = descriptor.get().size;
int pos = size + FIRST_BUCKET_SIZE - 1;
int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
if (i<1) i = 1; int initialSize = buckets.get(i - 1).length(); while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)){ i++; initialSize *= FIRST_BUCKET_SIZE; buckets.compareAndSet(i, null , new AtomicReferenceArray<E>(initialSize)); }
}
public int size(){
return descriptor.get().size;
}
@Override
public boolean add(E obj){
push_back(obj);
return true;
}
}
它的結構是:private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;
從這裏咱們能夠看到,它的內部是採用的是 無鎖的引用數組, 數組嵌套數組
變量buckets存放全部的內部元素。從定義上看,它是一個保存着數組的數組,也就是一般的二維數組。特別之處在於這些數組都是使用CAS的原子數組。爲何使用二維數組去實現一個一維的Vector呢?這是爲了未來Vector進行動態擴展時能夠更加方便。咱們知道,AtomicReferenceArray內部使用Object[]來進行實際數據的存儲,這使得動態空間增長特別的麻煩,所以使用二維數組的好處就是爲未來增長新的元素。
至關於一個二維數組,它的大小能夠動態的進行擴展,
爲了更有序的讀寫數組,定義了一個Descriptor的靜態內部類。它的做用是使用CAS操做寫入新數據。
它定義了
private static final int FIRST_BUCKET_SIZE = 8;
/**
FIRST_BUCKET_SIZE:爲第一個數組的長度
N_BUCKET 整個二維數組最大可擴轉至30
每次的擴展是成倍的增長,即:第一個數組長度爲8,第二個爲8<<1,第三個爲8<<2 ......第30個爲 8<<29
3. push_back
在第23行,使用descriptor將數據真正地寫入數組中。這個descriptor寫入的數據由20~21行構造的WriteDescriptor決定。
在循環最開始(第5行),使用descriptor先將數據寫入數組,是爲了防止上一個線程設置完descriptor後(22行),還沒來得及執行第23行的寫入,所以,作一次預防性的操做。
第8~10行經過當前Vector的大小(desc.size),計算新的元素應該落入哪一個數組。這裏使用了位運算進行計算。
LockFreeVector每次都會擴容。它的第一個數組長度爲8,第2個就是16,第3個就是32,依次類推。它們的二進制表示以下:
它們之和就是整個LockFreeVector的總大小,所以,若是每個數組都剛好填滿,那麼總大小應該相似以下的值(以4個數組爲例)00000000 00000000 00000000 01111000:4個數組都剛好填滿時的大小。
致使這個數字進位的最小條件,就是加上二進制的1000。而這個數字整好是8(FIRST_BUCKET_SIZE就是8)這就是第8行代碼的意義。
它可使得數組大小發生一次二進制進位(若是不進位說明還在第一個數組中),進位後前導零的數量就會發生變化。而元素所在的數組,和pos(第8行定義的比變量)的前導零直接相關。每進行一次數組擴容,它的前導零就會減1。若是歷來沒有擴容過,它的前導零就是28個。之後,逐級減1。這就是第9行得到pos前導零的緣由。第10行,經過pos的前導零能夠當即定位使用哪一個數組(也就是獲得了bucketInd的值)。
第11行,判斷這個數組是否存在。若是不存在,則建立這個數組,大小爲前一個數組的兩倍,並把它設置到buckets中。
接着再看一下元素沒有剛好填滿的狀況:
那麼總大小以下:
總個數加上二進制1000後,獲得:
顯然,經過前導零能夠定位到第4個數組。而剩餘位,顯然就表示元素在當前數組內偏移量(也就是數組下標)。根據這個理論,就能夠經過pos計算這個元素應該放在給定數組的哪一個位置。經過第19行代碼,得到pos的除了第一位數字1之外的其餘位的數值。所以,pos的前導零能夠表示元素所在的數組,而pos的後面幾位,則表示元素所在這個數組中的位置。由此,第19行代碼就取得了元素所在位置idx。
代碼理解能夠參考:
https://blog.csdn.net/netcobol/article/details/79785651
和
http://www.shaoqun.com/a/197387.html