如何處理海量數據(上):從併發編程到分佈式系統

在這裏想寫寫本身在學習併發處理的學習思路,也會聊聊本身遇到的那些坑,以此爲記,但願鞭策本身不斷學習、永不放棄!redis

具體筆者認爲大致可分爲兩部分:數據庫

第一部分:Java多線程編程。編程

第二部分:高併發的解決思路。數組

第三部分:分佈式架構中redis、zookeeper分佈式鎖的應用。緩存

本文着重講解第一塊。安全

一、Java內存模型與線程。多線程

併發編程主要討論如下幾點:多個線程操做相同資源,保證線程安全,合理使用資源。架構

一般咱們能夠將物理計算機中出現的併發問題類比到JVM中的併發。併發

物理計算機處理器、高速緩存、主內存間交互關係如圖:app

處理器和內存的運行速度存在幾個數量級別的差距,所以爲解決此矛盾引入了告訴緩存這一律念。當多個處理器的運行任務都涉及到同一塊主內存區域時,將可能致使各自緩存數據的不一致問題,爲解決一致性問題,須要各個處理器訪問緩存時都遵循一些協議,在讀寫時要根據協議來進行操做。(MSI、MESI、MOSI、Synapse、Firefly及Dragon Protocol等)

處理器爲提升性能,會對輸入代碼亂序執行(Out-Of-Order Execution) 優化。

類比Java內存模型,線程、主內存、工做內存交互關係如圖:

JMM定義程序中各個變量訪問規則,即在虛擬機中將內存取出和存儲的底層細節。

線程A若是要跟線程B要通訊的話,必須經歷如下兩個步驟: 1)線程A把本地內存A中更新過的共享變量的值刷新到主內存中。 2)線程B去主內存中讀取A更新過的共享變量的值。

線程的工做內存中保存了該線程使用到變量的主內存副本拷貝(也可理解爲此線程的私有拷貝),線程對變量的操做(讀取、賦值等)都在工做內存中進行,而不能直接讀寫主內存中變量。不一樣線程之間的通訊業須要經過主內存來完成。 主內存對應Java堆中對象實例數據部分,而工做內存則對應虛擬機棧中部分區域。

在此還有一個須要說起的點!

指令重排序

執行程序時,爲提升性能,編譯器和處理器經常會對指令作出重排序。分三種:

1)編譯器優化的重排序。

2)指令並行重排序。

3)內存系統重排序。

JMM的編譯器會禁止特定類型的編譯器重排序,對於處理器重排序(後二者),則要求Java編譯器在生成指令序列時,插入特定類型的內存屏障指令,經過內存屏障指令來禁止特定類型的處理器重排序。

內存之間的交互操做

JMM中定義了8種操做來來描述工做內存與主內存之間的實現細節。

  • lock(鎖定):做用於主內存的變量,它把一個變量標識爲一條線程獨佔狀態。

  • unlock(解鎖):做用於主內存的變量,它把一個處於鎖定狀態的變量釋放出來,釋放後的變量才能夠被其餘線程鎖定。

  • read(讀取):做用於主內存的變量,它把一個變量從主內存傳輸到線程工做內存中,以便後邊的load操做。

  • load(載入):做用於主內存的變量,它把read操做從主內存中獲得的變量值放到工做內存副本中。

  • use(使用):做用於工做內存的變量,它把工做內存中一個變量的值傳遞給執行引擎,每當虛擬機遇到一個須要使用到變量的值的字節碼指令時將會執行這個操做。

  • assign(賦值):做用於工做內存的變量,它把從執行引擎接收到的值賦給工做內存,每當虛擬機遇到一個給變量賦值的字節碼指令時執行此操做。

  • store(存儲):做用於工做內存的變量,它把工做內存的變量的值傳送到主內存中,以便之後的write操做使用。

  • write(寫入):做用於主內存的變量,它把store操縱從工做內存中獲得的變量值放入到主內存的變量中。

JMM規定了執行上述八種操做時必須知足的規則(與happens-before原則是等效的,即先行發生原則):

  • 不容許read和load、store和write操做之一單獨出現

  • 不容許一個線程丟棄它的最近assign的操做,即變量在工做內存中改變了以後必須同步到主內存中。

  • 不容許一個線程無緣由地(沒有發生過任何assign操做)把數據從工做內存同步回主內存中。

  • 一個新的變量只能在主內存中誕生,不容許在工做內存中直接使用一個未被初始化(load或assign)的變量。即就是對一個變量實施use和store操做以前,必須先執行過了assign和load操做。

  • 一個變量在同一時刻只容許一條線程對其進行lock操做,lock和unlock必須成對出現。

  • 若是對一個變量執行lock操做,將會清空工做內存中此變量的值,在執行引擎使用這個變量前須要從新執行load或assign操做初始化變量的值。

  • 若是一個變量事先沒有被lock操做鎖定,則不容許對它執行unlock操做;也不容許去unlock一個被其餘線程鎖定的變量。

  • 對一個變量執行unlock操做以前,必須先把此變量同步到主內存中(執行store和write操做)。

補充:JVM-攻城掠地

二、測試工具

PostMan、Apache Bench、JMeter、LoadRunner

三、線程安全性

原子性:提供了互斥訪問,同一時刻只能由一個線程來對它進行操做。

可見性:一個線程對主內存的修改能夠及時被其餘線程觀察到。

有序性:一個線程觀察其它線程中指令執行順序,因爲指令重排序的存在,該觀察的結果通常爲雜亂無章的。 (happens-before原則) Java程序的自然有序性能夠總結爲:若是本線程內觀察,全部的操做都是有序的;若是在一個線程觀察另外一個線程,全部的操做都是無須的。前者指的是線程內的串行語義,後者指的是指令重排序和工做內存和主內存同步延遲現象。

原子性-Atomic包

  • AtomicXXX:CAS、Unsafe.compareAndSwapInt

經過CAS來保證原子性,即Compare And Swap 比較交換:

CAS利用處理器提供的CMPXCHG指令實現,自旋CAS實現的基本思路就是循環進行CAS直到成功爲止。 比較內存的值與預期的值,若相同則修改預期的值。

CAS雖然能夠進行高效的進行源自操做,可是CAS仍在存在三大問題。

  • ABA問題。 在Java1.5開始,JDK的Atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。 大部分狀況下ABA問題並不影響程序併發的正確性,若是須要解決ABA問題,改用傳統的互斥同步可能會比原子類更加高效。

  • 循環時間長開銷大,

  • 以及只能保證一個共享變量進行的原子操做。

測試:

public class AtomicExample1 {    // 請求總數
    public static int clientTotal = 5000;    // 同時併發執行的線程數
    public static int threadTotal = 200;    public static AtomicInteger count = new AtomicInteger(0);    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();        final Semaphore semaphore = new Semaphore(threadTotal);        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }    private static void add() {
        count.incrementAndGet();        // count.getAndIncrement();
    }
}

AtomicInteger

源碼實現

public final int incrementAndGet() {        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }  public final int getAndAddInt(Object var1, long var2, int var4) {        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));        return var5;
    }    //當前的指爲var2,底層穿過來的值var5 若是當前的值與底層傳過來的值同樣的話,則將其更新問var5+var4

AtomicLong與LongAdder

  • Java內存模型要求lock、unlock、read、load、assign、use、store、write這8個操做都是具備原子性,可是對於64位的數據類型(long、double),容許虛擬機將沒有被volatile修飾的64位數據的讀寫操做劃分爲兩次32位的操做來進行,即容許虛擬機實現選擇能夠不保證64位數據類型的load、store、read和write這四個原子操做,可是能夠視爲原子性操做。

  • LongAdder實現熱點數據的分離,更快,若是有併發更新可能會出現偏差。

  • AtomicLong CAS中若是併發量大,則會不斷進行循環調用,效率會比較低。 
    底層用數組實現,其結果爲數組的求和累加。

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {        add(1L);
    }

AtomicBoolean

  • 但願某件事情只執行一次。

public final boolean compareAndSet(boolean expect, boolean update) {        int e = expect ? 1 : 0;        int u = update ? 1 : 0;        return unsafe.compareAndSwapInt(this, valueOffset, e, u);
    }

AtomicReference

public final V getAndSet(V newValue) {        return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
    } public final Object getAndSetObject(Object var1, long var2, Object var4) {
        Object var5;        do {
            var5 = this.getObjectVolatile(var1, var2);
        } while(!this.compareAndSwapObject(var1, var2, var5, var4));        return var5;
    }

AtomicIntegerFieldUpdater

  • 以原子性更新類中某一個屬性,這屬性須要用volatile進行修飾。

public class AtomicExample5 {    private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");

    @Getter    public volatile int count = 100;    public static void main(String[] args) {

        AtomicExample5 example5 = new AtomicExample5();        if (updater.compareAndSet(example5, 100, 120)) {            log.info("update success 1, {}", example5.getCount());
        }        if (updater.compareAndSet(example5, 100, 120)) {            log.info("update success 2, {}", example5.getCount());
        } else {            log.info("update failed, {}", example5.getCount());
        }
    }
}

AtomicStampedReference

  • 做用是首先檢查當前引用是否等於預期引用,而且檢查當前標誌是否等於預期標誌,若是所有相等,則以原子的方式將該引用和該標誌的值設置爲給定的更新值。

public boolean compareAndSet(V   expectedReference,
                                 V   newReference,                                 int expectedStamp,                                 int newStamp) {
        Pair<V> current = pair;        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

AtomicLongArray 維護數組

原子性-鎖及對比

  • synchronized:依賴JVM,不可中斷鎖,適合競爭不激烈,可讀性號。

  • Lock:依賴特殊的CPU指令,代碼實現,ReentrantLock。可中斷鎖,多樣化同步,競爭激烈的時候能維持常態。

  • Atomic:競爭激烈的時候能維持常態,比Lock性能更好,只能同步一個值。

線程安全-可見性

致使共享變量在線程間不可見的緣由

1)線程交叉執行。

2)重排序結合線程交叉執行。

3)共享變量更新後的值沒有在工做內存與主內存及時更新。

JMM關於synchronizd的兩條規定:

  • 線程解鎖前,必須把共享變量的最新值刷新到主內存。

  • 線程加鎖時,將清空工做內存中共享變量的值,從而使用共享變量時須要從主內存中讀取最新的值。

volatile-可見性 經過加入內存屏障和禁止重排序優化實現。

  • 對volatile變量寫操做時,會在寫操做後加入一條store屏障指令,將本地內存共享變量的值刷新到主內存。

  • 對volatile變量讀操做時,會在讀操做前加入一條load屏障指令,從主內存中讀取共享變量。

必須符合如下場景纔可以使用:

  • 運算結果並不依賴變量當前值,或者可以確保只有單一線程修改變量的值。

  • 變量不須要與其餘狀態變量共同參與不變約束。 
    緣由:volatile變量在各個線程工做內存中不存在一致性問題,可是Java裏面的運算並不是原子性操做,致使volatile變量運算在併發下同樣是不安全的。(能夠經過反編譯來驗證)

private static void add() {
        count++;        // 一、count 取出當前內存中的值
        // 二、+1
        // 三、count 寫回主存
        //即:兩個線程同時執行+1寫回主存就出現問題。
    }

volatile一般用來做爲狀態標記量

volatile boolean inited = false;//線程1:context = loadContext();
inited = true;//線程2;while (!inited){
    sleep();
}
doSomethingWithConfig(context);

四、安全發佈對象

發佈對象:使一個對象可以被當前範圍以外代碼所使用。

對象逸出:一種錯誤的發佈。當一個對象尚未構造完成,就能被其它線程所見。

安全發佈對象

  • 在靜態初始化函數中初始化一個對象的引用。

  • 將對象的引用保存到volatile類型域或者AtomicReference對象中。

  • 對象引用保存到某個正確構造對象final類型域中。

  • 將對象的引用保存到一個由鎖保護的域中。

public class SingletonExample4 {    // 私有構造函數
    private SingletonExample4() {

    }    // 一、memory = allocate() 分配對象的內存空間
    // 二、ctorInstance() 初始化對象
    // 三、instance = memory 設置instance指向剛分配的內存

    // JVM和cpu優化,發生了指令重排

    // 一、memory = allocate() 分配對象的內存空間
    // 三、instance = memory 設置instance指向剛分配的內存
    // 二、ctorInstance() 初始化對象

    // 單例對象
    private volatile static SingletonExample4 instance = null;    // 靜態的工廠方法
    public static SingletonExample4 getInstance() {        if (instance == null) { // 雙重檢測機制        // B
            synchronized (SingletonExample4.class) { // 同步鎖
                if (instance == null) {
                    instance = new SingletonExample4(); // A - 3
                }
            }
        }        return instance;
    }
}

經過枚舉實現單例模式

/**
 * 枚舉模式:最安全
 */@ThreadSafe@Recommendpublic class SingletonExample7 {    // 私有構造函數
    private SingletonExample7() {

    }    public static SingletonExample7 getInstance() {        return Singleton.INSTANCE.getInstance();
    }    private enum Singleton {
        INSTANCE;        private SingletonExample7 singleton;        // JVM保證這個方法絕對只調用一次
        Singleton() {
            singleton = new SingletonExample7();
        }        public SingletonExample7 getInstance() {            return singleton;
        }
    }
}

五、線程安全策略

1) 不可變對象

知足條件:

  • 對象建立之後其狀態就不能修改。

  • 對象對全部域都是final類型。

  • 對象是正確建立的。(對象在建立期間,this沒有逸出)

  • Collections.unmodifiableXXX:Collection、List、Set、Map……

  • Guava:ImmutableXXX:Collection、List、Set、Map……

2) 線程封閉

  • Ad-hoc線程封閉:程序控制實現,最糟糕,忽略。

  • 堆棧封閉:局部變量,無併發問題。

  • ThreadLocal線程封閉:特別好的封閉方法。(實現權限管理)

3) 線程不安全寫法

  • StringBuilder -> StringBuffer

  • SimpleDateFormat -> JodaTime(推薦)

  • ArrayList、HashSet、HashMap等Collections

  • 先檢查再執行:if(condition(a)){handle(a);} ->非原子操做

4) 同步容器

  • ArrayList -->Vector,Stack

  • HashMap -->HashTable (key、value不能爲null)

  • Collections.synchronizedXXX(List、Set、Map) 注意:同步容器在某些場合並不必定能夠作到線程安全。

    5) 線程安全-併發容器-J.U.C

ArrayList -> CopyOnWriteArrayList

  • 拷貝數組過大,容易形成young GC FUll GC

  • 不適用於實時讀的場景,適合讀取多寫少的場景。

  • 實現讀寫分離,知足最終一致性,使用的時候另外開闢空間。

  • 讀取未加鎖,寫加鎖。

public void add(int index, E element) {
        final ReentrantLock lock = this.lock;        lock.lock();        try {
            Object[] elements = getArray();            int len = elements.length;            if (index > len || index < 0)                throw new IndexOutOfBoundsException("Index: "+index+                                                    ", Size: "+len);
            Object[] newElements;            int numMoved = len - index;            if (numMoved == 0)
                newElements = Arrays.copyOf(elements, len + 1);            else {
                newElements = new Object[len + 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index, newElements, index + 1,
                                 numMoved);
            }
            newElements[index] = element;
            setArray(newElements);
        } finally {            lock.unlock();
        }
    }    public E get(int index) {        return get(getArray(), index);
    }

HashSet、TreeSet --> CopyOnWriteArraySet、ConcurrentSkipListSet

ConcurrentSkipListSet對批量操做不能保證原子性。

HashMap、TreeMap --> ConcurrentHashMap、ConcurrentSkipListMap

ConcurrentHashMap效率相對比ConcurrentSkipListMap高,ConcurrentSkipListMap有些其不具備的特性:

  • ConcurrentSkipListMap 的key有序

  • 支持更高的併發 
     

六、J.U.C之AQSAbstractQueuedSynchronizer-AQS

  • 使用Node實現FIFO隊列,能夠用於構建鎖或者其它同步裝置的基礎框架。

  • 利用了一個int類型表示狀態。

  • 使用方法是繼承。

  • 子類經過繼承並經過實現它的方法管理其狀態{acquire和release}的方法操縱狀態。

  • 能夠同步實現排它鎖和共享鎖模式(獨佔、共享)

AQS同步組件 1)等待多線程完成的CountDownLatch(JDK1.5)

容許一個或多個線程等待其餘線程完成操做。

其構造函數接收一個int類型的參數做爲計數器,調用countDown方法的時候,計數器的值會減1,CountDownLatch的await方法會阻塞當前線程,直到N變爲零。

應用:並行計算,解析Excel中多個sheet的數據。

2)控制併發線程數的 Semaphore 用來控制同時訪問特定資源線程的數量。 應用:流量控制,特別是公共資源有限的場景,如數據庫鏈接。

//可用的許可的數量Semaphore(int permits)//獲取一個許可aquire()//使用完成後歸還許可release()//嘗試獲取許可證tryAcquire()

3)同步屏障 CyclicBarrier

讓一組線程達到一個屏障(同步點)時被阻塞,直到最後一個線程到達屏障時,纔會開門,全部被屏障攔截的線程纔會繼續執行。 應用:多線程計算數據,最後合併計算結果的場景。

CyclicBarrier和CountDownLatch的區別

  • CountDownLatch計數器只能使用一次,CyclicBarrier能夠調用reset()方法重置。因此CyclicBarrier能夠支持更加複雜的場景,如發生錯誤後重置計數器,並讓線程從新執行。

//屏障攔截的線程數量CyclicBarrier(int permits)//已經到達屏障await()//CyclicBarrier阻塞線程的數量getNumberWaiting()

4)重入鎖 ReentrantLock (排他鎖:同時容許單個線程訪問。) 支持重進入的鎖,表示該鎖可以支持一個線程對資源的重複加鎖。(即實現重進入:任意線程獲取到鎖以後可以再次獲取該鎖而不會被鎖阻塞。)

  • 該鎖支持獲取鎖時的公平和非公平性選擇

public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

公平鎖就是等待時間最長的線程最優先獲取鎖,也就是說獲取鎖的是順序的(FIFO)。而非公平則容許插隊。 非公平由於不保障順序,則效率相對較高,而公平鎖則能夠減小飢餓發生的機率。

  • 提供了一個Condition類,能夠分組喚醒須要喚醒的線程。

  • 提供可以中斷等待鎖的線程機制,lock.lockInterruptibly()

ReentrantReadWriteLock (讀寫鎖,實現悲觀讀取,同時容許多個線程訪問)

在寫線程訪問時,全部的讀線程和其餘寫線程均被堵塞。其維護了一對鎖,經過分離讀鎖、寫鎖,使得併發性比排他鎖有很大提高。

適用於讀多寫少的環境,可以提供比排他鎖更好的併發與吞吐量。

不足:ReentrantReadWriteLock是讀寫鎖,在多線程環境下,大多數狀況是讀的狀況遠遠大於寫的操做,所以可能致使寫的飢餓問題。

StampedLock

是ReentrantReadWriteLock 的加強版,是爲了解決ReentrantReadWriteLock的一些不足。 StampedLock讀鎖並不會阻塞寫鎖,設計思路也比較簡單,就是在讀的時候發現有寫操做,再去讀多一次。。 StampedLock有兩種鎖,一種是悲觀鎖,另一種是樂觀鎖。若是線程拿到樂觀鎖就讀和寫不互斥,若是拿到悲觀鎖就讀和寫互斥。

參考: Java8對讀寫鎖的改進:StampedLock

5)Condition

參考: Java線程(九):Condition-線程通訊更高效的方式

6)FutureTask

參考: Java併發編程:Callable、Future和FutureTask

7)Fork/Join

參考: Fork/Join 模式高級特性

8)BlocklingQueue

  • ArrayBlockingQueue

  • DelayQueue

  • LinkedBlockingQueue

  • PriorityBlockingQueue

  • SynchronousQueue

參考:Java中的阻塞隊列

七、線程池

參考: Java 四種線程池的用法分析

越寫越多,受不了了,已經凌晨4點,參考項之後再補,先休息!

參考資料:

《深刻理解Java虛擬機》

《Java併發編程藝術》

《Java多線程編程核心技術》

相關文章
相關標籤/搜索