Lock接口主要操做類是ReentrantLock
,能夠起到synchronized的做用,另外也提供額外的功能。
用Lock重寫上一篇中的死鎖例子java
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Resource { Lock lock=new ReentrantLock(); int num=0; void doSome(){ } public void deal(Resource res){ while(true){ boolean mylock=this.lock.tryLock();//嘗試得到當前Resource的鎖定 boolean resLock=res.lock.tryLock();//嘗試得到傳入的Resource的鎖定 try{ if(mylock&&resLock){ res.doSome(); System.out.println(res+":"+this.num); break;//退出循環 } }finally{ if(mylock) this.lock.unlock(); if(resLock) res.lock.unlock(); } } } }
重寫後不會出現死鎖的緣由在於,當沒法同時得到兩個鎖定時,乾脆釋放已得到的鎖定。
上面代碼使用當前Resource的Lock的tryLock()方法嘗試得到鎖定,以及傳入Resource的Lock的tryLock()方法嘗試得到鎖定。只有當能夠得到兩個Resource的鎖定,才能執行res.doSome().最後不管什麼狀況,都要finally解除鎖定。segmentfault
ReadWriteLock接口定義了讀取鎖定和寫入鎖定的行爲。可使用readLock()
,writeLock()
方法返回Lock操做對象。ReentrantReadWriteLock
是ReadWriteLock接口的主要操做類.ReentrantReadWriteLock.readLock
操做Lock接口,調用其lock()方法時,若沒有任何ReentrantReadWriteLock.writeLock
調用過lock()方法,也就是沒有任何寫入鎖定時,才能夠取得讀取鎖定。
下面用ReadWriteLock
試着寫一個ArrayListapi
import java.util.Arrays; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class MyArrayList<T> { private ReadWriteLock lock=new ReentrantReadWriteLock(); private Object[] list; private int next=0; public MyArrayList(){ list=new Object[16]; } public void add(T obj){ try{ lock.writeLock().lock();//獲取寫入鎖定 if(next==list.length) list=Arrays.copyOf(list, list.length*2); list[next++]=obj; }finally{ lock.writeLock().unlock();//解除寫入鎖定 } } @SuppressWarnings("unchecked") public T get(int index){ try{ lock.readLock().lock();//獲取讀取鎖定 return (T) list[index]; }finally{ lock.readLock().unlock();//解除讀取鎖定 } } public int size(){ try{ lock.readLock().lock(); return next; }finally{ lock.readLock().unlock(); } } }
重寫後的效果是dom
如有線程調用add()方法進行寫入操做,先得到寫入鎖定。這時若是有其餘線程準備得到寫入鎖定或讀取鎖定,都必須等待前面的寫入鎖定解除。ide
如有線程調用get()方法進行讀取操做,先得到讀取鎖定。這時若是有其餘線程準備得到讀取鎖定,則能夠得到;但若是是準備得到寫入鎖定,仍然要等待全部讀取鎖定解除。this
使用ReadWriteLock
的好處在於,若是有兩個線程都想調用get()和size()方法,因爲鎖定的關係,其中一個線程只能等到另外一個線程解除鎖定。然而,這兩個方法都只是讀取對象狀態,若是隻是讀取操做,就能夠容許線程並行,這樣讀取效率將會提升。spa
Condition接口用來搭配Lock,最基本的用法就是達到Object的wait(),notify(),notifyAll()方法的做用。
下面用wait(),notify(),notifyAll()實現生產者與消費者.線程
店員從生產者得到生產出的商品,消費者從店員取走商品3d
若生產者生產速度較快,店員那可能有不少商品,店員會叫生產者停一下。過一段時間,店員那商品很少了,再通知生產者繼續生產code
若消費者取走速度過快,店員那可能沒有商品可供取走,店員會叫消費者停一下。過一段時間,店員那有商品了,再通知消費者過來取
這裏假定店員那最多隻能放一件商品
public class Producer implements Runnable{ private Clerk clerk; public Producer(Clerk clerk){ this.clerk=clerk; } @Override public void run() { for(int i=0;i<10;i++){ try { Thread.sleep((int)Math.random()*3000); } catch (InterruptedException e) { } clerk.setProduct(i); } } }
public class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk){ this.clerk=clerk; } @Override public void run() { for(int i=0;i<10;i++){ try { Thread.sleep((int)Math.random()*3000); } catch (InterruptedException e) { } clerk.getProduct(); } } }
public class Clerk extends Thread{ private int product=-1;//沒有商品 public synchronized void setProduct(int product){ while(this.product!=-1){ try { wait();//店員那有商品,生產者停一下 } catch (InterruptedException e) { } } this.product=product; System.out.println("生產者生產商品"+this.product); notify();//通知等待集合(喚醒的多是消費者,也多是生產者) } public synchronized int getProduct(){ while(this.product==-1){ try { wait();//店員沒有商品,消費者停一下 } catch (InterruptedException e) { } } int p=this.product; System.out.println("消費者消費商品"+this.product); this.product=-1;//商品已經被取走 notify(); return p; } public static void main(String[] args){ Clerk clerk=new Clerk(); new Thread(new Producer(clerk)).start(); new Thread(new Consumer(clerk)).start(); } }
生產者生產商品0 消費者消費商品0 生產者生產商品1 消費者消費商品1 生產者生產商品2 消費者消費商品2 生產者生產商品3 消費者消費商品3 生產者生產商品4 消費者消費商品4 生產者生產商品5 消費者消費商品5 生產者生產商品6 消費者消費商品6 生產者生產商品7 消費者消費商品7 生產者生產商品8 消費者消費商品8 生產者生產商品9 消費者消費商品9
如今用Condition接口重寫
public class Clerk { private int product=-1;//沒有商品 Lock lock=new ReentrantLock(); private Condition condition=lock.newCondition(); public void setProduct(int product){ try{ lock.lock(); while(this.product!=-1){ try { condition.await();//店員那有商品,生產者停一下 } catch (InterruptedException e) { } } this.product=product; System.out.println("生產者生產商品"+this.product); condition.signal();//通知等待集合(喚醒的多是消費者,也多是生產者) }finally{ lock.unlock(); } } public int getProduct(){ try{ lock.lock(); while(this.product==-1){ try { condition.await();//店員沒有商品,消費者停一下 } catch (InterruptedException e) { } } int p=this.product; System.out.println("消費者消費商品"+this.product); this.product=-1;//商品已經被取走 condition.signal(); return p; }finally{ lock.unlock(); } } public static void main(String[] args){ Clerk clerk=new Clerk(); new Thread(new Producer(clerk)).start(); new Thread(new Consumer(clerk)).start(); } }
注意在多個生產者,消費者線程的狀況下,等待集合中二者都會有,而condition.signal()從等待集合中喚醒的具體對象是不肯定的。有可能消費者取走商品後,喚醒的仍是消費者,這時,消費者又會執行while循環,進入等待集合。
事實上,一個Condition對象能夠表示一個等待集合。這樣上面例子,能夠有兩個等待集合,一個給消費者用,一個給生產者用。生產者只會通知消費者的等待集合,消費者也只會通知生產者的等待集合。這樣效率會高些。
public class Clerk { ... private Condition producerCondition=lock.newCondition();//生產者的等待集合 private Condition consumerCondition=lock.newCondition();//消費者的等待集合 public void setProduct(int product){ try{ lock.lock(); while(this.product!=-1){ try { producerCondition.await();//店員那有商品,生產者停一下 } catch (InterruptedException e) { } } this.product=product; System.out.println("生產者生產商品"+this.product); consumerCondition.signal();//喚醒消費者等待集合 }finally{ lock.unlock(); } } public int getProduct(){ try{ lock.lock(); while(this.product==-1){ try { consumerCondition.await();//店員沒有商品,消費者停一下 } catch (InterruptedException e) { } } int p=this.product; System.out.println("消費者消費商品"+this.product); this.product=-1;//商品已經被取走 producerCondition.signal();//喚醒生產者等待集合 return p; }finally{ lock.unlock(); } } ... }
定義Executor接口的目的是將Runnable的指定與如何執行分離。它只定義了一個execute()方法。
public class Page{ private Executor executor; public Page(Executor executor){ this.executor=executor; } ... public void method1(){ ... executor.execute(new Runnable(){ @Override public void run(){ ... } }); ... }
}
public class DirectExecutor implements Executor{ public void execute(Runnable r){ r.run(); } }
調用
new Page(new DirectExecutor()).method1();
Executor api
像線程池這類服務,其實是定義在Executor接口的子接口ExecutorService中。通用的ExecutorService由抽象類AbstractExecutorService操做,若是須要線程池功能,可使用其子類ThreadPoolExecutor.
重寫上面executor例子
ExecutorService executorService=Executors.newCachedThreadPool(); new Page(executorService).method1(); executorService.shutdown();//在指定執行的Runnable都完成後,將ExecutorService關閉
ExecutorService還定義了submit(),invokeAll(),invokeAny()等方法,這些方法出如今java.util.concurrent.Future
,java.util.concurrent.Callable
接口
Future定義的行爲就是讓你在未來取得結果。你能夠將想執行的工做交給Future,Future會使用另外一個線程處理,你能夠先作別的事情。過些時候,再調用Future的get()得到結果。
若是結果已經產生,get()會直接返回,不然會進入阻塞狀態直到結果返回。get()的另外一種重載方法能夠指定等待結果的時間,若指定時間內結果還沒產生,則拋出TimeoutException異常。也可使用Future的isDone()方法看結果是否產生。
Future常常與Callable一塊兒使用,Callable的做用與Runnable類似,都是用來定義執行的流程。
Runnable的run()方法無返回值,也沒法拋出異常
Callable的call()方法能夠有返回值,也能夠拋出異常
FutureTask
是Future的操做類,建立時可傳入Callable對象指定執行的流程
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureTaskDemo { public static int fib(int n){ return n<=1?n:fib(n-1)+fib(n-2); } public static void main(String[] args){ FutureTask<Integer> task=new FutureTask<Integer>( new Callable<Integer>(){ @Override public Integer call() throws Exception { return fib(30); } } ); new Thread(task).start(); try { Thread.sleep(3000); System.out.println(task.get()); } catch (InterruptedException|ExecutionException e) { } } }
FutureTask構造類
FutureTask實現RunnableFuture接口
,RunnableFuture接口
繼承Runnable,Future接口。因此能夠new Thread(task).
ExecutorService的submit()方法也能夠接受Callable對象,調用後返回Future對象。
ExecutorService service=Executors.newCachedThreadPool(); Future<Integer> future=service.submit(new Callable<Integer>(){ @Override public Integer call() throws Exception { return fib(30); } });
若是有多個Callable,能夠先將它們收集到Collection中,而後調用ExecutorService的invokeAll()方法,返回List<Future>
若是有多個Callable,要求其中只要有一個執行完成就好了,則能夠先將它們收集到Collection中,而後調用ExecutorService的invokeAny()方法
ScheduledThreadPoolExecutor用來進行工做排程,其中的schedule()方法用來排定Runnable或Callable實例延遲多久執行一次,並返回Future子接口ScheduledFuture的實例。
import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledExecution { public static void main(String[] args){ ScheduledExecutorService service=Executors.newSingleThreadScheduledExecutor(); service.scheduleWithFixedDelay(new Runnable(){ public void run(){ System.out.println(new Date()); try { Thread.sleep(2000);//假設工做會執行2s } catch (InterruptedException e) { } } }, 2000, 1000, TimeUnit.MILLISECONDS); } }
Sat Oct 24 17:11:59 CST 2015 Sat Oct 24 17:12:02 CST 2015 Sat Oct 24 17:12:05 CST 2015 Sat Oct 24 17:12:08 CST 2015 Sat Oct 24 17:12:11 CST 2015
能夠看到,輸出兩兩間相差3s.scheduleWithFixedDelay()
方法參數
若是把方法換成scheduleAtFixedRate()
Sat Oct 24 17:28:28 CST 2015 Sat Oct 24 17:28:30 CST 2015 Sat Oct 24 17:28:32 CST 2015 Sat Oct 24 17:28:34 CST 2015
每次排定的執行週期是1s,可是工做執行的時間是2s,會超過排定的執行週期,因此輸出兩兩間相差2s。
Future的另外一個操做類ForkJoinTask
,與ExecutorService的另外一個操做類ForkJoinPool
有關,它們都是jdk7新增的api,用來解決分而治之的問題。
ForkJoinTask
操做Future接口,能夠在將來得到耗時工做的執行結果
ForkJoinPool
管理ForkJoinTask
,調用fork()方法,可讓另外一個線程執行ForkJoinTask
若是要得到ForkJoinTask
的執行結果,能夠調用join()方法。若是執行結果還沒產生,會阻塞直至有執行結果返回
使用ForkJoinTask
的子類RecursiveTask
,它是個抽象類,使用時必須繼承它,並操做compute()方法。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class FibDemo extends RecursiveTask<Integer>{ final int n; FibDemo(int n){ this.n=n; } public static int fib(int n){ return n<=1?n:fib(n-1)+fib(n-2); } @Override protected Integer compute() { if(n<=10){ return fib(n); } FibDemo f1=new FibDemo(n-1); f1.fork();//ForkJoinPool分配線程執行子任務 FibDemo f2=new FibDemo(n-2); return f2.compute()+f1.join();//執行f2子任務+得到f1子任務進行完成的結果 } public static void main(String[] args){ FibDemo fib=new FibDemo(40); ForkJoinPool pool=new ForkJoinPool(); System.out.println(pool.invoke(fib)); } }