Java 併發設計模式

Java 併發設計模式

1、Thread Local Storage 模式

1. ThreadLocal 的使用

Thread Local Storage 表示線程本地存儲模式。java

大多數併發問題都是因爲變量的共享致使的,多個線程同時讀寫同一變量便會出現原子性,可見性等問題。局部變量是線程安全的,本質上也是因爲各個線程各自擁有本身的變量,避免了變量的共享。設計模式

Java 中使用了 ThreadLocal 來實現避免變量共享的方案。ThreadLocal 保證在線程訪問變量時,會建立一個這個變量的副本,這樣每一個線程都有本身的變量值,沒有共享,從而避免了線程不安全的問題。緩存

下面是 ThreadLocal 的一個簡單使用示例:安全

public class ThreadLocalTest {

    private static final ThreadLocal<SimpleDateFormat> threadLocal =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    public static SimpleDateFormat safeDateFormat() {
        return threadLocal.get();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<SimpleDateFormat> task1 = new FutureTask<>(ThreadLocalTest::safeDateFormat);
        FutureTask<SimpleDateFormat> task2 = new FutureTask<>(ThreadLocalTest::safeDateFormat);

        Thread t1 = new Thread(task1);
        Thread t2 = new Thread(task2);
        t1.start();
        t2.start();
        System.out.println(task1.get() == task2.get());//返回false,表示兩個對象不相等
    }
}

程序中構造了一個線程安全的 SimpleDateFormat ,兩個線程取到的是不一樣的示例對象,這樣就保證了線程安全。多線程

2. ThreadLocal 原理淺析

線程 Thread 類內部有兩個 ThreadLocalMap 類型的變量:併發

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

/*
 * InheritableThreadLocal values pertaining to this thread. This map is
 * maintained by the InheritableThreadLocal class.
 */
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

其中第二個變量的用途是建立可繼承父線程變量的子線程,只不過這並不經常使用,主要介紹第一個。異步

ThreadLocalMap 是一個用於存儲 ThreadLocal 的特殊 HashMap,map 中 key 就是 ThreadLocal,value 是線程變量值。只不過這個 map 並不被 ThreadLocal 持有,而是被 Thread 持有。工具

當調用 ThreadLocal 類中的 set 方法時,就會建立 Thread 中的 threadLocals 屬性。性能

//ThreadLocal的set方法
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);//獲取Thread中的ThreadLocalMap
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

能夠看到,最終的 ThreadLocal 對象和變量值並非建立在 ThreadLocal 內部,而是 Thread 中的 ThreadLocalMap,ThreadLocal 在這裏只是充當了代理的做用。this

3. ThreadLocal 內存泄漏問題

存儲數據的 TheadLocalMap 被 Thread 持有,而不是 ThreadLocal,主要的緣由即是 ThreadLocal 的生命週期比 Thread 要長,若是 ThreadLocal 對象一直存在,那麼 map 中的線程就不能被回收,容易致使內存泄漏。

而 Thread 持有 ThreadLocalMap,而且 ThreadLocalMap 對 ThreadLocal 的引用仍是弱引用,這樣當線程被回收時,map 也可以被回收,更加安全。

可是 Java 的這種設計並無徹底避免內存泄漏問題。若是線程池中的線程存活時間過長,那麼其持有的 ThreadLocalMap 一直不會被釋放。ThreadLocalMap 中的 Entry 對其 value 是強引用的(對 ThreadLocal 是弱引用),這樣就算 ThreadLocalMap 的生命週期結束了,可是 value 值並無被回收。

解決的辦法即是手動釋放 ThreadLocalMap 中對 value 的強引用,可使用 TheadLocal 的 remove 方法。在 finally 語句塊中執行。例以下面這個簡單的示例:

public class ThreadLocalTest {

    private final ThreadLocal<Integer> threadLocal = new ThreadLocal<>();

    public void test(){
        //設置變量值
        threadLocal.set(10);
        try {
            System.out.println(threadLocal.get());
        }
        finally {
            //釋放
            threadLocal.remove();
        }
    }
}

2、Immutability 模式

1. 不可變的概念

Immutability,即不變模式。能夠理解爲只要對象一經建立,其狀態是不可以被改變的,沒法進行寫操做。

要實現 Immuatability 模式很簡單,將一個類自己及其全部的屬性都設爲 final ,而且方法都是隻讀的,須要注意的是,若是類的屬性也是引用類型,那麼其對應的類也要知足不可變的特性。final 應該都很熟悉了,用它來修飾類和方法,分別表示類不可繼承、屬性不可改變。

Java 中具有不可變性的類型包括:

  • String
  • final 修飾的基本數據類型
  • Integer、Long、Double 等基本數據類型的包裝類
  • Collections 中的不可變集合

具有不可變性的類,若是須要有相似修改這樣的功能,那麼它不會像普通的對象同樣改變本身的屬性,而是建立新的對象。

下面是 String 的字符串鏈接方法 concat() 的源碼,仔細觀察,能夠看到最後方法返回的時候,建立了一個新的 Sring 對象:

public String concat(String str) {
    int otherLen = str.length();
    if (otherLen == 0) {
        return this;
    }
    int len = value.length;
    char buf[] = Arrays.copyOf(value, len + otherLen);
    str.getChars(buf, len);
    //建立新的對象
    return new String(buf, true);
}

而 Collections 工具能夠將集合變爲不可變的,徹底禁止寫、修改等操做。示例以下:

//Collections 中構建不可變集合的方法
Collections.unmodifiableList();
Collections.unmodifiableSet();
Collections.unmodifiableMap();
Collections.unmodifiableSortedSet();
Collections.unmodifiableSortedMap();
---
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
//構建不可變集合
List<Integer> unmodifiableList = Collections.unmodifiableList(list);

unmodifiableList.remove(1);//拋出異常

2. 對象池

對於一個不可變性的類,若是頻繁的對其進行修改操做,那麼一直會建立性新的對象,這樣就比較浪費內存空間了,一種解決辦法即是利用對象池。

原理也很簡單,新建對象的時候,去對象池看是否存在對象,若是存在則直接利用,若是不存在纔會建立新的對象,建立以後再將對象放到對象池中。

以長整型的包裝類 Long 爲例,它緩存了 -128 到 127 的數據,若是建立的是這個區間的對象,那麼會直接使用緩存中的對象。例如 Long 中的 valueOf 方法就用到了這個緩存,而後直接返回:

public static Long valueOf(long l) {
    final int offset = 128;
    //在這個區間則直接使用緩存中的對象
    if (l >= -128 && l <= 127) { // will cache
        return LongCache.cache[(int)l + offset];
    }
    return new Long(l);
}

3、Guarded Suspension 模式

1. Guarded Suspension 實現

Guarded Suspension 意爲保護性暫停。一個典型的使用場景是:當客戶端線程 T 發送請求後,服務端這時有大量的請求須要處理,這時候就須要排隊,線程 T 進入等待狀態,直到服務端處理完請求而且返回結果。

Guarded Suspension 的實現很簡單,有一個對象 GuardedObject,其內部有一個屬性,即被保護的對象,還有兩個方法,客戶端調用 get() 方法,若是未獲取到結果,則進入等待狀態,即「保護性暫停」;還有一個 notice() 通知方法,當服務端處理完請求後,調用這個方法,而且喚醒等待中的線程。示意圖以下:

在這裏插入圖片描述

示例代碼以下:

public class GuardedObject<T> {

    private T obj;
    private final Lock lock = new ReentrantLock();
    private final Condition finished = lock.newCondition();

    //調用方線程獲取結果
    T get(){
        lock.lock();
        try {
            while (未獲取到結果){
                finished.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            lock.unlock();
        }
        return obj;
    }

    //執行完後通知
    void notice(T obj){
        lock.lock();
        try {
            this.obj = obj;
            finished.signalAll();
        }
        finally {
            lock.unlock();
        }
    }
}

從代碼中能夠看到,Guarded Suspension 模式本質上就是一種等待-通知機制,只不過使用這種模式,在解決實際的問題的時候,須要根據狀況進行程序功能的擴展。

2. 使用示例

仍是上面提到的那個例子,當客戶端發送請求後,須要等待服務端的響應結果,這時候就可使用 Guarded Suspension 來實現,下面是代碼示例:

public class SendRequest<T> {

    //至關於消息隊列
    private final BlockingQueue<Request> queue = new ArrayBlockingQueue<>(5);

    //客戶端發送請求
    void send(Request request) throws InterruptedException {
        //將消息存放至隊列中
        queue.put(request);
        //建立Guarded Suspension模式的對象
        GuardedObject<Request> guardedObject = GuardedObject.create(request.id);
        //循環等待,獲取結果
        Request res = guardedObject.get(Objects::nonNull);
    }

    //服務端處理請求
    void handle() throws InterruptedException {
        //從隊列中獲取請求
        Request request = queue.take();
        //調用請求對應的GuardedObject,並處理請求
        GuardedObject.handleRequest(request.id, request);
    }
    
    //請求類
    private static class Request{
        private int id;
        private String content;
    }
}

須要注意的是,這裏並非直接使用 new GuardedObject() 的方式來建立對象,這是由於須要找到每一個請求和對象之間的對應關係,因此 GuardedObject 內部使用了一個 map 來保存對象,key 是對應的請求 id。

GuardedObject 類代碼以下:

public class GuardedObject<T> {

    private T obj;

    private final Lock lock = new ReentrantLock();
    private final Condition finished = lock.newCondition();

    private static final ConcurrentHashMap<Integer, GuardedObject> map = new ConcurrentHashMap<>();

    //建立對象
    public static GuardedObject create(int id){
        GuardedObject guardedObject = new GuardedObject();
        //保存對象和請求的對應關係
        map.put(id, guardedObject);
        return guardedObject;
    }

    //處理請求
    public static void handleRequest(int id, Object obj){
        GuardedObject guardedObject = map.remove(id);
        if (guardedObject != null){
            
            //具體的處理邏輯省略
            
            //處理完後通知
            guardedObject.notice(obj);
        }
    }

    //調用方線程獲取結果
    T get(Predicate<T> p){
        lock.lock();
        try {
            while (!p.test(obj)){
                finished.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            lock.unlock();
        }
        return obj;
    }

    //執行完後通知
    void notice(T obj){
        lock.lock();
        try {
            this.obj = obj;
            finished.signalAll();
        }
        finally {
            lock.unlock();
        }
    }
}

4、Balking 模式

Balking 模式的典型應用場景是,業務邏輯依賴於某個條件變量的狀態,所以這種模式又能夠理解爲多線程版本的 if。

public class BalkingTest {

    private boolean flag = false;

    public void execute(){
        if (!flag){
            return;
        }
        //具體的執行操做省略
        flag = false;
    }

    public void test(){
        //省略業務代碼若干
        flag = true;
    }
}

例如上面這個例子,一段業務邏輯會改變 flag 的值,另外一個方法會根據 flag 的值來決定是否繼續執行。

這個程序並非線程安全的,解決的辦法也很簡單,就是加互斥鎖,而後能夠將改變 flag 值的邏輯單獨拿出來,以下:

public class BalkingTest {
    private boolean flag = false;

    public synchronized void execute(){
        if (!flag){
            return;
        }
        //具體的執行操做省略
        flag = false;
    }

    public void test(){
        //省略業務代碼若干
        change();
    }
    
    public synchronized void change(){
        flag = true;
    }
}

Balking 模式通常可使用互斥鎖來實現,而且能夠將對條件變量的改變的邏輯和業務邏輯進行分離,這樣可以減少鎖的粒度,提高性能。Balking 模式大多應用於須要快速失敗的場景,即當條件變量不知足,則直接失敗。這也是它和 Guarded Suspension 模式的區別,由於 Guarded Suspension 模式在條件不知足的時候,會一直等待條件知足。

5、Worker - Thread 模式

Worker Thread 模式,對應到現實世界,相似工廠中的工人作任務,當有任務的時候,工人取出任務執行。

解決的辦法是使用線程池,而且使用一個阻塞隊列來存儲任務,線程池中的線程從隊列中取出任務執行。線程池的使用須要注意幾點:

  • 任務隊列儘可能使用有界隊列,避免任務過多形成 OOM。
  • 應該明確指定拒絕策略,能夠根據實際狀況實現 RejectedExecutionHandler 接口自定義拒絕策略。
  • 應該給線程指定一個有意義的名字,最好和業務相關。
  • 爲不一樣的任務建立不一樣的線程池,這樣可以有效的避免死鎖問題。

6、Two - Phase Termination 模式

1. 兩階段終止概念

Two - Phase Termination,即兩階段終止,主要是爲解決如何正確的終止一個線程,這裏說的是一個線程終止另外一個線程,而不是線程終止本身。

Java 中的線程提供了一個 stop() 方法用來終止線程,這不過這個方法會直接將線程殺死,風險過高,而且這個方法已經被標記爲廢棄,不建議使用了。

兩階段終止,即將線程的結束分爲了兩個階段,第一個階段是一個線程 T1 向另外一個線程 T2 發送終止指令,第二個階段是線程 T2 響應終止指令。

根據 Java 的線程狀態,線程若是要進入 TERMINATED 狀態則必須先進入 RUNNABLE 狀態,而處於 RUNNABLE 狀態的線程有可能轉換到休眠狀態。

Java 的線程提供了 interrupt() 方法,這個方法的做用即是將線程的狀態從休眠狀態轉換到 RUNNABLE 狀態。

切換到 RUNNABLE 狀態以後,線程有兩種方式能夠終止,一是執行完 run() 方法,自動進入終止狀態;二是設置一個標誌,線程若是檢測到這個標誌,則退出 run() 方法,這就是兩階段終止的響應終止指令。

2. 程序示例

下面是一個簡單的使用 interrupt() 方法和中斷標誌位來終止線程的示例:

public class Test {
    public static void main(String[] args) {

        Thread thread = new Thread(() -> {
            //檢測到中斷則退出
            while (!Thread.currentThread().isInterrupted()){
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    //從新設置中斷標誌
                    Thread.currentThread().interrupt();
                }
                System.out.println("I am roseduan");
            }
        });
        thread.start();
        thread.interrupt();
    }
}

程序要每隔三秒打印語句,可是線程啓動以後就直接調用了 interrupt() 方法,因此線程直接退出了。須要注意的是這裏在捕獲異常以後,須要從新設置線程的中斷狀態,由於 JVM 的異常處理會清除線程的中斷狀態。

在實際的生產中,並不推薦使用這種方式,由於在 Thread 內部可能會調用其餘的方法,而其餘的方法並不可以保證正確的處理了線程中斷,解決的辦法即是自定義一個線程的中斷標誌,以下所示:

public class Test {
    //自定義中斷標誌
    private volatile boolean isTerminated = false;
    private Thread thread;

    public synchronized void start(){
        thread = new Thread(() -> {
            //檢測到中斷則退出
            while (!isTerminated) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    //從新設置中斷狀態
                    Thread.currentThread().interrupt();
                }
                System.out.println("I am roseduan");
            }
            isTerminated = false;
        });
        thread.start();
    }

    //線程終止方法
    public synchronized void stop(){
        isTerminated = true;
        thread.interrupt();
    }
}

3. 終止線程池

Java 中並不太會顯式的建立和終止一個線程,使用更多的是線程池。

Java 中的線程池提供了兩個方法來終止,分別是 shutdown() 和 shutdownNow() ,兩個方法的區別以下:

  • shutdown():拒絕新的任務,等待正在執行的和已經在阻塞隊列中的任務執行完後,再關閉線程池
  • shutdownNow():直接關閉線程池,拒絕新的任務,而且中斷正在執行的任務,已經在阻塞隊列中的任務也不會被執行了。

7、Producer - Consumer 模式

這是較爲經常使用的生產者 - 消費者模式,Java 中的線程池就使用了這種模式,線程的使用方是生產者,提供任務,線程池自己是消費者,取出並執行任務。

生產者 - 消費者模式使用了一個任務隊列,生產者將任務添加到隊列中,消費者從隊列中取出任務執行。

這樣的設計的目的有三個:

  • 解耦,生產者和消費者之間沒有直接的關聯,而是經過隊列進行通訊。
  • 其次能夠實現異步,例如生產者能夠不用管消費者的行爲,直接將任務添加到隊列中。消費者也能夠不在意生產者,直接從隊列中取任務。
  • 最後,能夠平衡生產者和消費者之間的速度差別。

下面是一個簡單的生產者 - 消費者程序示例:

public class ProducerConsumerTest {

    private BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);

    public void produce() {
        queue.add(new Task());

    }

    public void consume() {
        Task task = queue.poll();
        while (task != null){
            task.execute();
            task = queue.poll();
        }
        System.out.println("沒有任務了");
    }

    public static void main(String[] args) throws InterruptedException {
        Test test = new Test();
        //生產者線程,建立10個任務
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                test.produce();
            }
        });
        producer.start();
        producer.join();

        //消費者線程
        Thread consumer = new Thread(test::consume);
        consumer.start();

    }
}

class Task{
    public void execute(){
        System.out.println("執行任務");
    }
}
相關文章
相關標籤/搜索