1.等待/通知機制(wait/notify)java
wait使線程中止,notify使線程繼續運行.數組
當方法wait()執行後,鎖被自動釋放,但執行完notify()方法,鎖卻不自動釋放.dom
驗證:當方法wait()執行後,鎖被自動釋放ide
public class Service { public void testMethod(Object lock){ synchronized (lock){ try { System.out.println("begin wait()"); lock.wait(); System.out.println(" end wait()"); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class ThreadA extends Thread{ private Object lock; public ThreadA(Object lock){ this.lock=lock; } @Override public void run() { Service service=new Service(); service.testMethod(lock); } }
public class ThreadB extends Thread{ private Object lock; public ThreadB(Object lock){ this.lock=lock; } @Override public void run() { Service service=new Service(); service.testMethod(lock); } }
public class Test { public static void main(String[] args) { Object lock=new Object(); ThreadA a= new ThreadA(lock); a.start(); ThreadB b=new ThreadB(lock); b.start(); } /*output: begin wait() begin wait() */ }
總結:線程a wait()以後釋放鎖,線程b進入,驗證成立this
public class Service { public void testMethod(Object lock){ synchronized (lock){ try { System.out.println("begin wait() ThreadName="+Thread.currentThread().getName()); lock.wait(); System.out.println("end wait() ThreadName="+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } public void synNotifyMethod(Object lock){ try { synchronized (lock){ System.out.println("begin notify ThreadName="+Thread.currentThread().getName()+" time="+System.currentTimeMillis()); lock.notify(); Thread.sleep(5000); System.out.println("end notify ThreadName="+Thread.currentThread().getName()+" time="+System.currentTimeMillis()); } } catch (InterruptedException e) { e.printStackTrace(); } } }
public class ThreadA extends Thread{ private Object lock; public ThreadA(Object lock){ this.lock=lock; } @Override public void run() { Service service=new Service(); service.testMethod(lock); } }
public class NotifyThread extends Thread{ private Object lock; public NotifyThread(Object lock){ super(); this.lock=lock; } @Override public void run() { Service service=new Service(); service.synNotifyMethod(lock); } }
public class SynNotifyThread extends Thread{ private Object lock; public SynNotifyThread(Object lock){ super(); this.lock=lock; } @Override public void run() { Service service=new Service(); service.synNotifyMethod(lock); } }
public class Test { public static void main(String[] args) { Object lock=new Object(); ThreadA a= new ThreadA(lock); a.start(); NotifyThread notifyThread=new NotifyThread(lock); notifyThread.start(); SynNotifyThread synNotifyThread=new SynNotifyThread(lock); synNotifyThread.start(); } /* begin wait() ThreadName=Thread-0 begin notify ThreadName=Thread-1 time=1515776621424 end notify ThreadName=Thread-1 time=1515776626424 end wait() ThreadName=Thread-0 begin notify ThreadName=Thread-2 time=1515776626424 end notify ThreadName=Thread-2 time=1515776631425 */ }
總結:線程a wait()以後被線程notifyThread 和synNotifyThread notify(),可是線程Thread-1沒有立刻釋放鎖,運行完synchronized代碼塊才釋放鎖.線程
2.當interrupt()遇到wait()對象
當線程呈wait()狀態時,調用線程對象的interrupt()方法會出現interruptedException()異常.ip
3.notify()一次只隨機通知一個線程進行喚醒內存
public class Service { public void testMethod(Object lock){ try { synchronized (lock) { System.out.println("begin wait() ThreadName=" + Thread.currentThread().getName()); lock.wait(); System.out.println("end wait() ThreadName=" + Thread.currentThread().getName()); } } catch (InterruptedException e) { e.printStackTrace(); } } }
public class ThreadA extends Thread{ private Object lock; public ThreadA(Object lock){ this.lock=lock; } @Override public void run() { Service service=new Service(); service.testMethod(lock); } }
public class ThreadB extends Thread{ private Object lock; public ThreadB(Object lock){ this.lock=lock; } @Override public void run() { Service service=new Service(); service.testMethod(lock); } }
public class ThreadC extends Thread{ private Object lock; public ThreadC(Object lock){ this.lock=lock; } @Override public void run() { Service service=new Service(); service.testMethod(lock); } }
public class NotifyThread extends Thread{ private Object lock; public NotifyThread(Object lock){ super(); this.lock=lock; } @Override public void run() { synchronized (lock){ lock.notifyAll(); } } }
public class Test { public static void main(String[] args) throws InterruptedException { Object lock=new Object(); ThreadA a= new ThreadA(lock); a.start(); ThreadB b= new ThreadB(lock); b.start(); ThreadC c= new ThreadC(lock); c.start(); Thread.sleep(1000); NotifyThread notifyThread=new NotifyThread(lock); notifyThread.start(); } /*output: begin wait() ThreadName=Thread-0 begin wait() ThreadName=Thread-2 begin wait() ThreadName=Thread-1 end wait() ThreadName=Thread-0 end wait() ThreadName=Thread-2 */ }
總結:notify()方法隨機通知一個線程進行喚醒rem
notifyAll()是喚醒全部線程.
4.方法wait(long)
等待某一時間內是否有線程對鎖進行喚醒,若是超過這個時間則自動喚醒
public class MyRunnable { static private Object lock = new Object(); static private Runnable runnable1 = new Runnable() { @Override public void run() { try { synchronized (lock) { System.out.println("wait begin timer=" + System.currentTimeMillis()); lock.wait(5000); System.out.println("wait end timer=" + System.currentTimeMillis()); } } catch (InterruptedException e) { e.printStackTrace(); } } }; static private Runnable runnable= new Runnable() { @Override public void run() { synchronized (lock){ System.out.println("notify begin timer=" + System.currentTimeMillis()); lock.notify(); System.out.println("notify end timer=" + System.currentTimeMillis()); } } }; public static void main(String[] args) throws InterruptedException { Thread t1=new Thread(runnable1); t1.start(); Thread.sleep(3000); Thread t2=new Thread(runnable); t2.start(); } /* wait begin timer=1516010021128 notify begin timer=1516010024127//睡了3秒 notify end timer=1516010024128 wait end timer=1516010024128//立刻激活,輸出打印 */ }
5.生產者/消費者模式實現
//生產者 public class P { private String lock; public P(String lock){ this.lock=lock; } public void setValue(){ try { synchronized (lock){ while(!ValueObject.value.equals("")){ System.out.println("生產者"+Thread.currentThread().getName()+" waiting了"); lock.wait(); } System.out.println("生產者"+Thread.currentThread().getName()+" running了"); String value=System.currentTimeMillis()+"_"+System.nanoTime(); System.out.println("set的值是"+value); ValueObject.value=value; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
//消費者 public class C { private String lock; public C(String lock){ this.lock=lock; } public void getValue(){ try { synchronized (lock){ while(ValueObject.value.equals("")){ System.out.println("消費者"+Thread.currentThread().getName()+" waiting了"); lock.wait(); } System.out.println("消費者"+Thread.currentThread().getName()+" running了"); System.out.println("get的值是"+ValueObject.value); ValueObject.value=""; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
//生產線程 public class ThreadP extends Thread{ private P p; public ThreadP(P p){ super(); this.p=p; } @Override public void run() { while(true){ p.setValue(); } } }
//消費線程 public class ThreadC extends Thread{ private C c; public ThreadC(C c){ super(); this.c=c; } @Override public void run() { while(true){ c.getValue(); } } }
public class Run { public static void main(String[] args) throws InterruptedException { String lock=new String(""); P p=new P(lock); C c=new C(lock); ThreadP[] tp=new ThreadP[2]; ThreadC[] tc=new ThreadC[2]; for (int i = 0; i <2 ; i++) { tp[i]=new ThreadP(p); tp[i].setName("生產者"+(i+1)); tc[i]=new ThreadC(c); tc[i].setName("消費者"+(i+1)); tp[i].start(); tc[i].start(); } Thread.sleep(5000); Thread[] threadArray=new Thread[Thread.currentThread().getThreadGroup().activeCount()]; Thread.currentThread().getThreadGroup().enumerate(threadArray);//將list轉成數組 for (int i = 0; i <threadArray.length ; i++) { System.out.println(threadArray[i].getName()+" "+threadArray[i].getState()); } } /*// 生產者生產者2 running了 set的值是1516022907331_19063484002988 生產者生產者2 waiting了 生產者生產者1 waiting了 消費者消費者1 running了 get的值是1516022907331_19063484002988 消費者消費者1 waiting了 消費者消費者2 waiting了 main RUNNABLE Monitor Ctrl-Break RUNNABLE 生產者1 WAITING 消費者1 WAITING 生產者2 WAITING 消費者2 WAITING */ }
總結:產生緣由是由於notify()是隨機通知,致使生產類沒有被通知,陷入假死狀態
解決方法:使用notifyAll()
一個生產者對應多個消費者:
public class MyStack { private List list=new ArrayList(); synchronized public void push(){ try { if(list.size()!=0){ this.wait(); } list.add("anything "+ Math.random()); this.notify(); System.out.println("push="+list.size()); } catch (InterruptedException e) { e.printStackTrace(); } } synchronized public String pop(){ String returnValue=""; try { if(list.size()==0){ System.out.println("pop操做中的:"+Thread.currentThread().getName()+" 線程呈wait" + "狀態"); this.wait(); } returnValue=""+list.get(0); list.remove(0); this.notify(); System.out.println("pop="+list.size()); } catch (InterruptedException e) { e.printStackTrace(); } return returnValue; } }
public class P_Thread extends Thread{ private P p; public P_Thread(P p){ super(); this.p=p; } @Override public void run() { while(true){ p.pushService(); } } }
public class C_Thread extends Thread{ private C c; public C_Thread(C c){ super(); this.c=c; } @Override public void run() { while(true){ c.popService(); } } }
public class P { private MyStack stack; public P(MyStack stack){ this.stack=stack; } public void pushService(){ stack.push(); } }
public class C { private MyStack stack; public C(MyStack stack){ this.stack=stack; } public void popService(){ stack.pop(); } }
public class Run { public static void main(String[] args) { MyStack stack=new MyStack(); P p=new P(stack); C c1=new C(stack); C c2=new C(stack); C c3=new C(stack); C c4=new C(stack); C c5=new C(stack); P_Thread pt=new P_Thread(p); C_Thread ct=new C_Thread(c1); C_Thread ct2=new C_Thread(c2); C_Thread ct3=new C_Thread(c3); C_Thread ct4=new C_Thread(c4); C_Thread ct5=new C_Thread(c5); pt.start(); ct.start(); ct2.start(); ct3.start(); ct4.start(); ct5.start(); } /* pop操做中的:Thread-2 線程呈wait狀態 pop操做中的:Thread-3 線程呈wait狀態 push=1 pop=0 pop操做中的:Thread-2 線程呈wait狀態 pop操做中的:Thread-4 線程呈wait狀態 pop操做中的:Thread-1 線程呈wait狀態 pop操做中的:Thread-5 線程呈wait狀態 Exception in thread "Thread-3" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:604) at java.util.ArrayList.get(ArrayList.java:382) at mutithreading.MyStack.pop(MyStack.java:33) at mutithreading.C.popService(C.java:13) at mutithreading.C_Thread.run(C_Thread.java:16) */ }
總結: 產生問題的緣由,條件改變時(list.size())並無獲得及時的響應,因此多個呈wait狀態的線程被喚醒,繼而執行list.remove(0)代碼而出現異常.
這邊我試了一下用volidate關鍵字修飾list,思路是,將list的修改強行操做到內存,讓全部的線程共享list,可是實際操做結果仍是同樣.
解決方法:
public class MyStack { private List list=new ArrayList(); synchronized public void push(){ try { while(list.size()!=0){ this.wait(); } list.add("anything "+ Math.random()); System.out.println("list的size="+list.size()); this.notify(); System.out.println("push="+list.size()); } catch (InterruptedException e) { e.printStackTrace(); } } synchronized public String pop(){ String returnValue=""; try { while(list.size()==0){ System.out.println("pop操做中的:"+Thread.currentThread().getName()+" 線程呈wait" + "狀態"); this.wait(); } returnValue=""+list.get(0); list.remove(0); this.notify(); System.out.println("pop="+list.size()); } catch (InterruptedException e) { e.printStackTrace(); } return returnValue; } /* pop操做中的:Thread-5 線程呈wait狀態 pop操做中的:Thread-3 線程呈wait狀態 pop操做中的:Thread-2 線程呈wait狀態 pop操做中的:Thread-4 線程呈wait狀態 list的size=1 push=1 pop=0 pop操做中的:Thread-1 線程呈wait狀態 pop操做中的:Thread-5 線程呈wait狀態 */ }
陷入假死狀態,老辦法,改成notifyAll(),順利解決.
6.經過管道進行線程間通訊:字節流
public class WriteData { public void writeMethod(PipedOutputStream out){ try { System.out.println("write:"); for (int i = 0; i <300 ; i++) { String outData=""+(i+1); out.write(outData.getBytes()); System.out.println(outData); } out.close(); } catch (IOException e) { e.printStackTrace(); } } }
public class ReadData { public void readMethod(PipedInputStream in) throws IOException { System.out.println("read: "); byte[] byteArray=new byte[20]; int readLength=in.read(byteArray); while(readLength!=-1){ String newData=new String(byteArray,0,readLength); System.out.println(newData); readLength=in.read(byteArray); } in.close(); } }
public class ThreadWrite extends Thread{ private WriteData write; private PipedOutputStream out; public ThreadWrite(WriteData write,PipedOutputStream out){ super(); this.write=write; this.out=out; } @Override public void run() { write.writeMethod(out); } }
public class ThreadRead extends Thread{ private ReadData read; private PipedInputStream in; public ThreadRead(ReadData read, PipedInputStream in){ super(); this.read=read; this.in=in; } @Override public void run() { try { read.readMethod(in); } catch (IOException e) { e.printStackTrace(); } } }
public class Run { public static void main(String[] args) throws IOException { try { WriteData write=new WriteData(); ReadData read=new ReadData(); PipedOutputStream out=new PipedOutputStream(); PipedInputStream in =new PipedInputStream(); out.connect(in);//使兩個Stream之間產生通訊 ThreadRead threadRead=new ThreadRead(read,in); threadRead.start(); Thread.sleep(2000); ThreadWrite threadWrite=new ThreadWrite(write,out); threadWrite.start(); } catch (InterruptedException e) { e.printStackTrace(); } } }
字符流差很少.
public class ReadData { public void readMethod(PipedReader in) throws IOException { System.out.println("read: "); char[] byteArray=new char[20]; int readLength=in.read(byteArray); while(readLength!=-1){ String newData=new String(byteArray,0,readLength); System.out.println(newData); readLength=in.read(byteArray); } in.close(); } }
public class WriteData { public void writeMethod(PipedWriter out){ try { System.out.println("write:"); for (int i = 0; i <300 ; i++) { String outData=""+(i+1); out.write(outData); System.out.println(outData); } out.close(); } catch (IOException e) { e.printStackTrace(); } } }