Java線程同步的Monitor機制(Lock配合Condition)

Monitor模式是一種常見的並行開發機制, 一個Monitor實例能夠被多個線程安全使用, 全部的monitor下面的方法在運行時是互斥的, 這種互斥機制機制能夠用於一些特性, 例如讓線程等待某種條件, 在等待時線程會將CPU時間交出去, 可是在條件知足時確保從新得到CPU時間. 在條件達成時, 你能夠同時通知一個或多個線程. 這樣作有如下的優勢:java

  1. 全部的同步代碼都集中在一塊兒, 用戶不須要知道這是如何實現的
  2. 代碼不依賴於線程數量, 線程數量只取決於業務須要
  3. 不須要對某個互斥對象作釋放, 不存在忘記的風險

一個Monitor的結構是這樣的安全

public class SimpleMonitor {
    public method void testA(){
        //Some code
    }

    public method int testB(){
        return 1;
    }
}

使用Java代碼不能直接建立一個Monitor, 要實現Monitor, 須要使用Lock和Condition類. 通常使用的Lock是ReentrantLock, 例如fetch

public class SimpleMonitor {
    private final Lock lock = new ReentrantLock();

    public void testA() {
        lock.lock();

        try {
            //Some code
        } finally {
            lock.unlock();
        }
    }

    public int testB() {
        lock.lock();

        try {
            return 1;
        } finally {
            lock.unlock();
        }
    }
}

若是不須要判斷條件, 那麼用synchronized就能夠了. 在須要判斷條件的狀況下, 使用Lock的newCondition()方法建立Condition, 能夠經過Condition的await方法, 讓當前線程wait, 放棄cpu時間. 而後用signal或者signalAll方法讓線程從新得到CPU時間. signalAll方法會喚起全部wait在當前condition的線程. 下面是一個例子, 一個須要被多個線程使用的容量固定的buffer.this

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    private final String[] buffer;
    private final int capacity;

    private int front;
    private int rear;
    private int count;

    private final Lock lock = new ReentrantLock();

    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int capacity) {
        super();

        this.capacity = capacity;

        buffer = new String[capacity];
    }

    public void deposit(String data) throws InterruptedException {
        lock.lock();

        try {
            while (count == capacity) {
                notFull.await();
            }

            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            count++;

            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public String fetch() throws InterruptedException {
        lock.lock();

        try {
            while (count == 0) {
                notEmpty.await();
            }

            String result = buffer[front];
            front = (front + 1) % capacity;
            count--;

            notFull.signal();

            return result;
        } finally {
            lock.unlock();
        }
    }
}

代碼說明線程

  1. 這兩個方法經過lock互斥
  2. 而後經過兩個condition變量, 一個用於在buffer非空時等待, 一個用於buffer未滿時等待
  3. 上面使用while循環將await包圍, 這是爲了防止在使用Signal&Condition時產生signal stealers問題.
  4. 以上方法能夠安全地在多個線程中被調用

還有一個例子, 用於協調多個線程按固定順序進行輸出code

public class DemoLockCondition {
    private final Lock lock = new ReentrantLock();
    private final Condition one = lock.newCondition();
    private final Condition two = lock.newCondition();
    private final Condition three = lock.newCondition();

    public void action1() {
        while (true) {
            lock.lock();
            System.out.println("1");
            try {
                two.signal();
                one.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public void action2() {
        while (true) {
            lock.lock();
            System.out.println("2");
            try {
                three.signal();
                two.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public void action3() {
        while (true) {
            lock.lock();
            System.out.println("3");
            try {
                one.signal();
                three.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        DemoLockCondition demo = new DemoLockCondition();
        new Thread(()->demo.action1()).start();
        new Thread(()-> demo.action2()).start();
        new Thread(()-> demo.action3()).start();
    }
}

若是是使用wait()和notify()的話, 就要寫成這樣, 這種狀況下, 運行時notify()隨機通知的線程, 是有可能不知足而跳過的.對象

public class DemoThreadWait2 {
    private Object obj = 0;
    private int pos = 1;

    public void one(int i) {
        synchronized (obj) {
            if (pos == i) {
                System.out.println("T-" + i);
                pos = i % 3 + 1;
            } else {
                // System.out.println(".");
            }
            obj.notify();
            try {
                obj.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        DemoThreadWait2 demo = new DemoThreadWait2();
        new Thread(()->{
            while(true) {
                demo.one(1);
            }
        }).start();

        new Thread(()->{
            while(true) {
                demo.one(2);
            }
        }).start();

        new Thread(()->{
            while(true) {
                demo.one(3);
            }
        }).start();
    }
}
相關文章
相關標籤/搜索