讀書筆記-並行api

Lock

Lock接口主要操做類是ReentrantLock,能夠起到synchronized的做用,另外也提供額外的功能。
用Lock重寫上一篇中的死鎖例子java

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Resource {
    Lock lock=new ReentrantLock();
    int num=0;
    void doSome(){
        
    }
    public void deal(Resource res){
        while(true){
            boolean mylock=this.lock.tryLock();//嘗試得到當前Resource的鎖定
            boolean resLock=res.lock.tryLock();//嘗試得到傳入的Resource的鎖定
            try{
                if(mylock&&resLock){
                    res.doSome();
                    System.out.println(res+":"+this.num);
                    break;//退出循環
                }
            }finally{
                if(mylock)
                    this.lock.unlock();
                if(resLock)
                    res.lock.unlock();
            }
        }
    }
}

重寫後不會出現死鎖的緣由在於,當沒法同時得到兩個鎖定時,乾脆釋放已得到的鎖定。
上面代碼使用當前Resource的Lock的tryLock()方法嘗試得到鎖定,以及傳入Resource的Lock的tryLock()方法嘗試得到鎖定。只有當能夠得到兩個Resource的鎖定,才能執行res.doSome().最後不管什麼狀況,都要finally解除鎖定。segmentfault

ReadWriteLock

ReadWriteLock接口定義了讀取鎖定和寫入鎖定的行爲。可使用readLock(),writeLock()方法返回Lock操做對象。
ReentrantReadWriteLock是ReadWriteLock接口的主要操做類.
ReentrantReadWriteLock.readLock操做Lock接口,調用其lock()方法時,若沒有任何ReentrantReadWriteLock.writeLock調用過lock()方法,也就是沒有任何寫入鎖定時,才能夠取得讀取鎖定。
下面用ReadWriteLock試着寫一個ArrayListapi

import java.util.Arrays;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MyArrayList<T> {
    private ReadWriteLock lock=new ReentrantReadWriteLock();
    private Object[] list;
    private int next=0;
    public MyArrayList(){
        list=new Object[16];
    }
    public void add(T obj){
        try{
            lock.writeLock().lock();//獲取寫入鎖定
            if(next==list.length)
                list=Arrays.copyOf(list, list.length*2);
            list[next++]=obj;
        }finally{
            lock.writeLock().unlock();//解除寫入鎖定
        }
    }
    @SuppressWarnings("unchecked")
    public T get(int index){
        try{
            lock.readLock().lock();//獲取讀取鎖定
            return (T) list[index];
        }finally{
            lock.readLock().unlock();//解除讀取鎖定
        }
    }
    public int size(){
        try{
            lock.readLock().lock();
            return next;
        }finally{
            lock.readLock().unlock();
        }
    }
}

重寫後的效果是dom

  • 如有線程調用add()方法進行寫入操做,先得到寫入鎖定。這時若是有其餘線程準備得到寫入鎖定或讀取鎖定,都必須等待前面的寫入鎖定解除。ide

  • 如有線程調用get()方法進行讀取操做,先得到讀取鎖定。這時若是有其餘線程準備得到讀取鎖定,則能夠得到;但若是是準備得到寫入鎖定,仍然要等待全部讀取鎖定解除。this

使用ReadWriteLock的好處在於,若是有兩個線程都想調用get()和size()方法,因爲鎖定的關係,其中一個線程只能等到另外一個線程解除鎖定。然而,這兩個方法都只是讀取對象狀態,若是隻是讀取操做,就能夠容許線程並行,這樣讀取效率將會提升。spa

Condition

Condition接口用來搭配Lock,最基本的用法就是達到Object的wait(),notify(),notifyAll()方法的做用。
下面用wait(),notify(),notifyAll()實現生產者與消費者.線程

  • 店員從生產者得到生產出的商品,消費者從店員取走商品3d

  • 若生產者生產速度較快,店員那可能有不少商品,店員會叫生產者停一下。過一段時間,店員那商品很少了,再通知生產者繼續生產code

  • 若消費者取走速度過快,店員那可能沒有商品可供取走,店員會叫消費者停一下。過一段時間,店員那有商品了,再通知消費者過來取

這裏假定店員那最多隻能放一件商品

public class Producer implements Runnable{
    private Clerk clerk;
    public Producer(Clerk clerk){
        this.clerk=clerk;
    }
    @Override
    public void run() {
        for(int i=0;i<10;i++){
            try {
                Thread.sleep((int)Math.random()*3000);
            } catch (InterruptedException e) {
                
            }
            clerk.setProduct(i);
        }
    }
}
public class Consumer implements Runnable{
    private Clerk clerk;
    public Consumer(Clerk clerk){
        this.clerk=clerk;
    }
    @Override
    public void run() {
        for(int i=0;i<10;i++){
            try {
                Thread.sleep((int)Math.random()*3000);
            } catch (InterruptedException e) {
                
            }
            clerk.getProduct();
        }
    }
}
public class Clerk extends Thread{
    private int product=-1;//沒有商品
    public synchronized void setProduct(int product){
        while(this.product!=-1){
            try {
                wait();//店員那有商品,生產者停一下
            } catch (InterruptedException e) {
                
            }
        }
        this.product=product;
        System.out.println("生產者生產商品"+this.product);
        notify();//通知等待集合(喚醒的多是消費者,也多是生產者)
    }
    public synchronized int getProduct(){
        while(this.product==-1){
            try {
                wait();//店員沒有商品,消費者停一下
            } catch (InterruptedException e) {
                
            }
        }
        int p=this.product;
        System.out.println("消費者消費商品"+this.product);
        this.product=-1;//商品已經被取走
        notify();
        return p;
    }
    public static void main(String[] args){
        Clerk clerk=new Clerk();
        new Thread(new Producer(clerk)).start();
        new Thread(new Consumer(clerk)).start();
    }
}
生產者生產商品0
消費者消費商品0
生產者生產商品1
消費者消費商品1
生產者生產商品2
消費者消費商品2
生產者生產商品3
消費者消費商品3
生產者生產商品4
消費者消費商品4
生產者生產商品5
消費者消費商品5
生產者生產商品6
消費者消費商品6
生產者生產商品7
消費者消費商品7
生產者生產商品8
消費者消費商品8
生產者生產商品9
消費者消費商品9

如今用Condition接口重寫

public class Clerk {
    private int product=-1;//沒有商品
    Lock lock=new ReentrantLock();
    private Condition condition=lock.newCondition();
    public void setProduct(int product){
        try{
            lock.lock();
            while(this.product!=-1){
                try {
                    condition.await();//店員那有商品,生產者停一下
                } catch (InterruptedException e) {
                    
                }
            }
            this.product=product;
            System.out.println("生產者生產商品"+this.product);
            condition.signal();//通知等待集合(喚醒的多是消費者,也多是生產者)
        }finally{
            lock.unlock();
        }
    }
    public int getProduct(){
        try{
            lock.lock();
            while(this.product==-1){
                try {
                    condition.await();//店員沒有商品,消費者停一下
                } catch (InterruptedException e) {
                    
                }
            }
            int p=this.product;
            System.out.println("消費者消費商品"+this.product);
            this.product=-1;//商品已經被取走
            condition.signal();
            return p;
        }finally{
            lock.unlock();
        }
    }
    public static void main(String[] args){
        Clerk clerk=new Clerk();
        new Thread(new Producer(clerk)).start();
        new Thread(new Consumer(clerk)).start();
    }
}

注意在多個生產者,消費者線程的狀況下,等待集合中二者都會有,而condition.signal()從等待集合中喚醒的具體對象是不肯定的。有可能消費者取走商品後,喚醒的仍是消費者,這時,消費者又會執行while循環,進入等待集合。
事實上,一個Condition對象能夠表示一個等待集合。這樣上面例子,能夠有兩個等待集合,一個給消費者用,一個給生產者用。生產者只會通知消費者的等待集合,消費者也只會通知生產者的等待集合。這樣效率會高些。

public class Clerk {
    ...
    private Condition producerCondition=lock.newCondition();//生產者的等待集合
    private Condition consumerCondition=lock.newCondition();//消費者的等待集合
    public void setProduct(int product){
        try{
            lock.lock();
            while(this.product!=-1){
                try {
                    producerCondition.await();//店員那有商品,生產者停一下
                } catch (InterruptedException e) {
                    
                }
            }
            this.product=product;
            System.out.println("生產者生產商品"+this.product);
            consumerCondition.signal();//喚醒消費者等待集合
        }finally{
            lock.unlock();
        }
    }
    public int getProduct(){
        try{
            lock.lock();
            while(this.product==-1){
                try {
                    consumerCondition.await();//店員沒有商品,消費者停一下
                } catch (InterruptedException e) {
                    
                }
            }
            int p=this.product;
            System.out.println("消費者消費商品"+this.product);
            this.product=-1;//商品已經被取走
            producerCondition.signal();//喚醒生產者等待集合
            return p;
        }finally{
            lock.unlock();
        }
    }
    ...
}

Executor

定義Executor接口的目的是將Runnable的指定與如何執行分離。它只定義了一個execute()方法。

public class Page{
    private Executor executor;
    public Page(Executor executor){
        this.executor=executor;
    }
    ...
    public void method1(){
        ...
        executor.execute(new Runnable(){
            @Override
            public void run(){
                ...
            }
        });
        ...
    }

}

public class DirectExecutor implements Executor{
    public void execute(Runnable r){
        r.run();
    }
}

調用

new Page(new DirectExecutor()).method1();

Executor api
圖片描述

ThreadPoolExecutor

像線程池這類服務,其實是定義在Executor接口的子接口ExecutorService中。通用的ExecutorService由抽象類AbstractExecutorService操做,若是須要線程池功能,可使用其子類ThreadPoolExecutor.
重寫上面executor例子

ExecutorService executorService=Executors.newCachedThreadPool();
new Page(executorService).method1();
executorService.shutdown();//在指定執行的Runnable都完成後,將ExecutorService關閉

Future與Callable

ExecutorService還定義了submit(),invokeAll(),invokeAny()等方法,這些方法出如今java.util.concurrent.Future,java.util.concurrent.Callable接口
Future定義的行爲就是讓你在未來取得結果。你能夠將想執行的工做交給Future,Future會使用另外一個線程處理,你能夠先作別的事情。過些時候,再調用Future的get()得到結果。
若是結果已經產生,get()會直接返回,不然會進入阻塞狀態直到結果返回。get()的另外一種重載方法能夠指定等待結果的時間,若指定時間內結果還沒產生,則拋出TimeoutException異常。也可使用Future的isDone()方法看結果是否產生。
Future常常與Callable一塊兒使用,Callable的做用與Runnable類似,都是用來定義執行的流程。

  • Runnable的run()方法無返回值,也沒法拋出異常

  • Callable的call()方法能夠有返回值,也能夠拋出異常

FutureTask是Future的操做類,建立時可傳入Callable對象指定執行的流程

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static int fib(int n){
        return n<=1?n:fib(n-1)+fib(n-2);
    }
    public static void main(String[] args){
        FutureTask<Integer> task=new FutureTask<Integer>(
                new Callable<Integer>(){
                    @Override
                    public Integer call() throws Exception {
                        return fib(30);
                    }    
                }
        );
        new Thread(task).start();
        try {
            Thread.sleep(3000);
            System.out.println(task.get());
        } catch (InterruptedException|ExecutionException e) {
            
        }
    }
}

FutureTask構造類
圖片描述
FutureTask實現RunnableFuture接口RunnableFuture接口繼承Runnable,Future接口。因此能夠new Thread(task).
ExecutorService的submit()方法也能夠接受Callable對象,調用後返回Future對象。
圖片描述

ExecutorService service=Executors.newCachedThreadPool();
Future<Integer> future=service.submit(new Callable<Integer>(){
    @Override
    public Integer call() throws Exception {
        return fib(30);
    }    
});
  • 若是有多個Callable,能夠先將它們收集到Collection中,而後調用ExecutorService的invokeAll()方法,返回List<Future>

  • 若是有多個Callable,要求其中只要有一個執行完成就好了,則能夠先將它們收集到Collection中,而後調用ExecutorService的invokeAny()方法

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor用來進行工做排程,其中的schedule()方法用來排定Runnable或Callable實例延遲多久執行一次,並返回Future子接口ScheduledFuture的實例。

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecution {
    public static void main(String[] args){
        ScheduledExecutorService service=Executors.newSingleThreadScheduledExecutor();
        service.scheduleWithFixedDelay(new Runnable(){
            public void run(){
                System.out.println(new Date());
                try {
                    Thread.sleep(2000);//假設工做會執行2s
                } catch (InterruptedException e) {
                    
                }
            }
        }, 2000, 1000, TimeUnit.MILLISECONDS);
    }
}
Sat Oct 24 17:11:59 CST 2015
Sat Oct 24 17:12:02 CST 2015
Sat Oct 24 17:12:05 CST 2015
Sat Oct 24 17:12:08 CST 2015
Sat Oct 24 17:12:11 CST 2015

能夠看到,輸出兩兩間相差3s.
scheduleWithFixedDelay()方法參數
圖片描述
若是把方法換成scheduleAtFixedRate()

Sat Oct 24 17:28:28 CST 2015
Sat Oct 24 17:28:30 CST 2015
Sat Oct 24 17:28:32 CST 2015
Sat Oct 24 17:28:34 CST 2015

每次排定的執行週期是1s,可是工做執行的時間是2s,會超過排定的執行週期,因此輸出兩兩間相差2s。

ForkJoinPool

Future的另外一個操做類ForkJoinTask,與ExecutorService的另外一個操做類ForkJoinPool有關,它們都是jdk7新增的api,用來解決分而治之的問題。

  • ForkJoinTask操做Future接口,能夠在將來得到耗時工做的執行結果

  • ForkJoinPool管理ForkJoinTask,調用fork()方法,可讓另外一個線程執行ForkJoinTask

  • 若是要得到ForkJoinTask的執行結果,能夠調用join()方法。若是執行結果還沒產生,會阻塞直至有執行結果返回

使用ForkJoinTask的子類RecursiveTask,它是個抽象類,使用時必須繼承它,並操做compute()方法。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class FibDemo extends RecursiveTask<Integer>{
    final int n;
    FibDemo(int n){
        this.n=n;
    }
    public static int fib(int n){
        return n<=1?n:fib(n-1)+fib(n-2);
    }
    @Override
    protected Integer compute() {
        if(n<=10){
            return fib(n);
        }
        FibDemo f1=new FibDemo(n-1);
        f1.fork();//ForkJoinPool分配線程執行子任務
        FibDemo f2=new FibDemo(n-2);
        return f2.compute()+f1.join();//執行f2子任務+得到f1子任務進行完成的結果
    }
    public static void main(String[] args){
        FibDemo fib=new FibDemo(40);
        ForkJoinPool pool=new ForkJoinPool();
        System.out.println(pool.invoke(fib));
    }
}
相關文章
相關標籤/搜索