Java多線程奇幻之旅——CAS算法實現線程安全

前言

對於線程安全,咱們有說不盡的話題。大多數保證線程安全的方法是添加各類類型鎖,使用各類同步機制,用限制對共享的、可變的類變量併發訪問的方式來保證線程安全。文本從另外一個角度,使用「比較交換算法」(CompareAndSwap)實現一樣的需求。咱們實現一個簡單的「棧」,並逐步重構代碼來進行講解。
本文通俗易懂,不會涉及到過多的底層知識,適合初學者閱讀(言外之意是各位大神能夠繞道了)。java

旅程開始

1.先定個小目標,實現一個「棧」

「棧」(stack)是你們常用的抽象數據類型(啥?!不知道,請自行百度)。「棧」知足「後進先出」特性。咱們用鏈表數據結構完成一個簡單的實現:node

public class Stack<E> {
    //鏈表結構頭部節點
    private Node<E> head;
    /**
     * 入棧
     * @param item
     */
    public void push(E item) {
        //爲新插入item建立一個新node
        Node<E> newHead = new Node<>(item);
        if(head!=null){
            //將新節點的下一個節點指向原來的頭部
            newHead.next = head;
        }
        //將頭部指向新的節點
        head=newHead;
    }
    /**
     * 出棧
     * @return
     */
    public E pop() {
        if(head==null){
            //當前鏈表爲空
            return null;
        }
        //暫存當前節點。
        Node<E> oldHead=head;
        //將當前節點指向當前節點的下一個節點
        head=head.next;
        //從暫存的當前節點記錄返回數據
        return oldHead.item;
    }
    /**
     * 鏈表中的節點
     * @param <E>
     */
    private static class Node<E> {
        //節點保存的數據
        public final E item;
        //指向下一個鏈表中下一個節點
        public Node<E> next;
        public Node(E item) {
            this.item = item;
        }
    }
}

代碼使用鏈表數據結構實現「棧」,在Stack中維護一個鏈表的「頭部節點」,經過對頭部節點的操做完成入棧和出棧操做。
咱們運行代碼測試一下:算法

public static void main(String[] args) {
    Stack<Integer> stack=new Stack<>();
    for (int i = 0; i < 3; i++) {
        //入棧一、二、3
        stack.push(i+1);
    }
    for (int i = 0; i < 3; i++) {
        //出棧三、二、1
        System.out.println(stack.pop());
    }
}

結果爲:編程

3
2
1

咱們使用入棧方法向Stack插入一、二、3,使用出棧方法打印爲三、二、1,符合預期。安全

2.讓多線程搗搗亂

前面咱們已經測試過咱們的方法,符合咱們對Stack功能的預期,那是否是任何狀況先咱們的「棧」都能正常工做呢?數據結構

咱們運行以下代碼:多線程

public static void main(String[] args) {
    Stack<Integer> stack=new Stack<>();
    int max=3;
    Thread[] threads=new Thread[max];
    for (int i = 0; i < max; i++) {
        int temp=i;
        //入棧一、二、3
        Thread thread=new Thread(new Runnable() {
            @Override
            public void run() {
                stack.push(temp+1);
            }
        });
        thread.start();
        threads[temp]=thread;
    }
    //等待全部線程完成。
    for (int i = 0; i < max; i++) {
        try {
            threads[i].join();
        } catch (InterruptedException e) {
        }
    }

    for (int i = 0; i < max; i++) {
        //出棧三、二、1
        System.out.println(stack.pop());
    }
}

你可能運行了不少次,每次運行時除了打印順序(三、二、1或二、三、1或一、二、3)有變化以外也沒有發現其餘異常,你可能會說打印順序變化很正常呀,由於咱們的將入棧操做放到異步線程中操做,三個線程的執行過程由系統調度,因此入棧操做的內容天然每次都有可能不一樣。
好吧,你說的沒錯,至少從大量運行的結果上看是這樣的,可是這就是多線程編程的奇(tao)幻(yan)之處,也許你運行一次沒有問題,兩次沒有問題,一萬次也沒有問題,可是終有一次你會獲得那個意想不到的結果(你也不想獲得,由於那是bug)。這就像一個「黑天鵝事件」,小几率可是必定會發生,且發生後對你的系統影響不堪設想。
下面讓我帶你看看如何獲得意料以外的結果:併發

咱們使用調試模式運行上面的程序在Stack中push()方法第一行打一個斷點,而後按照表格中的順序切換不一樣的線程以單步調試(step over)方式運行run方法中的每一步,直到遇到Resume。異步

執行順序 thread-0 thread-1 thread-2
1 Node<E> newHead = new Node<>(item); -- --
2 head=newHead; -- --
3 (Resume) -- --
4 -- Node<E> newHead = new Node<>(item); --
5 -- -- Node<E> newHead = new Node<>(item);
6 -- newHead.next = head; --
7 -- -- newHead.next = head;
8 -- head=newHead; --
9 -- -- head=newHead;
10 -- (Resume)
11 -- -- (Resume)

當你再次看到打印結果,你會發現結果爲三、一、null,「黑天鵝」出現了。ide

異常結果是如何產生的?
當thread-0執行到順序3時,head表示的鏈表爲node(1)。
當thread-1執行到順序10時,head表示的鏈表爲node(2)->node(1)。
當thread-2執行到順序11時,head表示的鏈表爲node(3)->node(1)。
當三個線程都執行完畢以後,head的最終表示爲node(3)->node(1),也就是說thread-2將thread-1的執行結果覆蓋了。

語句newHead.next = head;是對頭部節點的讀取。語句head=newHead;是對頭部節點的寫入操做。這兩條語句組成了一個「讀取——設置——寫入」語句模式(就像n=n+1)。
若是一個線程執行了共享頭部變量讀取語句,切換其餘線程執行了修改共享變量的值,再切回到第一個線程後,第一個線程中修改頭部結點的數據就不是最新的數據爲依據的,因此修改以後其餘線程的修改就被覆蓋了。
只有保證這兩條語句及中間語句以原子方式執行,才能避免多線程覆蓋問題。
你們能夠任意調整代碼中讀取頭部節點和寫入頭部節點的調試順序,製造多線程交錯讀寫觀察不一樣的異常結果。

爲何咱們直接執行沒法看到異常結果呢?
由於咱們的run方法很簡單,在CPU分配的時間片內能運行完,沒有出如今不一樣的運行週期中交錯運行的狀態。因此咱們纔要用調試模式這種交錯運行。

爲何上文中我說過這種異常必定會發生?
緣由在於咱們在Stack類中對共享的、可變的變量head進行的多線程讀寫操做。

怎麼才能保證類Stack在多線程狀況下運行正確?
引用一段《JAVA併發編程實踐》中的話:

不管什麼時候,只要有多於一個的線程訪問給定的狀態變量,並且其中某個線程會寫入該變量,此時必須使用同步來協調線程對該變量的訪問。

好吧,看來咱們必須採用「同步」方法了,來保障咱們的Stack類在多線程並行和單線程串行的狀況下都有正確的結果,也就是說將Stack變成一個線程安全的類。

3.讓你搗亂,請家長!

既然多線程總來搗亂,咱們就請他的家長,讓家長管管他,守守規矩,不在搗亂。
咱們已經知道了Stack類問什麼不能再多線程下正確的運行的緣由,全部咱們要限制多線程對Stack類中head變量的併發寫入,Stack方法中push()和pop()方法都會對head進行寫操做,因此要限制這兩個方法不能多線程併發訪問,因此咱們想到了synchronized關鍵字。

程序重構:

public class SynchronizedStack<E> {
    //鏈表結構頭部節點
    private Node<E> head;
    /**
     * 入棧
     * @param item
     */
    public synchronized void push(E item) {
        //爲新插入item建立一個新node
        Node<E> newHead = new Node<>(item);
        if(head!=null){
            //將新節點的下一個節點指向原來的頭部
            newHead.next = head;
        }
        //將頭部指向新的節點
        head=newHead;
    }
    /**
     * 出棧
     * @return
     */
    public synchronized E pop() {
        if(head==null){
            //當前鏈表爲空
            return null;
        }
        //暫存當前節點。
        Node<E> oldHead=head;
        //將當前節點指向當前節點的下一個節點
        head=head.next;
        //從暫存的當前節點記錄返回數據
        return oldHead.item;
    }
    /**
     * 鏈表中的節點
     * @param <E>
     */
    private static class Node<E> {
        //節點保存的數據
        public final E item;
        //指向下一個鏈表中下一個節點
        public Node<E> next;
        public Node(E item) {
            this.item = item;
        }
    }
}

Stack類替換爲SynchronizedStack類的測試方法。

public static void main(String[] args) {
        SynchronizedStack<Integer> stack=new SynchronizedStack<>();
        int max=3;
        Thread[] threads=new Thread[max];
        for (int i = 0; i < max; i++) {
            int temp=i;
            //入棧一、二、3
            Thread thread=new Thread(new Runnable() {
                @Override
                public void run() {
                    stack.push(temp+1);
                }
            });
            thread.start();
            threads[temp]=thread;
        }
        //等待全部線程完成。
        for (int i = 0; i < max; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
            }
        }

        for (int i = 0; i < max; i++) {
            //出棧三、二、1
            System.out.println(stack.pop());
        }
    }

咱們再次運行第二章爲多線程準備的測試方法,發現當執行一個線程的方法時,其餘線程的方法均被阻塞,只能等到第一個線程方法執行完成以後才能執行其餘線程方法。
咱們只不過是在push()pop()方法上加入了synchronized 關鍵字,就將這兩個方法編程了同步方法,在多線程併發的狀況下也如同單線程串行調用通常,方法再不能在線程間交替運行,也就不能對head變量作併發更改了,這樣修改的Stack類就是線程安全的了。

除了synchronized關鍵字,還有其餘的方式實現加鎖嗎?
除了synchronized關鍵字還可使用java.util.concurrent.locks包中各類鎖來保證同步,可是大概思路都是相同的,都是使用阻塞其餘線程的方式在達到防止併發寫入的目的。

阻塞線程是否會影響執行效率?
若是和不加經過的「棧」類相比,在多線程執行的以後效率必定會有影響,由於同步方法限制了線程之間的併發性,可是爲了保證「棧」類的在多線程環境時功能正確,咱們不得不作出效率和正確性的權衡。

必需要對整個方法加上鎖嗎?
咱們上面已經分析了須要加鎖的範圍,只要保證讀取頭部節點和寫入頭部節點之間的語句原子性就能夠。因此咱們能夠這樣執行。

/**
     * 入棧
     *
     * @param item
     */
    public void push(E item) {
        //爲新插入item建立一個新node
        Node<E> newHead = new Node<>(item);
        synchronized (this) {
            if (head != null) {
                //將新節點的下一個節點指向原來的頭部
                newHead.next = head;
            }
            //將頭部指向新的節點
            head = newHead;
        }
    }

    /**
     * 出棧
     *
     * @return
     */
    public E pop() {
        synchronized (this) {
            if (head == null) {
                //當前鏈表爲空
                return null;
            }
            //暫存當前節點。
            Node<E> oldHead = head;
            //將當前節點指向當前節點的下一個節點
            head = head.next;
            //從暫存的當前節點記錄返回數據
            return oldHead.item;
        }
    }

經過synchronized 塊實現,由於方法比較簡單,因此也沒有很明顯的縮小加鎖範圍。

除了加鎖的方式,是否還有其餘方式?
固然,咱們還有無鎖化編程來解決線程之間同步的問題。這就是下面要介紹的比較交換算法。

4.換個思路,樂觀一點

加鎖實現線程同步的方式是預防性方式。不管共享變量是否會被併發修改,咱們都只容許同一時刻只有一個線程運行方法來阻止併發發生。這就至關於咱們假設併發必定會發生,因此比較悲觀。
如今咱們換一種思路,樂觀一點,不要假設對變量的併發修改必定發生,這樣也就不用對方法加鎖阻止多線程並行運行方法了。可是一旦發生了併發修改,咱們想法發解決就是了,解決的方法就是將這個操做重試一下。

繼續重構「棧」代碼:

public class TreiberStack<E> {
    private AtomicReference<Node<E>> headNode = new AtomicReference<>();
    public void push(E item) {
        Node<E> newHead = new Node<>(item);
        Node<E> oldHead;
        do {
            oldHead = headNode.get();
            newHead.next = oldHead;
        } while (!headNode.compareAndSet(oldHead, newHead));
    }
    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = headNode.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
        } while (!headNode.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }
    private static class Node<E> {
        public final E item;
        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

這個就是大名鼎鼎的Treiber Stack,我也只是作了一次代碼的搬運工。
咱們來看看TreiberStack和咱們前面的Stack有什麼不一樣。

首先關注第一行:

private AtomicReference<Node<E>> headNode = new AtomicReference<>();

咱們用了一個AtomicReference類存儲鏈表的頭部節點,這個類能夠獲取存儲對象的最新值,而且在修改存儲值時候採用比較交換算法保證原子操做,具體你們能夠自行百度。

而後重點關注pop()push()方法中都有的一個代碼結構:

//略...
do {
    oldHead = headNode.get();
    //略...
     
} while (!headNode.compareAndSet(oldHead, newHead));
//略...

咱們AtomicReferenceget()方法最新的獲取頭部節點,而後調用AtomicReferencecompareAndSet()將設置新頭部節點,若是當前線程執行這兩端代碼的時候若是有其餘已經修改了頭部節點的值,'compareAndSet()'方法返回false ,代表修改失敗,循環繼續,不然修改爲功,跳出循環。

這樣一個代碼結構和synchronized關鍵字修飾的方法同樣,都保證了對於頭部節點的讀取和寫入操做及中間代碼在一個線程下原子執行,前者是經過其餘線程修改過就重試的方式,後者經過阻塞其餘線程的方式,一個是樂觀的方式,一個是悲觀的方式。

你們能夠按照前面的例子本身寫測試方法測試。

後記

咱們經過對「棧」的一步一步代碼重構,逐步介紹了什麼是線程安全及保證線程安全的各類方法。這裏須要說明一點,對於一個類來講,是否須要支持線程安全是由類的使用場景決定,不是有類所提供的功能決定的,若是一個類不會被應用於多線程的狀況下也就無需將他轉化爲線程安全的類。

關於CAS特色等更多內容鑑於本文篇幅有限,我會另文再續。


參考

《JAVA併發編程實踐》

相關文章
相關標籤/搜索