併發編程之:數據結構

1.併發List

併發list有三種使用模式java

第一種使用Collections.synchronizedList方法進行包裝:

List arrayList = new ArrayList();
//使用Collections.synchronizedList方法進行包裝
List list = Collections.synchronizedList(arrayList);

咱們進入該包裝方法內部查看源碼發現,其實是有新建立了一個SynchronizedList對象,而SynchronizedList類對接對原有ArrayList類對象進行了包裝,並對原對象的操做加上了同步控制。源碼以下:數組

static class SynchronizedList<E>
    extends SynchronizedCollection<E>
    implements List<E> {
    private static final long serialVersionUID = -7754090372962971524L;

    final List<E> list;

    SynchronizedList(List<E> list) {
        super(list);
        this.list = list;
    }
    SynchronizedList(List<E> list, Object mutex) {
        super(list, mutex);
        this.list = list;
    }

    public boolean equals(Object o) {
        if (this == o)
            return true;
        synchronized (mutex) {return list.equals(o);}
    }
    public int hashCode() {
        synchronized (mutex) {return list.hashCode();}
    }

    public E get(int index) {
        synchronized (mutex) {return list.get(index);}
    }
    public E set(int index, E element) {
        synchronized (mutex) {return list.set(index, element);}
    }
    public void add(int index, E element) {
        synchronized (mutex) {list.add(index, element);}
    }
    public E remove(int index) {
        synchronized (mutex) {return list.remove(index);}
    }

    public int indexOf(Object o) {
        synchronized (mutex) {return list.indexOf(o);}
    }
    public int lastIndexOf(Object o) {
        synchronized (mutex) {return list.lastIndexOf(o);}
    }

    public boolean addAll(int index, Collection<? extends E> c) {
        synchronized (mutex) {return list.addAll(index, c);}
    }

    public ListIterator<E> listIterator() {
        return list.listIterator(); // Must be manually synched by user
    }

    public ListIterator<E> listIterator(int index) {
        return list.listIterator(index); // Must be manually synched by user
    }

    public List<E> subList(int fromIndex, int toIndex) {
        synchronized (mutex) {
            return new SynchronizedList<>(list.subList(fromIndex, toIndex),
                                        mutex);
        }
    }

    @Override
    public void replaceAll(UnaryOperator<E> operator) {
        synchronized (mutex) {list.replaceAll(operator);}
    }
    @Override
    public void sort(Comparator<? super E> c) {
        synchronized (mutex) {list.sort(c);}
    }

    /**
     * SynchronizedRandomAccessList instances are serialized as
     * SynchronizedList instances to allow them to be deserialized
     * in pre-1.4 JREs (which do not have SynchronizedRandomAccessList).
     * This method inverts the transformation.  As a beneficial
     * side-effect, it also grafts the RandomAccess marker onto
     * SynchronizedList instances that were serialized in pre-1.4 JREs.
     *
     * Note: Unfortunately, SynchronizedRandomAccessList instances
     * serialized in 1.4.1 and deserialized in 1.4 will become
     * SynchronizedList instances, as this method was missing in 1.4.
     */
    private Object readResolve() {
        return (list instanceof RandomAccess
                ? new SynchronizedRandomAccessList<>(list)
                : this);
    }
}

第二種使用List的同步實現類Vector:

咱們經過查看Vector源碼發現,Vector類實現了List接口,並實現的接口方法都是同步的方式實現,因爲Vector中的實現方法是同步方法必然會對元素存儲效率產生嚴重影響。如get/set方法代碼片斷安全

/**
 * Returns the element at the specified position in this Vector.
 *
 * @param index index of the element to return
 * @return object at the specified index
 * @throws ArrayIndexOutOfBoundsException if the index is out of range
 *            ({@code index < 0 || index >= size()})
 * @since 1.2
 */
public synchronized E get(int index) {
    if (index >= elementCount)
        throw new ArrayIndexOutOfBoundsException(index);

    return elementData(index);
}

/**
 * Replaces the element at the specified position in this Vector with the
 * specified element.
 *
 * @param index index of the element to replace
 * @param element element to be stored at the specified position
 * @return the element previously at the specified position
 * @throws ArrayIndexOutOfBoundsException if the index is out of range
 *         ({@code index < 0 || index >= size()})
 * @since 1.2
 */
public synchronized E set(int index, E element) {
    if (index >= elementCount)
        throw new ArrayIndexOutOfBoundsException(index);

    E oldValue = elementData(index);
    elementData[index] = element;
    return oldValue;
}

/**
 * Appends the specified element to the end of this Vector.
 *
 * @param e element to be appended to this Vector
 * @return {@code true} (as specified by {@link Collection#add})
 * @since 1.2
 */
public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}

第三種使用List實現類CopyOnWriteArrayList:

CopyOnWriteArrayList的內部實現與Vector不一樣,當對象進行讀操做時,直接返回結果,操做中不進行同步控制;當進行寫操做時會在該方法上面加上鎖,而後複製一個副本列表,對副本進行修改,最後將副本寫回原來列表。數據結構

其get/set/add方法源碼片斷以下多線程

/**
 * {@inheritDoc}
 *
 * @throws IndexOutOfBoundsException {@inheritDoc}
 */
public E get(int index) {
    return get(getArray(), index);
}

/**
 * Replaces the element at the specified position in this list with the
 * specified element.
 *
 * @throws IndexOutOfBoundsException {@inheritDoc}
 */
public E set(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        E oldValue = get(elements, index);

        if (oldValue != element) {
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len);
            newElements[index] = element;
            setArray(newElements);
        } else {
            // Not quite a no-op; ensures volatile write semantics
            setArray(elements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

/**
 * Appends the specified element to the end of this list.
 *
 * @param e element to be appended to this list
 * @return {@code true} (as specified by {@link Collection#add})
 */
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

老是所知CopyOnWriteArrayList只是對修改進行了同步控制,因此在讀操做上面比上面兩種方式要快一些。併發

2.併發set

第一種方式也是有Collections.synchronizedSet方法進行包裝:

Set hashSet = new HashSet();
//Collections.synchronizedSet進行包裝
Set set = Collections.synchronizedSet(hashSet);

包裝以後新生產一個包裝了原對象的SynchronizedSet對象,SynchronizedSet對象的讀寫操做仍是對原有的set進行讀寫,只是在包裝的讀寫方法上面加上了同步控制。app

第二種和list比較類似,併發set也有一個CopyOnWriteArraySet實現:

它實現了set接口,而且是線程安全的。它的內部實現徹底依賴於CopyOnWriteArrayList咱們能夠查看源碼dom

public class CopyOnWriteArraySet<E> extends AbstractSet<E>
        implements java.io.Serializable {
    private static final long serialVersionUID = 5457747651344034263L;

    private final CopyOnWriteArrayList<E> al;

    /**
     * Creates an empty set.
     */
    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }

    /**
     * Creates a set containing all of the elements of the specified
     * collection.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection is null
     */
    public CopyOnWriteArraySet(Collection<? extends E> c) {
        if (c.getClass() == CopyOnWriteArraySet.class) {
            @SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc =
                (CopyOnWriteArraySet<E>)c;
            al = new CopyOnWriteArrayList<E>(cc.al);
        }
        else {
            al = new CopyOnWriteArrayList<E>();
            al.addAllAbsent(c);
        }
    }

    /**
     * Returns the number of elements in this set.
     *
     * @return the number of elements in this set
     */
    public int size() {
        return al.size();
    }

    /**
     * Returns {@code true} if this set contains no elements.
     *
     * @return {@code true} if this set contains no elements
     */
    public boolean isEmpty() {
        return al.isEmpty();
    }

    public boolean contains(Object o) {
        return al.contains(o);
    }

    /**
     */
    public Object[] toArray() {
        return al.toArray();
    }

    /**
     */
    public <T> T[] toArray(T[] a) {
        return al.toArray(a);
    }

    /**
     */
    public void clear() {
        al.clear();
    }

    /**
     */
    public boolean remove(Object o) {
        return al.remove(o);
    }

    /**
     */
    public boolean add(E e) {
        return al.addIfAbsent(e);
    }

    /**
     */
    public boolean containsAll(Collection<?> c) {
        return al.containsAll(c);
    }

    /**
     */
    public boolean addAll(Collection<? extends E> c) {
        return al.addAllAbsent(c) > 0;
    }

    /**
     */
    public boolean removeAll(Collection<?> c) {
        return al.removeAll(c);
    }

    /**
     */
    public boolean retainAll(Collection<?> c) {
        return al.retainAll(c);
    }

    /**
     */
    public Iterator<E> iterator() {
        return al.iterator();
    }

    /**
     */
    public boolean equals(Object o) {
        if (o == this)
            return true;
        if (!(o instanceof Set))
            return false;
        Set<?> set = (Set<?>)(o);
        Iterator<?> it = set.iterator();

        // Uses O(n^2) algorithm that is only appropriate
        // for small sets, which CopyOnWriteArraySets should be.

        //  Use a single snapshot of underlying array
        Object[] elements = al.getArray();
        int len = elements.length;
        // Mark matched elements to avoid re-checking
        boolean[] matched = new boolean[len];
        int k = 0;
        outer: while (it.hasNext()) {
            if (++k > len)
                return false;
            Object x = it.next();
            for (int i = 0; i < len; ++i) {
                if (!matched[i] && eq(x, elements[i])) {
                    matched[i] = true;
                    continue outer;
                }
            }
            return false;
        }
        return k == len;
    }

    public boolean removeIf(Predicate<? super E> filter) {
        return al.removeIf(filter);
    }

    public void forEach(Consumer<? super E> action) {
        al.forEach(action);
    }

    /**
     */
    public Spliterator<E> spliterator() {
        return Spliterators.spliterator
            (al.getArray(), Spliterator.IMMUTABLE | Spliterator.DISTINCT);
    }

    /**
     */
    private static boolean eq(Object o1, Object o2) {
        return (o1 == null) ? o2 == null : o1.equals(o2);
    }
}

3.併發Map

一樣適用Map時也能夠經過Collections.synchronizedMap進行包裝實現一個線程安全的Map,可是這不是最優的實現方式,因爲Map是一個咱們適用至關頻繁的數據結構,因此JDK爲提供了一個專用於高併發的Map實現ConcurrentHashMap,它的內部實現了鎖分離,爲多線程併發下高性能提供了保證。ide

ConcurrentHashMap是專門爲線程併發設計的Map,它的get方法是無鎖的,它的put方法操做的鎖力度又小於同步的map因此總體性能高於同步的。高併發

4.併發Queue

在併發隊列上,JDK提供了兩套實現,一個是以ConcurrentListQueue爲表明的高性能隊列,一個是以BlockingQueue爲表明的阻塞隊列。他們都繼承自Queue接口。

ConcurrentListQueue:

是一個適合於高併發場景下的隊列。它經過無鎖的方式,實現了高併發狀態下的高性能。

BlockingQueue:

是阻塞隊列BlockingQueue的表明實現,主要應用場景是生產者消費者的共享數據。因爲LinkedBlockingQueue提供一種阻塞等待的機制,好比當消費者速度快於生產者時隊列中的消息很快就會被清空,此時消費者再從隊列中取數據時就會被阻塞等待。同理當生產者速度快於消費者時,隊列很快就會被放滿數據此時再往隊列裏面放數據時生產者就會被阻塞等待。

BlockingQueue隊列主要實現方法:

offer(object):將object添加到BlockingQueue裏,若是隊列有足夠空間就返回true,不然返回false(該方法不會阻塞當前執行的線程)。

offer(object,timeout,timeunit):是阻塞隊列特有的方法,將object添加到BlockingQueue裏,若是隊列有足夠空間則當即返回true,若是隊列沒有足夠空間則堵塞timeout時間段,若是還添加不進去返回fase。

put(object):把object添加到隊裏,若是隊列已滿則該線程會阻塞直到隊列有空間加進去。

poll():從隊列裏面取出元素,若是沒有返回null,該方法不會阻塞線程。

poll(timeout,timeunit):從隊列裏面取出元素。若是隊裏裏面沒有元素會阻塞timeout時間段,若是超時尚未數據可取則返回null。

take():從隊列裏面取出元素,若是隊列裏面沒有元素則線程會阻塞直到有新元素加入隊列。

drainTo(list):將隊列裏面的數據一次性所有取出到list裏面,該方法能夠一次性取出全部數據到list中不用進行堵塞,能夠提升數據提取效率。

ArrayBlockingQueue:

是一種基於數組的阻塞隊列實現,內部維護了一個定長數組用於換成數據對象。另外還保存着兩個整形變量分別標記着隊列的頭部和尾部在數組中的位置。

LinkedBlockingQueue:

是一種基於鏈表的阻塞隊列實現。

5.併發Deque(Double-Ended Queue)雙端隊列

deque是一個能夠在隊列的頭部或尾部進行入隊和出隊操做,新增操做以下:

有三個實現類:LinkedList、ArrayDeque、LinkedBlockingDeque。

LinkedList使用鏈表實現的雙端隊列,ArrayDeque使用數組實現的雙端隊列。一般狀況下因爲ArrayDeque基於數組實現,具備高效的隨機訪問性能,所以ArrayDeque具備更好的遍歷性能。可是當隊列大小變化較大時ArrayDeque須要從新分配內存,進行數組複製,在這種狀況下基於鏈表的LinkedList具備更好的表現。可是不管LinkedList或是ArrayDeque都不是線程安全的。

LinkedBlockingDeque是一個線程安全的雙端隊列實現。LinkedBlockingDeque是一個鏈表結構,每個隊列節點都維護一個前驅節點和一個後驅節點。LinkedBlockingDeque沒有進行讀寫鎖分離,所以在高併發使用中其性能要低於LinkedBlockingQueue,更低於ConCurrentLinkedQueue。

相關文章
相關標籤/搜索