java併發編程實戰一之基礎篇

緩存一致性問題

計算機在執行程序時,每條指令都是在CPU中執行的,而執行指令過程當中,勢必涉及到數據的讀取和寫入。因爲程序運行過程當中的臨時數據是存放在主存(物理內存)當中的,這時就存在一個問題,因爲CPU執行速度很快,而從內存讀取數據和向內存寫入數據的過程跟CPU執行指令的速度比起來要慢的多,所以若是任什麼時候候對數據的操做都要經過和內存的交互來進行,會大大下降指令執行的速度。所以在CPU裏面就有了高速緩存。
也就是,當程序在運行過程當中,會將運算須要的數據從主存複製一份到CPU的高速緩存當中,那麼CPU進行計算時就能夠直接從它的高速緩存讀取數據和向其中寫入數據,當運算結束以後,再將高速緩存中的數據刷新到主存當中。
在多核CPU中,每條線程可能運行於不一樣的CPU中,所以每一個線程運行時有本身的高速緩存。被多個線程訪問的共享變量在多個CPU中都存在緩存,這裏那麼就可能存在緩存不一致的問題java

因此就出現了緩存一致性協議。最出名的就是Intel 的MESI協議,MESI協議保證了每一個緩存中使用的共享變量的副本是一致的。它核心的思想是:當CPU寫數據時,若是發現操做的變量是共享變量,即在其餘CPU中也存在該變量的副本,會發出信號通知其餘CPU將該變量的緩存行置爲無效狀態,所以當其餘CPU須要讀取這個變量時,發現本身緩存中緩存該變量的緩存行是無效的,那麼它就會從內存從新讀取。
緩存一致性
  緩存

線程安全性

  • 原子性安全

    即一個操做或者多個操做 要麼所有執行而且執行的過程不會被任何因素打斷,要麼就都不執行
    思考:?int long double讀寫操做的原子性
    思考:?int i++的原子性多線程

  • 可見性併發

    可見性是指當多個線程訪問同一個變量時,一個線程修改了這個變量的值,其餘線程可以當即看獲得修改的值
    與緩存相關,某線程改變了數據,其餘線程沒有當即看到修改後的值app

  • 有序性異步

    即程序執行的順序按照代碼的前後順序執行
    與指令重排序有關。通常來講,處理器爲了提升程序運行效率,可能會對輸入代碼進行優化,它不保證程序中各個語句的執行前後順序同代碼中的順序一致,可是它會保證程序最終執行結果和代碼順序執行的結果是一致的。
    Java內存模型具有一些先天的「有序性」,即不須要經過任何手段就可以獲得保證的有序性,這個一般也稱爲 happens-before 原則。若是兩個操做的執行次序沒法從happens-before原則推導出來,那麼它們就不能保證它們的有序性,虛擬機能夠隨意地對它們進行重排序。
    • 程序次序規則:一個線程內,按照代碼順序,書寫在前面的操做先行發生於書寫在後面的操做
    • 鎖定規則:一個unLock操做先行發生於後面對同一個鎖額lock操做
    • volatile變量規則:對一個變量的寫操做先行發生於後面對這個變量的讀操做
    • 傳遞規則:若是操做A先行發生於操做B,而操做B又先行發生於操做C,則能夠得出操做A先行發生於操做C
    • 線程啓動規則:Thread對象的start()方法先行發生於此線程的每一個一個動做
    • 線程中斷規則:對線程interrupt()方法的調用先行發生於被中斷線程的代碼檢測到中斷事件的發生
    • 線程終結規則:線程中全部的操做都先行發生於線程的終止檢測,咱們能夠經過Thread.join()方法結束、Thread.isAlive()的返回值手段檢測到線程已經終止執行
    • 對象終結規則:一個對象的初始化完成先行發生於他的finalize()方法的開始

當多個線程訪問某個類時,無論運行時環境採用何種調度方式或者這些線程將如何交替執行,而且在主調代碼中不須要任何額外的同步或協同,這個類都能表現出正確的行爲,那麼就稱這個類是線程安全的。ide

對象的共享

加鎖與volatile

加鎖機制既能夠確保可見性,又能夠確保原子性,而volatile變量只能確保可見性。函數

發佈與逸出

發佈一個對象的意思是指,使對象可以在當前做用域以外的代碼中使用。
發佈內部狀態可能會破壞封裝性,並使得程序難以維持不變性條件。
當某個不該該發佈的對象被髮布時,這種狀況就被稱之爲逸出。
當一個對象發佈時,在該對象的非私有域中引用的全部對象一樣會被髮布。通常來講,若是一個已經發布的對象可以經過非私有的變量引用和方法調用到達其餘的對象,那麼這些對象也都會被髮布。
不要在構造過程當中使this引用逸出。工具

線程封閉

若是僅在單線程內訪問數據,就不須要同步。

  • Ad-hoc線程封閉(脆弱)
  • 棧封閉
  • ThreadLocal類

不變性

知足同步需求的另外一種方法是使用不可變對象(Immutable Object)
不可變對象:

  • 對象建立之後其狀態就不能修改
  • 對象的全部域都是final類型
  • 對象是正確建立的

安全發佈

  • 在靜態初始化函數中初始化一個對象引用
  • 將對象的引用保存到volatile類型的域或者AtomicReferance對象中
  • 將對象的引用保存到某個正確構造對象的final類型域中
  • 將對象的引用保存到一個由鎖保護的域中

線程安全庫的容器類:
HashTable、synchronizedMap、ConcurrentMap
Vector、CopyOnWriteArrayList、CopyOnWriteArraySet、synchronizedList、synchronizedSet
BlockingQueue、ConcurrentLinkedQueue

事實不可變對象(Effectively Immutable Object):若是對象從技術上看是可變的,但其狀態在發佈後不會再改變,那麼把這種對象稱爲事實不可變對象。

  • 不可變對象能夠經過任意機制發佈
  • 事實不可變對象必須經過安全方式發佈
  • 可變對象必須經過安全方式來發布,而且必須是線程安全的或者由某個鎖保護起來

安全地共享對象

  • 線程封閉
  • 只讀共享
  • 線程安全共享
  • 保護共享

對象的組合

設計線程安全的類

在設計線程安全類的過程當中,須要包含如下三個基本要素:

  • 找出構成對象狀態的全部變量
  • 找出約束狀態變量的不變性條件
  • 創建對象狀態的併發訪問管理策略

Java監視器模式:對於任何一種鎖對象,自始至終都使用該鎖對象,均可以用來保護對象的狀態

@NotThreadSafe
public class MutablePoint {
    public int x,y;

    public MutablePoint() {
        x = 0;
        y = 0;
    }

    public MutablePoint(MutablePoint p) {
        this.x = p.x;
        this.y = p.y;
    }
}
@ThreadSafe
public class MonitorVehicleTracker {
    @GuardedBy("this")
    private final Map<String,MutablePoint> locations;

    public MonitorVehicleTracker(Map<String, MutablePoint> locations) {
        this.locations = deepCopy(locations);
    }
    
    public synchronized Map<String,MutablePoint> getLocations(){
        return deepCopy(locations);
    }
    
    public synchronized MutablePoint getLocation(String id){
        MutablePoint loc = locations.get(id);
        return loc == null ? null : new MutablePoint(loc);
    }
    
    public synchronized void setLocation(String id,int x,int y){
        MutablePoint loc = locations.get(id);
        if(loc == null){
            throw new IllegalArgumentException("No such ID:" + id);
        }
        loc.x = x;
        loc.y = y;
    }
    
    
    private static Map<String,MutablePoint> deepCopy(Map<String,MutablePoint> m){
        Map<String,MutablePoint> result = new HashMap<>();
        for(String id:m.keySet()){
            result.put(id,new MutablePoint(m.get(id)));
        }
        return Collections.unmodifiableMap(result);
    }
}

線程安全性的委託

若是一個類是由多個獨立且線程安全的狀態變量組成,而且在全部的操做中都不包含無效狀態轉換,那麼能夠將線程安全性委託給底層的狀態變量。

public class VisualComponent {
    private final List<KeyListener> keyListeners = new CopyOnWriteArrayList<>();
    private final List<MouseListener> mouseListeners = new CopyOnWriteArrayList<>();
    
    public void addKeyListener(KeyListener listener){
        keyListeners.add(listener);
    }
    
    public void addMouseListener(MouseListener listener){
        mouseListeners.add(listener);
    }
    
    public void removeKeyListener(KeyListener listener){
        keyListeners.remove(listener);
    }
    
    public void removeMouseListener(MouseListener listener){
        mouseListeners.remove(listener);
    }
    
}

Java裏的基礎構建模塊

同步容器類

Vector HashTable Collections.synchronizedXxx工廠方法

  • 同步容器的線程安全問題
public static <T> T getLast(Vector<T> vector){
        int lastIndex = vector.size() - 1;
        return vector.get(lastIndex);
    }
    
    public static <T> T deleteLast(Vector<T> vector){
        int lastIndex = vector.size() - 1;
        return vector.remove(lastIndex);
    }

在多線程中上述方法是不安全的,雖然Vector是安全的容器,但size()方法和get()或者remove()同時使用,存在「先檢查再運行」操做,就會拋出異常(ArrayIndexOutOfBoundsException),因此須要在客戶端加鎖

public static <T> T getLast(Vector<T> vector) {
        synchronized (vector) {
            int lastIndex = vector.size() - 1;
            return vector.get(lastIndex);
        }
    }
    
    public static <T> T deleteLast(Vector<T> vector) {
        synchronized (vector) {
            int lastIndex = vector.size() - 1;
            return vector.remove(lastIndex);
        }
    }
  • 迭代器與ConcurrentModificationException
    在設計同步容器的迭代器時並無考慮併發修改的問題,它們表現出的行爲是及時失敗(fail-fast)。
List<Widget> widgeList = Collections.synchronizedList(new ArrayList<Widget>());
        //可能拋出ConcurrentModificationException
        for(Widget w:widgeList){
            doSomeThing(w);
        }

解決方法有兩種:一是加鎖,但可能會產生死鎖;二是克隆,這裏的性能開銷也很大。

  • 隱藏迭代器
    容器的toString()、hashCode()、equals()、containsAll()、removeAll()、retainAll()以及把容器做爲參數的構造方法,都會對容器進行迭代。這些操做都有可能拋出ConcurrentModificationException

併發容器

  • ConcurrentHashMap 替代同步Map
    使用分段鎖
    迭代器具備弱一致性(能夠容忍併發修改,但並不能保證在迭代器被構造後將修改操做反映給容器,因此size()和isEmpty()的語義被略微減弱了。
  • CopyOnWriteArrayList/CopyOnWriteArraySet
    "寫入時複製"容器,每次修改時,都會建立並從新發佈一個新的容器副本
/**
     * Replaces the element at the specified position in this list with the
     * specified element.
     *
     * @throws IndexOutOfBoundsException {@inheritDoc}
     */
    public E set(int index, E element) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            E oldValue = get(elements, index);

            if (oldValue != element) {
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len);
                newElements[index] = element;
                setArray(newElements);
            } else {
                // Not quite a no-op; ensures volatile write semantics
                setArray(elements);
            }
            return oldValue;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

容器規模較大時,底層複製須要必定的開銷。僅當迭代操做遠遠多於修改操做時,才使用"寫入時複製"容器。

  • ConcurrentSkipListMap 替代同步的SortedMap
  • ConcurrentSkipListSet 替代同步的SortedSet
  • Queue ConcurrentLinkedQueue(先進先出) PriorityQueue(優先隊列)
  • BlockingQueue 阻塞隊列 LinkedBlockingQueue/ArrayBlockingQueue(FIFO) PriorityBlockingQueue(優先隊列) SynchronousQueue(維護一組線程,不維護存儲空間,直接交付)
    put()和take()是可阻塞的
    支持生產者消費者模式
    支持有界或者無界隊列
  • Deque BlockDeque 雙端隊列和工做密取
    每一個消費者都有本身的雙端隊列,消費完本身的任務,就去其餘隊列的末尾祕密的獲取工做

阻塞方法中斷方法

某方法拋出InterruptedException時,表示該方法是一個阻塞方法。
捕獲異常,恢復中斷

try {
            processTask(fileQueue.take())
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

同步工具類

  • CountDownLatch
    是一個或多個線程等待一組事件發生
public class TestHarness {

    public static long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for(int i = 0;i < nThreads;i++){
//            Runnable t = new Runnable() {
//                @Override
//                public void run() {
//                    try {
//                        startGate.await();
//                        try {
//                            task.run();
//                        } finally {
//                            endGate.countDown();
//                        }
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    }
//                }
//            };
            Thread t = new Thread(){
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();

        startGate.countDown();

        endGate.await();

        long end = System.nanoTime();

        return end - start;

    }

    public static void main(String[] args) throws InterruptedException {
        Runnable a = new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i = 0; i < 1000000; i++){
                    sum += i;
                }
                System.out.println(sum);
            }
        };

       System.out.println( timeTasks(100,a) );
    }

}
  • FutureTask
    異步獲取執行的結果
  • 信號量 Semaphore
    控制同時訪問某個特定資源的操做數量或者同時執行某個制定操做的數量
    實現資源池
    對容器施加邊界
  • 柵欄 CyclicBarrier Exchanger

簡單的可伸縮性緩存

public class Momoizerl<A,V> implements Computable<A,V> {
    private final ConcurrentMap<A,Future<V>> cache = new ConcurrentHashMap<>();
    private final Computable<A,V> c;

    public Momoizerl(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(final A arg) throws InterruptedException {
        while (true){
            Future<V> f = cache.get(arg);
            if(f == null){
                Callable<V> eval = new Callable<V>() {
                    @Override
                    public V call() throws Exception {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg,ft);
                if(f == null){
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

基礎小結

  • 可變狀態越少,越容易確保線程安全性
  • 儘可能將域聲明爲final類型
  • 不可變對象必定是線程安全的
  • 使用所來保護可變變量
  • 當保護同一個不變性條件中的全部變量時,使用同一個鎖
  • 複合操做,使用鎖
  • 安全的適當的使用併發容器和同步工具
相關文章
相關標籤/搜索