計算機程序的思惟邏輯 (68) - 線程的基本協做機制 (下)

本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》(馬俊昌著),由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買:京東自營連接 html

本節繼續上節的內容,探討如何使用wait/notify實現更多的協做場景。java

同時開始

同時開始,相似於運動員比賽,在聽到比賽開始槍響後同時開始,下面,咱們模擬下這個過程,這裏,有一個主線程和N個子線程,每一個子線程模擬一個運動員,主線程模擬裁判,它們協做的共享變量是一個開始信號。咱們用一個類FireFlag來表示這個協做對象,代碼以下所示:git

static class FireFlag {
    private volatile boolean fired = false;

    public synchronized void waitForFire() throws InterruptedException {
        while (!fired) {
            wait();
        }
    }

    public synchronized void fire() {
        this.fired = true;
        notifyAll();
    }
}
複製代碼

子線程應該調用waitForFire()等待槍響,而主線程應該調用fire()發射比賽開始信號。github

表示比賽運動員的類以下:編程

static class Racer extends Thread {
    FireFlag fireFlag;

    public Racer(FireFlag fireFlag) {
        this.fireFlag = fireFlag;
    }

    @Override
    public void run() {
        try {
            this.fireFlag.waitForFire();
            System.out.println("start run "
                    + Thread.currentThread().getName());
        } catch (InterruptedException e) {
        }
    }
}
複製代碼

主程序代碼以下所示:swift

public static void main(String[] args) throws InterruptedException {
    int num = 10;
    FireFlag fireFlag = new FireFlag();
    Thread[] racers = new Thread[num];
    for (int i = 0; i < num; i++) {
        racers[i] = new Racer(fireFlag);
        racers[i].start();
    }
    Thread.sleep(1000);
    fireFlag.fire();
}
複製代碼

這裏,啓動了10個子線程,每一個子線程啓動後等待fire信號,主線程調用fire()後各個子線程纔開始執行後續操做。微信

等待結束

理解join

理解Synchronized一節中咱們使用join方法讓主線程等待子線程結束,join實際上就是調用了wait,其主要代碼是:框架

while (isAlive()) {
    wait(0);
}
複製代碼

只要線程是活着的,isAlive()返回true,join就一直等待。誰來通知它呢?當線程運行結束的時候,Java系統調用notifyAll來通知。dom

使用協做對象

使用join有時比較麻煩,須要主線程逐一等待每一個子線程。這裏,咱們演示一種新的寫法。主線程與各個子線程協做的共享變量是一個數,這個數表示未完成的線程個數,初始值爲子線程個數,主線程等待該值變爲0,而每一個子線程結束後都將該值減一,當減爲0時調用notifyAll,咱們用MyLatch來表示這個協做對象,示例代碼以下:異步

public class MyLatch {
    private int count;

    public MyLatch(int count) {
        this.count = count;
    }

    public synchronized void await() throws InterruptedException {
        while (count > 0) {
            wait();
        }
    }

    public synchronized void countDown() {
        count--;
        if (count <= 0) {
            notifyAll();
        }
    }
}
複製代碼

這裏,MyLatch構造方法的參數count應初始化爲子線程的個數,主線程應該調用await(),而子線程在執行完後應該調用countDown()。

工做子線程的示例代碼以下:

static class Worker extends Thread {
    MyLatch latch;

    public Worker(MyLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            // simulate working on task
            Thread.sleep((int) (Math.random() * 1000));

            this.latch.countDown();
        } catch (InterruptedException e) {
        }
    }
}
複製代碼

主線程的示例代碼以下:

public static void main(String[] args) throws InterruptedException {
    int workerNum = 100;
    MyLatch latch = new MyLatch(workerNum);
    Worker[] workers = new Worker[workerNum];
    for (int i = 0; i < workerNum; i++) {
        workers[i] = new Worker(latch);
        workers[i].start();
    }
    latch.await();

    System.out.println("collect worker results");
}
複製代碼

MyLatch是一個用於同步協做的工具類,主要用於演示基本原理,在Java中有一個專門的同步類CountDownLatch,在實際開發中應該使用它,關於CountDownLatch,咱們會在後續章節介紹。

MyLatch的功能是比較通用的,它也能夠應用於上面"同時開始"的場景,初始值設爲1,Racer類調用await(),主線程調用countDown()便可,以下所示:

public class RacerWithLatchDemo {
    static class Racer extends Thread {
        MyLatch latch;

        public Racer(MyLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                this.latch.await();
                System.out.println("start run "
                        + Thread.currentThread().getName());
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        MyLatch latch = new MyLatch(1);
        Thread[] racers = new Thread[num];
        for (int i = 0; i < num; i++) {
            racers[i] = new Racer(latch);
            racers[i].start();
        }
        Thread.sleep(1000);
        latch.countDown();
    }
}
複製代碼

異步結果

在主從模式中,手工建立線程每每比較麻煩,一種常見的模式是異步調用,異步調用返回一個通常稱爲Promise或Future的對象,經過它能夠得到最終的結果。在Java中,表示子任務的接口是Callable,聲明爲:

public interface Callable<V> {
    V call() throws Exception;
}
複製代碼

爲表示異步調用的結果,咱們定義一個接口MyFuture,以下所示:

public interface MyFuture <V> {
    V get() throws Exception ;
}
複製代碼

這個接口的get方法返回真正的結果,若是結果尚未計算完成,get會阻塞直到計算完成,若是調用過程發生異常,則get方法拋出調用過程當中的異常。

爲方便主線程調用子任務,咱們定義一個類MyExecutor,其中定義一個public方法execute,表示執行子任務並返回異步結果,聲明以下:

public <V> MyFuture<V> execute(final Callable<V> task) 複製代碼

利用該方法,對於主線程,它就不須要建立並管理子線程了,而且能夠方便地獲取異步調用的結果,好比,在主線程中,能夠相似這樣啓動異步調用並獲取結果:

public static void main(String[] args) {
    MyExecutor executor = new MyExecutor();
    // 子任務
    Callable<Integer> subTask = new Callable<Integer>() {

        @Override
        public Integer call() throws Exception {
            // ... 執行異步任務
            int millis = (int) (Math.random() * 1000);
            Thread.sleep(millis);
            return millis;
        }
    };
    // 異步調用,返回一個MyFuture對象
    MyFuture<Integer> future = executor.execute(subTask);
    // ... 執行其餘操做
    try {
        // 獲取異步調用的結果
        Integer result = future.get();
        System.out.println(result);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
複製代碼

MyExecutor的execute方法是怎麼實現的呢?它封裝了建立子線程,同步獲取結果的過程,它會建立一個執行子線程,該子線程的代碼以下所示:

static class ExecuteThread<V> extends Thread {
    private V result = null;
    private Exception exception = null;
    private boolean done = false;
    private Callable<V> task;
    private Object lock;
    
    public ExecuteThread(Callable<V> task, Object lock) {
        this.task = task;
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            result = task.call();
        } catch (Exception e) {
            exception = e;
        } finally {
            synchronized (lock) {
                done = true;
                lock.notifyAll();
            }
        }
    }

    public V getResult() {
        return result;
    }

    public boolean isDone() {
        return done;
    }

    public Exception getException() {
        return exception;
    }
}
複製代碼

這個子線程執行實際的子任務,記錄執行結果到result變量、異常到exception變量,執行結束後設置共享狀態變量done爲true並調用notifyAll以喚醒可能在等待結果的主線程。

MyExecutor的execute的方法的代碼爲:

public <V> MyFuture<V> execute(final Callable<V> task) {
    final Object lock = new Object();
    final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
    thread.start();

    MyFuture<V> future = new MyFuture<V>() {
        @Override
        public V get() throws Exception {
            synchronized (lock) {
                while (!thread.isDone()) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
                if (thread.getException() != null) {
                    throw thread.getException();
                }
                return thread.getResult();
            }
        }
    };
    return future;
}
複製代碼

execute啓動一個線程,並返回MyFuture對象,MyFuture的get方法會阻塞等待直到線程運行結束。

以上的MyExecutore和MyFuture主要用於演示基本原理,實際上,Java中已經包含了一套完善的框架Executors,相關的部分接口和類有:

  • 表示異步結果的接口Future和實現類FutureTask
  • 用於執行異步任務的接口Executor、以及有更多功能的子接口ExecutorService
  • 用於建立Executor和ExecutorService的工廠方法類Executors

後續章節,咱們會詳細介紹這套框架。

集合點

各個線程先是分頭行動,而後各自到達一個集合點,在集合點須要集齊全部線程,交換數據,而後再進行下一步動做。怎麼表示這種協做呢?協做的共享變量依然是一個數,這個數表示未到集合點的線程個數,初始值爲子線程個數,每一個線程到達集合點後將該值減一,若是不爲0,表示還有別的線程未到,進行等待,若是變爲0,表示本身是最後一個到的,調用notifyAll喚醒全部線程。咱們用AssemblePoint類來表示這個協做對象,示例代碼以下:

public class AssemblePoint {
    private int n;

    public AssemblePoint(int n) {
        this.n = n;
    }

    public synchronized void await() throws InterruptedException {
        if (n > 0) {
            n--;
            if (n == 0) {
                notifyAll();
            } else {
                while (n != 0) {
                    wait();
                }
            }
        }
    }
}
複製代碼

多個遊客線程,各自先獨立運行,而後使用該協做對象到達集合點進行同步的示例代碼以下:

public class AssemblePointDemo {
    static class Tourist extends Thread {
        AssemblePoint ap;

        public Tourist(AssemblePoint ap) {
            this.ap = ap;
        }

        @Override
        public void run() {
            try {
                // 模擬先各自獨立運行
                Thread.sleep((int) (Math.random() * 1000));

                // 集合
                ap.await();
                System.out.println("arrived");
                // ... 集合後執行其餘操做
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) {
        int num = 10;
        Tourist[] threads = new Tourist[num];
        AssemblePoint ap = new AssemblePoint(num);
        for (int i = 0; i < num; i++) {
            threads[i] = new Tourist(ap);
            threads[i].start();
        }
    }
}
複製代碼

這裏實現的是AssemblePoint主要用於演示基本原理,Java中有一個專門的同步工具類CyclicBarrier能夠替代它,關於該類,咱們後續章節介紹。

小結

上節和本節介紹了Java中線程間協做的基本機制wait/notify,協做關鍵要想清楚協做的共享變量和條件是什麼,爲進一步理解,針對多種協做場景,咱們演示了wait/notify的用法及基本協做原理,Java中有專門爲協做而建的阻塞隊列、同步工具類、以及Executors框架,咱們會在後續章節介紹,在實際開發中,應該儘可能使用這些現成的類,而非從新發明輪子。

以前,咱們屢次碰到了InterruptedException並選擇了忽略,如今是時候進一步瞭解它了。

(與其餘章節同樣,本節全部代碼位於 github.com/swiftma/pro…)


未完待續,查看最新文章,敬請關注微信公衆號「老馬說編程」(掃描下方二維碼),深刻淺出,老馬和你一塊兒探索Java編程及計算機技術的本質。用心原創,保留全部版權。

相關文章
相關標籤/搜索