java多線程設計模式

詳見:http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt220java

java多線程設計模式算法

java語言已經內置了多線程支持,全部實現Runnable接口的類均可被啓動一個新線程,新線程會執行該實例的run()方法,當run()方法執行完畢後,線程就結束了。一旦一個線程執行完畢,這個實例就不能再從新啓動,只能從新生成一個新實例,再啓動一個新線程。

Thread類是實現了Runnable接口的一個實例,它表明一個線程的實例,而且,啓動線程的惟一方法就是經過Thread類的start()實例方法:

Thread t = new Thread();
t.start();

start()方法是一個native方法,它將啓動一個新線程,並執行run()方法。Thread類默認的run()方法什麼也不作就退出了。注意:直接調用run()方法並不會啓動一個新線程,它和調用一個普通的java方法沒有什麼區別。

所以,有兩個方法能夠實現本身的線程:

方法1:本身的類extend Thread,並複寫run()方法,就能夠啓動新線程並執行本身定義的run()方法。例如:

public class MyThread extends Thread {
    public run() {
        System.out.println("MyThread.run()");
    }
}

在合適的地方啓動線程:new MyThread().start();

方法2:若是本身的類已經extends另外一個類,就沒法直接extends Thread,此時,必須實現一個Runnable接口:

public class MyThread extends OtherClass implements Runnable {
    public run() {
        System.out.println("MyThread.run()");
    }
}

爲了啓動MyThread,須要首先實例化一個Thread,並傳入本身的MyThread實例:

MyThread myt = new MyThread();
Thread t = new Thread(myt);
t.start();

事實上,當傳入一個Runnable target參數給Thread後,Thread的run()方法就會調用target.run(),參考JDK源代碼:

public void run() {
    if (target != null) {
        target.run();
    }
}

線程還有一些Name, ThreadGroup, isDaemon等設置,因爲和線程設計模式關聯不多,這裏就很少說了。


因爲同一進程內的多個線程共享內存空間,在Java中,就是共享實例,當多個線程試圖同時修改某個實例的內容時,就會形成衝突,所以,線程必須實現共享互斥,使多線程同步。

最簡單的同步是將一個方法標記爲synchronized,對同一個實例來講,任一時刻只能有一個synchronized方法在執行。當一個方法正在執行某個synchronized方法時,其餘線程若是想要執行這個實例的任意一個synchronized方法,都必須等待當前執行 synchronized方法的線程退出此方法後,才能依次執行。

可是,非synchronized方法不受影響,無論當前有沒有執行synchronized方法,非synchronized方法均可以被多個線程同時執行。

此外,必須注意,只有同一實例的synchronized方法同一時間只能被一個線程執行,不一樣實例的synchronized方法是能夠併發的。例如,class A定義了synchronized方法sync(),則不一樣實例a1.sync()和a2.sync()能夠同時由兩個線程來執行。


多線程同步的實現最終依賴鎖機制。咱們能夠想象某一共享資源是一間屋子,每一個人都是一個線程。當A但願進入房間時,他必須得到門鎖,一旦A得到門鎖,他進去後就馬上將門鎖上,因而B,C,D就不得不在門外等待,直到A釋放鎖出來後,B,C,D中的某一人搶到了該鎖(具體搶法依賴於 JVM的實現,能夠先到先得,也能夠隨機挑選),而後進屋又將門鎖上。這樣,任一時刻最多有一人在屋內(使用共享資源)。

Java語言規範內置了對多線程的支持。對於Java程序來講,每個對象實例都有一把「鎖」,一旦某個線程得到了該鎖,別的線程若是但願得到該鎖,只能等待這個線程釋放鎖以後。得到鎖的方法只有一個,就是synchronized關鍵字。例如:

public class SharedResource {
    private int count = 0;

    public int getCount() { return count; }

    public synchronized void setCount(int count) { this.count = count; }

}

同步方法public synchronized void setCount(int count) { this.count = count; } 事實上至關於:

public void setCount(int count) {
    synchronized(this) { // 在此得到this鎖
         this.count = count;
    } // 在此釋放this鎖
}

紅色部分表示須要同步的代碼段,該區域爲「危險區域」,若是兩個以上的線程同時執行,會引起衝突,所以,要更改SharedResource的內部狀態,必須先得到SharedResource實例的鎖。

退出synchronized塊時,線程擁有的鎖自動釋放,因而,別的線程又能夠獲取該鎖了。

爲了提升性能,不必定要鎖定this,例如,SharedResource有兩個獨立變化的變量:

public class SharedResouce {
    private int a = 0;
    private int b = 0;

    public synchronized void setA(int a) { this.a = a; }

    public synchronized void setB(int b) { this.b = b; }
}

若同步整個方法,則setA()的時候沒法setB(),setB()時沒法setA()。爲了提升性能,可使用不一樣對象的鎖:

public class SharedResouce {
    private int a = 0;
    private int b = 0;
    private Object sync_a = new Object();
    private Object sync_b = new Object();

    public void setA(int a) {
        synchronized(sync_a) {
            this.a = a;
        }
    }

    public synchronized void setB(int b) {
        synchronized(sync_b) {
            this.b = b;
        }
    }
}


一般,多線程之間須要協調工做。例如,瀏覽器的一個顯示圖片的線程displayThread想要執行顯示圖片的任務,必須等待下載線程 downloadThread將該圖片下載完畢。若是圖片尚未下載完,displayThread能夠暫停,當downloadThread完成了任務後,再通知displayThread「圖片準備完畢,能夠顯示了」,這時,displayThread繼續執行。

以上邏輯簡單的說就是:若是條件不知足,則等待。當條件知足時,等待該條件的線程將被喚醒。在Java中,這個機制的實現依賴於wait/notify。等待機制與鎖機制是密切關聯的。例如:

synchronized(obj) {
    while(!condition) {
        obj.wait();
    }
    obj.doSomething();
}

當線程A得到了obj鎖後,發現條件condition不知足,沒法繼續下一處理,因而線程A就wait()。

在另外一線程B中,若是B更改了某些條件,使得線程A的condition條件知足了,就能夠喚醒線程A:

synchronized(obj) {
    condition = true;
    obj.notify();
}

須要注意的概念是:

# 調用obj的wait(), notify()方法前,必須得到obj鎖,也就是必須寫在synchronized(obj) {} 代碼段內。

# 調用obj.wait()後,線程A就釋放了obj的鎖,不然線程B沒法得到obj鎖,也就沒法在synchronized(obj) {} 代碼段內喚醒A。

# 當obj.wait()方法返回後,線程A須要再次得到obj鎖,才能繼續執行。

# 若是A1,A2,A3都在obj.wait(),則B調用obj.notify()只能喚醒A1,A2,A3中的一個(具體哪個由JVM決定)。

# obj.notifyAll()則能所有喚醒A1,A2,A3,可是要繼續執行obj.wait()的下一條語句,必須得到obj鎖,所以,A1,A2,A3只有一個有機會得到鎖繼續執行,例如A1,其他的須要等待A1釋放obj鎖以後才能繼續執行。

# 當B調用obj.notify/notifyAll的時候,B正持有obj鎖,所以,A1,A2,A3雖被喚醒,可是仍沒法得到obj鎖。直到B退出synchronized塊,釋放obj鎖後,A1,A2,A3中的一個纔有機會得到鎖繼續執行。



前面講了wait/notify機制,Thread還有一個sleep()靜態方法,它也能使線程暫停一段時間。sleep與wait的不一樣點是: sleep並不釋放鎖,而且sleep的暫停和wait暫停是不同的。obj.wait會使線程進入obj對象的等待集合中並等待喚醒。

可是wait()和sleep()均可以經過interrupt()方法打斷線程的暫停狀態,從而使線程馬上拋出InterruptedException。

若是線程A但願當即結束線程B,則能夠對線程B對應的Thread實例調用interrupt方法。若是此刻線程B正在 wait/sleep/join,則線程B會馬上拋出InterruptedException,在catch() {} 中直接return便可安全地結束線程。

須要注意的是,InterruptedException是線程本身從內部拋出的,並非interrupt()方法拋出的。對某一線程調用 interrupt()時,若是該線程正在執行普通的代碼,那麼該線程根本就不會拋出InterruptedException。可是,一旦該線程進入到 wait()/sleep()/join()後,就會馬上拋出InterruptedException。



GuardedSuspention模式主要思想是:

當條件不知足時,線程等待,直到條件知足時,等待該條件的線程被喚醒。

咱們設計一個客戶端線程和一個服務器線程,客戶端線程不斷髮送請求給服務器線程,服務器線程不斷處理請求。當請求隊列爲空時,服務器線程就必須等待,直到客戶端發送了請求。

先定義一個請求隊列:Queue

package com.crackj2ee.thread;

import java.util.*;

public class Queue {
    private List queue = new LinkedList();

    public synchronized Request getRequest() {
        while(queue.size()==0) {
            try {
                this.wait();
            }
            catch(InterruptedException ie) {
                return null;
            }
        }
        return (Request)queue.remove(0);
    }

    public synchronized void putRequest(Request request) {
        queue.add(request);
        this.notifyAll();
    }

}

藍色部分就是服務器線程的等待條件,而客戶端線程在放入了一個request後,就使服務器線程等待條件知足,因而喚醒服務器線程。

客戶端線程:ClientThread

package com.crackj2ee.thread;

public class ClientThread extends Thread {
    private Queue queue;
    private String clientName;

    public ClientThread(Queue queue, String clientName) {
        this.queue = queue;
        this.clientName = clientName;
    }

    public String toString() {
        return "[ClientThread-" + clientName + "]";
    }

    public void run() {
        for(int i=0; i<100; i++) {
            Request request = new Request("" + (long)(Math.random()*10000));
            System.out.println(this + " send request: " + request);
            queue.putRequest(request);
            try {
                Thread.sleep((long)(Math.random() * 10000 + 1000));
            }
            catch(InterruptedException ie) {
            }
        }
        System.out.println(this + " shutdown.");
    }
}

服務器線程:ServerThread

package com.crackj2ee.thread;
public class ServerThread extends Thread {
    private boolean stop = false;
    private Queue queue;

    public ServerThread(Queue queue) {
        this.queue = queue;
    }

    public void shutdown() {
        stop = true;
        this.interrupt();
        try {
            this.join();
        }
        catch(InterruptedException ie) {}
    }

    public void run() {
        while(!stop) {
            Request request = queue.getRequest();
            System.out.println("[ServerThread] handle request: " + request);
            try {
                Thread.sleep(2000);
            }
            catch(InterruptedException ie) {}
        }
        System.out.println("[ServerThread] shutdown.");
    }
}

服務器線程在紅色部分可能會阻塞,也就是說,Queue.getRequest是一個阻塞方法。這和java標準庫的許多IO方法相似。

最後,寫一個Main來啓動他們:

package com.crackj2ee.thread;

public class Main {

    public static void main(String[] args) {
        Queue queue = new Queue();
        ServerThread server = new ServerThread(queue);
        server.start();
        ClientThread[] clients = new ClientThread[5];
        for(int i=0; i<clients.length; i++) {
            clients[i] = new ClientThread(queue, ""+i);
            clients[i].start();
        }
        try {
            Thread.sleep(100000);
        }
        catch(InterruptedException ie) {}
        server.shutdown();
    }
}

咱們啓動了5個客戶端線程和一個服務器線程,運行結果以下:

[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906


能夠觀察到ServerThread處理來自不一樣客戶端的請求。

思考

Q: 服務器線程的wait條件while(queue.size()==0)可否換成if(queue.size()==0)?

A: 在這個例子中能夠,由於服務器線程只有一個。可是,若是服務器線程有多個(例如Web應用程序有多個線程處理併發請求,這很是廣泛),就會形成嚴重問題。

Q: 可否用sleep(1000)代替wait()?

A: 絕對不能夠。sleep()不會釋放鎖,所以sleep期間別的線程根本沒有辦法調用getRequest()和putRequest(),致使全部相關線程都被阻塞。

Q: (Request)queue.remove(0)能夠放到synchronized() {}塊外面嗎?

A: 不能夠。由於while()是測試queue,remove()是使用queue,二者是一個原子操做,不能放在synchronized外面。

總結

多線程設計看似簡單,實際上必須很是仔細地考慮各類鎖定/同步的條件,稍不當心,就可能出錯。而且,當線程較少時,極可能發現不了問題,一旦問題出現又難以調試。

所幸的是,已有一些被驗證過的模式能夠供咱們使用,咱們會繼續介紹一些經常使用的多線程設計模式。


前面談了多線程應用程序能極大地改善用戶相應。例如對於一個Web應用程序,每當一個用戶請求服務器鏈接時,服務器就能夠啓動一個新線程爲用戶服務。

然而,建立和銷燬線程自己就有必定的開銷,若是頻繁建立和銷燬線程,CPU和內存開銷就不可忽略,垃圾收集器還必須負擔更多的工做。所以,線程池就是爲了不頻繁建立和銷燬線程。

每當服務器接受了一個新的請求後,服務器就從線程池中挑選一個等待的線程並執行請求處理。處理完畢後,線程並不結束,而是轉爲阻塞狀態再次被放入線程池中。這樣就避免了頻繁建立和銷燬線程。

Worker Pattern實現了相似線程池的功能。首先定義Task接口:

package com.crackj2ee.thread;
public interface Task {
    void execute();
}

線程將負責執行execute()方法。注意到任務是由子類經過實現execute()方法實現的,線程自己並不知道本身執行的任務。它只負責運行一個耗時的execute()方法。

具體任務由子類實現,咱們定義了一個CalculateTask和一個TimerTask:

// CalculateTask.java
package com.crackj2ee.thread;
public class CalculateTask implements Task {
    private static int count = 0;
    private int num = count;
    public CalculateTask() {
        count++;
    }
    public void execute() {
        System.out.println("[CalculateTask " + num + "] start");
        try {
            Thread.sleep(3000);
        }
        catch(InterruptedException ie) {}
        System.out.println("[CalculateTask " + num + "] done.");
    }
}

// TimerTask.java
package com.crackj2ee.thread;
public class TimerTask implements Task {
    private static int count = 0;
    private int num = count;
    public TimerTask() {
        count++;
    }
    public void execute() {
        System.out.println("[TimerTask " + num + "] start");
        try {
            Thread.sleep(2000);
        }
        catch(InterruptedException ie) {}
        System.out.println("[TimerTask " + num + "] done.");
    }
}

以上任務均簡單的sleep若干秒。

TaskQueue實現了一個隊列,客戶端能夠將請求放入隊列,服務器線程能夠從隊列中取出任務:

package com.crackj2ee.thread;
import java.util.*;
public class TaskQueue {
    private List queue = new LinkedList();
    public synchronized Task getTask() {
        while(queue.size()==0) {
            try {
                this.wait();
            }
            catch(InterruptedException ie) {
                return null;
            }
        }
        return (Task)queue.remove(0);
    }
    public synchronized void putTask(Task task) {
        queue.add(task);
        this.notifyAll();
    }
}

終於到了真正的WorkerThread,這是真正執行任務的服務器線程:

package com.crackj2ee.thread;
public class WorkerThread extends Thread {
    private static int count = 0;
    private boolean busy = false;
    private boolean stop = false;
    private TaskQueue queue;
    public WorkerThread(ThreadGroup group, TaskQueue queue) {
        super(group, "worker-" + count);
        count++;
        this.queue = queue;
    }
    public void shutdown() {
        stop = true;
        this.interrupt();
        try {
            this.join();
        }
        catch(InterruptedException ie) {}
    }
    public boolean isIdle() {
        return !busy;
    }
    public void run() {
        System.out.println(getName() + " start.");        
        while(!stop) {
            Task task = queue.getTask();
            if(task!=null) {
                busy = true;
                task.execute();
                busy = false;
            }
        }
        System.out.println(getName() + " end.");
    }
}

前面已經講過,queue.getTask()是一個阻塞方法,服務器線程可能在此wait()一段時間。此外,WorkerThread還有一個shutdown方法,用於安全結束線程。

最後是ThreadPool,負責管理全部的服務器線程,還能夠動態增長和減小線程數:

package com.crackj2ee.thread;
import java.util.*;
public class ThreadPool extends ThreadGroup {
    private List threads = new LinkedList();
    private TaskQueue queue;
    public ThreadPool(TaskQueue queue) {
        super("Thread-Pool");
        this.queue = queue;
    }
    public synchronized void addWorkerThread() {
        Thread t = new WorkerThread(this, queue);
        threads.add(t);
        t.start();
    }
    public synchronized void removeWorkerThread() {
        if(threads.size()>0) {
            WorkerThread t = (WorkerThread)threads.remove(0);
            t.shutdown();
        }
    }
    public synchronized void currentStatus() {
        System.out.println("-----------------------------------------------");
        System.out.println("Thread count = " + threads.size());
        Iterator it = threads.iterator();
        while(it.hasNext()) {
            WorkerThread t = (WorkerThread)it.next();
            System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
        }
        System.out.println("-----------------------------------------------");
    }
}

currentStatus()方法是爲了方便調試,打印出全部線程的當前狀態。

最後,Main負責完成main()方法:

package com.crackj2ee.thread;
public class Main {
    public static void main(String[] args) {
        TaskQueue queue = new TaskQueue();
        ThreadPool pool = new ThreadPool(queue);
        for(int i=0; i<10; i++) {
            queue.putTask(new CalculateTask());
            queue.putTask(new TimerTask());
        }
        pool.addWorkerThread();
        pool.addWorkerThread();
        doSleep(8000);
        pool.currentStatus();
        pool.addWorkerThread();
        pool.addWorkerThread();
        pool.addWorkerThread();
        pool.addWorkerThread();
        pool.addWorkerThread();
        doSleep(5000);
        pool.currentStatus();
    }
    private static void doSleep(long ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

main()一開始放入了20個Task,而後動態添加了一些服務線程,並按期打印線程狀態,運行結果以下:

worker-0 start.
[CalculateTask 0] start
worker-1 start.
[TimerTask 0] start
[TimerTask 0] done.
[CalculateTask 1] start
[CalculateTask 0] done.
[TimerTask 1] start
[CalculateTask 1] done.
[CalculateTask 2] start
[TimerTask 1] done.
[TimerTask 2] start
[TimerTask 2] done.
[CalculateTask 3] start
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start
worker-2 start.
[CalculateTask 4] start
worker-3 start.
[TimerTask 4] start
worker-4 start.
[CalculateTask 5] start
worker-5 start.
[TimerTask 5] start
worker-6 start.
[CalculateTask 6] start
[CalculateTask 3] done.
[TimerTask 6] start
[TimerTask 3] done.
[CalculateTask 7] start
[TimerTask 4] done.
[TimerTask 7] start
[TimerTask 5] done.
[CalculateTask 8] start
[CalculateTask 4] done.
[TimerTask 8] start
[CalculateTask 5] done.
[CalculateTask 9] start
[CalculateTask 6] done.
[TimerTask 9] start
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.

仔細觀察:一開始只有兩個服務器線程,所以線程狀態都是忙,後來線程數增多,6個線程中的兩個狀態變成idle,說明處於wait()狀態。

思考:本例的線程調度算法其實根本沒有,由於這個應用是圍繞TaskQueue設計的,不是以Thread Pool爲中心設計的。所以,Task調度取決於TaskQueue的getTask()方法,你能夠改進這個方法,例如使用優先隊列,使優先級高的任務先被執行。

若是全部的服務器線程都處於busy狀態,則說明任務繁忙,TaskQueue的隊列愈來愈長,最終會致使服務器內存耗盡。所以,能夠限制 TaskQueue的等待任務數,超過最大長度就拒絕處理。許多Web服務器在用戶請求繁忙時就會拒絕用戶:HTTP 503 SERVICE UNAVAILABLE


多線程讀寫同一個對象的數據是很廣泛的,一般,要避免讀寫衝突,必須保證任什麼時候候僅有一個線程在寫入,有線程正在讀取的時候,寫入操做就必須等待。簡單說,就是要避免「寫-寫」衝突和「讀-寫」衝突。可是同時讀是容許的,由於「讀-讀」不衝突,並且很安全。

要實現以上的ReadWriteLock,簡單的使用synchronized就不行,咱們必須本身設計一個ReadWriteLock類,在讀以前,必須先得到「讀鎖」,寫以前,必須先得到「寫鎖」。舉例說明:

DataHandler對象保存了一個可讀寫的char[]數組:

package com.crackj2ee.thread;

public class DataHandler {
    // store data:
    private char[] buffer = "AAAAAAAAAA".toCharArray();

    private char[] doRead() {
        char[] ret = new char[buffer.length];
        for(int i=0; i<buffer.length; i++) {
            ret[i] = buffer[i];
            sleep(3);
        }
        return ret;
    }

    private void doWrite(char[] data) {
        if(data!=null) {
            buffer = new char[data.length];
            for(int i=0; i<buffer.length; i++) {
                buffer[i] = data[i];
                sleep(10);
            }
        }
    }

    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

doRead()和doWrite()方法是非線程安全的讀寫方法。爲了演示,加入了sleep(),並設置讀的速度大約是寫的3倍,這符合一般的狀況。

爲了讓多線程能安全讀寫,咱們設計了一個ReadWriteLock:

package com.crackj2ee.thread;
public class ReadWriteLock {
    private int readingThreads = 0;
    private int writingThreads = 0;
    private int waitingThreads = 0; // waiting for write
    private boolean preferWrite = true;

    public synchronized void readLock() throws InterruptedException {
        while(writingThreads>0 || (preferWrite && waitingThreads>0))
            this.wait();
        readingThreads++;
    }

    public synchronized void readUnlock() {
        readingThreads--;
        preferWrite = true;
        notifyAll();
    }

    public synchronized void writeLock() throws InterruptedException {
        waitingThreads++;
        try {
            while(readingThreads>0 || writingThreads>0)
                this.wait();
        }
        finally {
            waitingThreads--;
        }
        writingThreads++;
    }

    public synchronized void writeUnlock() {
        writingThreads--;
        preferWrite = false;
        notifyAll();
    }
}

readLock()用於得到讀鎖,readUnlock()釋放讀鎖,writeLock()和writeUnlock()同樣。因爲鎖用完必須釋放,所以,必須保證lock和unlock匹配。咱們修改DataHandler,加入ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {
    // store data:
    private char[] buffer = "AAAAAAAAAA".toCharArray();
    // lock:
    private ReadWriteLock lock = new ReadWriteLock();

    public char[] read(String name) throws InterruptedException {
        System.out.println(name + " waiting for read");
        lock.readLock();
        try {
            char[] data = doRead();
            System.out.println(name + " reads data: " + new String(data));
            return data;
        }
        finally {
            lock.readUnlock();
        }
    }

    public void write(String name, char[] data) throws InterruptedException {
        System.out.println(name + " waiting for write");
        lock.writeLock();
        try {
            System.out.println(name + " wrote data: " + new String(data));
            doWrite(data);
        }
        finally {
            lock.writeUnlock();
        }
    }

    private char[] doRead() {
        char[] ret = new char[buffer.length];
        for(int i=0; i<buffer.length; i++) {
            ret[i] = buffer[i];
            sleep(3);
        }
        return ret;
    }
    private void doWrite(char[] data) {
        if(data!=null) {
            buffer = new char[data.length];
            for(int i=0; i<buffer.length; i++) {
                buffer[i] = data[i];
                sleep(10);
            }
        }
    }
    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

public方法read()和write()徹底封裝了底層的ReadWriteLock,所以,多線程能夠安全地調用這兩個方法:

// ReadingThread不斷讀取數據:
package com.crackj2ee.thread;
public class ReadingThread extends Thread {
    private DataHandler handler;
    public ReadingThread(DataHandler handler) {
        this.handler = handler;
    }
    public void run() {
        for(;;) {
            try {
                char[] data = handler.read(getName());
                Thread.sleep((long)(Math.random()*1000+100));
            }
            catch(InterruptedException ie) {
                break;
            }
        }
    }
}

// WritingThread不斷寫入數據,每次寫入的都是10個相同的字符:
package com.crackj2ee.thread;
public class WritingThread extends Thread {
    private DataHandler handler;
    public WritingThread(DataHandler handler) {
        this.handler = handler;
    }
    public void run() {
        char[] data = new char[10];
        for(;;) {
            try {
                fill(data);
                handler.write(getName(), data);
                Thread.sleep((long)(Math.random()*1000+100));
            }
            catch(InterruptedException ie) {
                break;
            }
        }
    }
    // 產生一個A-Z隨機字符,填入char[10]:
    private void fill(char[] data) {
        char c = (char)(Math.random()*26+'A');
        for(int i=0; i<data.length; i++)
            data[i] = c;
    }
}

最後Main負責啓動這些線程:

package com.crackj2ee.thread;
public class Main {
    public static void main(String[] args) {
        DataHandler handler = new DataHandler();
        Thread[] ts = new Thread[] {
                new ReadingThread(handler),
                new ReadingThread(handler),
                new ReadingThread(handler),
                new ReadingThread(handler),
                new ReadingThread(handler),
                new WritingThread(handler),
                new WritingThread(handler)
        };
        for(int i=0; i<ts.length; i++) {
            ts[i].start();
        }
    }
}

咱們啓動了5個讀線程和2個寫線程,運行結果以下:

Thread-0 waiting for read
Thread-1 waiting for read
Thread-2 waiting for read
Thread-3 waiting for read
Thread-4 waiting for read
Thread-5 waiting for write
Thread-6 waiting for write
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-0 reads data: AAAAAAAAAA
Thread-5 wrote data: EEEEEEEEEE
Thread-6 wrote data: MMMMMMMMMM
Thread-1 waiting for read
Thread-4 waiting for read
Thread-1 reads data: MMMMMMMMMM
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read
Thread-2 reads data: MMMMMMMMMM
Thread-0 waiting for read
Thread-0 reads data: MMMMMMMMMM
Thread-4 waiting for read
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read
Thread-5 waiting for write
Thread-2 reads data: MMMMMMMMMM
Thread-5 wrote data: GGGGGGGGGG
Thread-6 waiting for write
Thread-6 wrote data: AAAAAAAAAA
Thread-3 waiting for read
Thread-3 reads data: AAAAAAAAAA


能夠看到,每次讀/寫都是完整的原子操做,由於咱們每次寫入的都是10個相同字符。而且,每次讀出的都是最近一次寫入的內容。

若是去掉ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {

    // store data:
    private char[] buffer = "AAAAAAAAAA".toCharArray();

    public char[] read(String name) throws InterruptedException {
        char[] data = doRead();
        System.out.println(name + " reads data: " + new String(data));
        return data;
    }
    public void write(String name, char[] data) throws InterruptedException {
        System.out.println(name + " wrote data: " + new String(data));
        doWrite(data);
    }

    private char[] doRead() {
        char[] ret = new char[10];
        for(int i=0; i<10; i++) {
            ret[i] = buffer[i];
            sleep(3);
        }
        return ret;
    }
    private void doWrite(char[] data) {
        for(int i=0; i<10; i++) {
            buffer[i] = data[i];
            sleep(10);
        }
    }
    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

運行結果以下:

Thread-5 wrote data: AAAAAAAAAA
Thread-6 wrote data: MMMMMMMMMM
Thread-0 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-2 reads data: MAAAAAAAAA
Thread-3 reads data: MAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-1 reads data: MAAAAAAAAA
Thread-0 reads data: MAAAAAAAAA
Thread-4 reads data: MAAAAAAAAA
Thread-6 wrote data: EEEEEEEEEE
Thread-3 reads data: EEEEECCCCC
Thread-4 reads data: EEEEEEEEEC
Thread-1 reads data: EEEEEEEEEE

能夠看到在Thread-6寫入EEEEEEEEEE的過程當中,3個線程讀取的內容是不一樣的。

思考

java的synchronized提供了最底層的物理鎖,要在synchronized的基礎上,實現本身的邏輯鎖,就必須仔細設計ReadWriteLock。

Q: lock.readLock()爲何不放入try{ } 內?
A: 由於readLock()會拋出InterruptedException,致使readingThreads++不執行,而readUnlock()在 finally{ } 中,致使readingThreads--執行,從而使readingThread狀態出錯。writeLock()也是相似的。

Q: preferWrite有用嗎?
A: 若是去掉preferWrite,線程安全不受影響。可是,若是讀取線程不少,上一個線程尚未讀取完,下一個線程又開始讀了,就致使寫入線程長時間沒法得到writeLock;若是寫入線程等待的不少,一個接一個寫,也會致使讀取線程長時間沒法得到readLock。preferWrite的做用是讓讀 /寫交替執行,避免因爲讀線程繁忙致使寫沒法進行和因爲寫線程繁忙致使讀沒法進行。

Q: notifyAll()換成notify()行不行?
A: 不能夠。因爲preferWrite的存在,若是一個線程剛讀取完畢,此時preferWrite=true,再notify(),若剛好喚醒的是一個讀線程,則while(writingThreads>0 || (preferWrite && waitingThreads>0))可能爲true致使該讀線程繼續等待,而等待寫入的線程也處於wait()中,結果全部線程都處於wait ()狀態,誰也沒法喚醒誰。所以,notifyAll()比notify()要來得安全。程序驗證notify()帶來的死鎖:

Thread-0 waiting for read
Thread-1 waiting for read
Thread-2 waiting for read
Thread-3 waiting for read
Thread-4 waiting for read
Thread-5 waiting for write
Thread-6 waiting for write
Thread-0 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-2 waiting for read
Thread-1 waiting for read
Thread-3 waiting for read
Thread-0 waiting for read
Thread-4 waiting for read
Thread-6 wrote data: LLLLLLLLLL
Thread-5 waiting for write
Thread-6 waiting for write
Thread-2 reads data: LLLLLLLLLL
Thread-2 waiting for read
(運行到此不動了)

注意到這種死鎖是因爲全部線程都在等待別的線程喚醒本身,結果都沒法醒過來。這和兩個線程但願得到對方已有的鎖形成死鎖不一樣。所以多線程設計的難度遠遠高於單線程應用。設計模式

相關文章
相關標籤/搜索