線程間的通訊

 

線程間的通訊java

wait/notifydom

等待,通知機制的實現ide

wait()是Object類的方法,在調用wait()方法以前,線程必須得到該對象的對象級別鎖,只能在同步中調用wait()方法this

notify()也要在同步方法或者同步塊中調用,調用以前也要得到鎖。spa

 

public class Test1 {線程

         public static void main(String[] args) {對象

 

                   try {隊列

                            String newString = new String("");ip

                            newString.wait();資源

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

 

         }

}

結果:

java.lang.IllegalMonitorStateException

         at java.lang.Object.wait(Native Method)

         at java.lang.Object.wait(Object.java:503)

         at MoreThread.TestWaitAndNotify.Test1.main(Test1.java:8)

結果緣由:沒有對象監視器,沒有同步鎖

public class Test2 {

         public static void main(String[] args) {

                   try {

                            String lock = new String();

                            System.out.println("syn before");

 

                            synchronized (lock) {

                                     System.out.println("syn begin");

                                     lock.wait();

                                     System.out.println("wait end");

                            }

                            System.out.println("syn end");

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

結果:程序一直處於等待狀態。

notify方法

 

public class MyThread1 extends Thread {

         private Object lock;

 

         public MyThread1(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   try {

                            synchronized (lock) {

                                     System.out.println("開始 wait time=" + System.currentTimeMillis());

                                     lock.wait();

                                     System.out.println("結束 wait time=" + System.currentTimeMillis());

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

public class MyThread2 extends Thread {

         private Object lock;

 

         public MyThread2(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   synchronized (lock) {

                            System.out.println("開始 wait time=" + System.currentTimeMillis());

                            lock.notify();

                            System.out.println("結束 wait time=" + System.currentTimeMillis());

                   }

         }

}

public class Test3 {

         public static void main(String[] args) {

                   try {

                            Object lock = new Object();

                            MyThread1 t1 = new MyThread1(lock);

                            t1.start();

                            Thread.sleep(3000);

                            MyThread2 t2 = new MyThread2(lock);

                            t2.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                  }

         }

}

結果:開始 wait time=1482979301067

開始 wait time=1482979304068

結束 wait time=1482979304068

結束 wait time=1482979304068

 

 

例子:在list中添加5個元素public class MyList {

 

         private static List list = new ArrayList();

 

         public static void add() {

                   list.add("anystring");

         }

 

         public static int size() {

                   return list.size();

         }

 

}

 

public class ThreadA extends Thread {

         private Object lock;

 

         public ThreadA(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   try {

                            synchronized (lock) {

                                     if (MyList.size() != 5) {

                                               System.out.println("wait begin " + System.currentTimeMillis());

                                               lock.wait();

                                               System.out.println("wait end" + System.currentTimeMillis());

 

                                     }

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class ThreadB extends Thread {

         private Object lock;

 

         public ThreadB(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   try {

                            synchronized (lock) {

                                     for (int i = 0; i < 10; i++) {

                                               MyList.add();

                                               if (MyList.size() == 5) {

                                                        lock.notify();

                                                        System.out.println("已經發出通知");

                                               }

                                               System.out.println("添加了" + (i + 1) + "個元素!");

                                               Thread.sleep(1000);

                                     }

 

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

public class Run {

         public static void main(String[] args) {

                   try {

                            Object lock = new Object();

                            ThreadA a = new ThreadA(lock);

                            a.start();

                            Thread.sleep(50);

                            ThreadB b = new ThreadB(lock);

                            b.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

結果:wait begin 1482980039417

添加了1個元素!

添加了2個元素!

添加了3個元素!

添加了4個元素!

已經發出通知

添加了5個元素!

添加了6個元素!

添加了7個元素!

添加了8個元素!

添加了9個元素!

添加了10個元素!

wait end1482980049467

 

說明:notify()方法執行後並非當即釋放鎖,notify方法能夠喚醒一個因調用了wait操做而處於柱塞狀態中的線程,使其進入就緒狀態。被從新喚醒的線程會試圖從新得到臨界區的控制權,也就是s鎖,並繼續執行臨界區 內wait以後的代碼。

 notify()方法能夠隨機喚醒等待隊列中等待同一個共享資源的「一個」線程,並使該線程退出等待隊列,進入可運行狀態,也就是notufy()方法僅通知「一個」線程。

notifyAll()方法可使全部正在等待隊列中等待同一共享資源的「所有」線程從等待狀態中退出,進入可運行狀態。此時,優先級最高的那個線程最早執行,但也有多是隨機執行。

 

方法wait()鎖釋放與notify()鎖不釋放

例子:

public class Service {

         public void testMethod(Object lock) {

                   try {

                            synchronized (lock) {

                                     System.out.println("begin wait()");

                                     lock.wait();

                                     System.out.println("end wait()");

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

public class ThreadA extends Thread {

 

         private Object lock;

 

         public ThreadA(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   Service service = new Service();

                   service.testMethod(lock);

         }

}

 

 

public class ThreadB extends Thread {

 

         private Object lock;

 

         public ThreadB(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   Service service = new Service();

                   service.testMethod(lock);

         }

 

}

 

public class Test {

         public static void main(String[] args) {

                   Object lock = new Object();

                   ThreadA a = new ThreadA(lock);

                   a.start();

                   ThreadB b = new ThreadB(lock);

                   b.start();

         }

 

}

 

結果:兩個線程都打印

begin wait()

begin wait()

說明wait時釋放了鎖

 

例子:notify方法不是當即釋放鎖

public class Service {

         public void testMethod(Object lock) {

                   try {

                            synchronized (lock) {

                                     System.out.println("begin wait()");

                                     lock.wait();

                                     System.out.println("end wait()");

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         public void synNotifyMethod(Object lock) {

                   try {

                            synchronized (lock) {

                                     System.out.println("begin notify() ThreadName=" + Thread.currentThread().getName() + "time ="

                                                        + System.currentTimeMillis());

                                     lock.notify();

                                     Thread.sleep(5000);

                                     System.out.println("end notify() ThreadName=" + Thread.currentThread().getName() + "time="

                                                        + System.currentTimeMillis());

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

 

         }

}

 

public class ThreadA extends Thread {

         private Object lock;

 

         public ThreadA(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         public void run() {

                   Service service = new Service();

                   service.testMethod(lock);

         }

}

 

public class NotifyThread extends Thread {

         private Object lock;

 

         public NotifyThread(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   Service service = new Service();

                   service.synNotifyMethod(lock);

                   super.run();

         }

 

}

 

 

public class synNotifyMethodThread extends Thread {

         private Object lock;

 

         public synNotifyMethodThread(Object lock) {

                   super();

                   this.lock = lock;

         }

         @Override

         public void run() {

                   Service service=new Service();

                   service.synNotifyMethod(lock);

         }

}

 

public class Test {

         public static void main(String[] args) {

                   Object lock = new Object();

                   ThreadA a = new ThreadA(lock);

                   a.start();

                   NotifyThread notifyThread = new NotifyThread(lock);

                   notifyThread.start();

                   synNotifyMethodThread c = new synNotifyMethodThread(lock);

                   c.start();

         }

}

 

 

結果:begin wait()

begin notify() ThreadName=Thread-2time =1482989890728

end notify() ThreadName=Thread-2time=1482989895728

end wait()

begin notify() ThreadName=Thread-1time =1482989895728

end notify() ThreadName=Thread-1time=1482989900728

 

必須執行完notify()所在的同步synchronized代碼塊後纔會釋放

 

 

當線程處於wait狀態時,調用interrupt方法會出現InterruptedException

例子:

public class Service {

         public void testMethod(Object lock) {

                   try {

                            synchronized (lock) {

                                     System.out.println("begin wait()");

                                     lock.wait();

                                     System.out.println(" end wait()");

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                            System.out.println("出現了異常,由於wait狀態的線程被interrupt了!");

                   }

         }

}

 

public class ThreadA extends Thread {

         private Object lock;

 

         public ThreadA(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   Service service = new Service();

                   service.testMethod(lock);

         }

} public class Test {

         public static void main(String[] args) {

                   try {

                            Object lock = new Object();

                            ThreadA a = new ThreadA(lock);

                            a.start();

                            Thread.sleep(5000);

                            a.interrupt();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

結果:異常產生

執行完同步就會釋放鎖

異常終止也會釋放鎖

 

只通知一個線程

調用notify()一次只隨機通知一個線程進行喚醒;

屢次調用notify能夠喚醒多個等待中的線程

public class NotifyThread extends Thread {

         private Object lock;

 

         public NotifyThread(Object lock) {

                   super();

                   this.lock = lock;

         }

 

         @Override

         public void run() {

                   synchronized (lock) {

                            lock.notify();

                            lock.notify();

                            lock.notify();

 

                   }

         }

}

 

喚醒全部線程,爲了喚醒全部線程,可使用notifyAll()方法。

 

方法wait(long):

功能是等待某一個時間是否有線程對鎖進行喚醒,若是超過這個時間則自動喚醒。

 

 

通知過早

會打亂程序的正常運行邏輯

正常狀況下的例子

public class MyRun {

          private String lock = new String("");

          private Runnable runnableA = new Runnable() {

                   public void run() {

                            try {

                                      synchronized (lock) {

                                               System.out.println("begin wait");

                                               lock.wait();

                                               System.out.println("end wait");

                                      }

                            } catch (Exception e) {

                                      e.printStackTrace();

                            }

                   };

          };

          private Runnable runnableB = new Runnable() {

                   public void run() {

                            synchronized (lock) {

                                      System.out.println("begin notify");

                                      lock.notify();

                                      System.out.println("end notify");

                            }

                   };

          };

 

          public static void main(String[] args) {

                   MyRun run = new MyRun();

                   Thread a = new Thread(run.runnableA);

                   a.start();

                   Thread b = new Thread(run.runnableB);

                   b.start();

 

          }

}

 

結果:

begin wait

begin notify

end notify

end wait

 

用狀態標識控制程序執行流程;

public class MyRun {

         private String lock = new String("");

         private boolean isFirstRunB=false;

        

         private Runnable runnableA = new Runnable() {

                   public void run() {

                            try {

                                     synchronized (lock) {

                                               while(isFirstRunB==false){

                                                        System.out.println("begin wait");

                                                        lock.wait();

                                                        System.out.println("end wait");

                                               }

                                              

                                     }

                            } catch (Exception e) {

                                     e.printStackTrace();

                            }

                   };

         };

         private Runnable runnableB = new Runnable() {

                   public void run() {

                            synchronized (lock) {

                                     System.out.println("begin notify");

                                     lock.notify();

                                     System.out.println("end notify");

                                     isFirstRunB=true;

                            }

                   };

         };

 

         public static void main(String[] args) throws InterruptedException {

                   MyRun run = new MyRun();

                   Thread a = new Thread(run.runnableA);

                   a.start();

                   Thread.sleep(100);

                   Thread b = new Thread(run.runnableB);

                   b.start();

 

         }

}

等待wait的條件發生了變化

wait條件發生了變化,也容易形成程序邏輯的混亂;

例子

public class ValueObject {

         public static List list = new ArrayList();

 

}

 

public class ThreadSubtract extends Thread {

         private Subtract r;

 

         public ThreadSubtract(Subtract r) {

                   super();

                   this.r = r;

         }

 

         @Override

         public void run() {

                   r.substract();

         }

}

 

public class ThreadAdd extends Thread {

         private Add p;

 

         public ThreadAdd(Add p) {

                   super();

                   this.p = p;

 

         }

 

         public void run() {

                   p.add();

         }

}

 

 

public class Subtract {

         private String lock;

 

         public Subtract(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void substract() {

                   try {

                            synchronized (lock) {

                                     if (ValueObject.list.size() == 0) {

                                               System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());

                                               lock.wait();

                                               System.out.println("wait end THreadName=" + Thread.currentThread().getName());

                                     }

                                     ValueObject.list.remove(0);

                                     System.out.println("listsize=" + ValueObject.list.size());

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

public class Add {

         private String lock;

         public Add(String lock){

                   super();

                   this.lock=lock;

         }

         public void add(){

                   synchronized(lock){

                            ValueObject.list.add("anyString");

                            lock.notifyAll();

                   }

         }

 

}

 

public class Run {

         public static void main(String[] args) throws InterruptedException {

                   String lock = new String("");

                   Add add = new Add(lock);

                   Subtract subtract = new Subtract(lock);

 

                   ThreadSubtract subtract1Thread = new ThreadSubtract(subtract);

                   subtract1Thread.setName("subtract1Thread");

                   subtract1Thread.start();

 

                   ThreadSubtract subtract2Thread = new ThreadSubtract(subtract);

                   subtract2Thread.setName("subtract2Thread");

                   subtract2Thread.start();

                   Thread.sleep(1000);

                   ThreadAdd addThread = new ThreadAdd(add);

                   addThread.setName("addThread");

                   addThread.start();

         }

 

}

 

結果:增長了一個值,但第二次移除的時候已經爲空,沒有對應的index:0

wait begin ThreadName=subtract1Thread

wait begin ThreadName=subtract2Thread

wait end THreadName=subtract2Thread

listsize=0

wait end THreadName=subtract1Thread

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0

         at java.util.ArrayList.rangeCheck(ArrayList.java:604)

         at java.util.ArrayList.remove(ArrayList.java:445)

         at MoreThread.waitOld.Subtract.substract(Subtract.java:19)

         at MoreThread.waitOld.ThreadSubtract.run(ThreadSubtract.java:13)

 

 

修改程序中條件,使程序再也不繼續向前執行remove方法,也就不會產生向下溢出。

 

 

生產者消費者模式的實現

 

等待/通知最經典的就是生產者消費者模式:

1.單個生產者和消費者:

例子:

 

 

public class C {

         private String lock;

 

         public C(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void getValue() {

                   try {

                            synchronized (lock) {

                                     if (ValueObject.value.equals("")) {

                                               lock.wait();

                                     }

                                     System.err.println("get的值是" + ValueObject.value);

                                     ValueObject.value = "";

                                     lock.notify();

                            }

 

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

public class P {

         private String lock;

 

         public P(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void setValue() {

                   try {

                            synchronized (lock) {

                                     if (!ValueObject.value.equals("")) {

                                               lock.wait();

                                     }

                                     String value = System.currentTimeMillis() + "_" + System.nanoTime();

                                     System.out.println("set的值是:" + value);

                                     ValueObject.value = value;

                                     lock.notify();

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

 

         }

}

 

 

public class ValueObject {

         public static String value = "";

}

 

public class ThreadP extends Thread {

         private P p;

 

         public ThreadP(P p) {

                   super();

                   this.p = p;

         }

 

         @Override

         public void run() {

                   while (true) {

                            p.setValue();

                   }

         }

}

 

public class ThreadC extends Thread {

         private C r;

 

         public ThreadC(C r) {

                   super();

                   this.r = r;

         }

 

         public void run() {

                   while (true) {

                            r.getValue();

                   }

         }

}

 

public class Run {

         public static void main(String[] args) {

                   String lock = new String("");

                   P p = new P(lock);

                   C r = new C(lock);

                   ThreadP pThread = new ThreadP(p);

                   ThreadC rthread = new ThreadC(r);

                   pThread.start();

                   rthread.start();

         }

}

多生產者和多消費者:

public class Run {

         public static void main(String[] args) throws InterruptedException {

                   String lock = new String("");

                   P p = new P(lock);

                   C r = new C(lock);

                   ThreadP[] pThread = new ThreadP[2];

                   ThreadC[] rthread = new ThreadC[2];

                   for (int i = 0; i < 2; i++) {

                            pThread[i] = new ThreadP(p);

 

                            pThread[i].setName("生產者" + (i + 1));

 

                            rthread[i] = new ThreadC(r);

                            rthread[i].setName("消費者" + (i + 1));

                            pThread[i].start();

                            rthread[i].start();

                   }

                   Thread.sleep(5000);

                   Thread[] threadArray = new Thread[Thread.currentThread().getThreadGroup().activeCount()];

                   Thread.currentThread().getThreadGroup().enumerate(threadArray);

                   for (int i = 0; i < threadArray.length; i++) {

                            System.out.println(threadArray[i].getName() + "" + threadArray[i].getState());

                   }

 

         }

}

 

public class C {

         private String lock;

 

         public C(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void getValue() {

                   try {

                            synchronized (lock) {

                                     while (ValueObject.value.equals("")) {

                                               System.out.println("消費者" + Thread.currentThread().getName() + "waiting 了★★★☆☆☞☝☜☚✘█▌▎▓");

                                               lock.wait();

                                     }

 

                                     System.err.println("消費者" + Thread.currentThread().getName() + "runnable 了★★★☆☆☞☝☜☚✘█▌▎▓");

                                     ValueObject.value = "";

                                     lock.notify();

                            }

 

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

 

public class P {

         private String lock;

 

         public P(String lock) {

                   super();

                   this.lock = lock;

         }

 

         public void setValue() {

                   try {

                            synchronized (lock) {

                                     while (!ValueObject.value.equals("")) {

                                              

                                               System.out.println("生產者"+Thread.currentThread().getName()+"waiting 了★★★☆☆☞☝☜☚✘█▌▎▓");

                                               lock.wait();

                                     }

                                     System.out.println("生產者"+Thread.currentThread().getName()+"runnable 了★★★☆☆☞☝☜☚✘█▌▎▓");

                                     String value = System.currentTimeMillis() + "_" + System.nanoTime();

                                     System.out.println("set的值是:" + value);

                                     ValueObject.value = value;

                                     lock.notify();

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

 

         }

}

 

 

public class ThreadC extends Thread {

         private C r;

 

         public ThreadC(C r) {

                   super();

                   this.r = r;

         }

 

         public void run() {

                   while (true) {

                            r.getValue();

                   }

         }

}

 

public class ThreadP extends Thread {

         private P p;

 

         public ThreadP(P p) {

                   super();

                   this.p = p;

         }

 

         @Override

         public void run() {

                   while (true) {

                            p.setValue();

                   }

         }

}

 

public class ValueObject {

         public static String value = "";

}

 

出現假死狀態,全部的生產者和消費者都在等待狀態;多是連續喚醒了同類,同時喚醒異類就能解決問題;

 

 

多生產者和多消費者解決死鎖的問題:操做值將notify改成notifyAll;

單個生產者和消費者交替執行,操做棧:

public class Run {

         public static void main(String[] args) {

                   MyStack myStack = new MyStack();

                   P p = new P(myStack);

                   C r = new C(myStack);

                   P_Thread pthread = new P_Thread(p);

                   C_Thread rthread = new C_Thread(r);

                   pthread.start();

                   rthread.start();

 

         }

}

 

 

public class C_Thread extends Thread {

         private C r;

 

         public C_Thread(C r) {

                   super();

                   this.r = r;

         }

 

         public void run() {

                   while (true) {

                            r.popService();

                   }

         }

 

}

public class P_Thread extends Thread {

         private P p;

 

         public P_Thread(P p) {

                   super();

                   this.p = p;

         }

 

         @Override

         public void run() {

                   while (true) {

                            p.pushService();

                   }

         }

 

}

 

public class C {

         private MyStack myStack;

 

         public C(MyStack myStack) {

                   super();

                   this.myStack = myStack;

         }

 

         public void popService() {

                   System.out.println("pop=" + myStack.pop());

         }

 

}

 

public class P {

         private MyStack myStack;

 

         public P(MyStack myStack) {

                   super();

                   this.myStack = myStack;

         }

 

         public void pushService() {

                   myStack.push();

         }

}

 

public class MyStack {

         private List list = new ArrayList();

 

         synchronized public void push() {

                   try {

                            if (list.size() == 1) {

                                     this.wait();

                            }

                            list.add("anyString=" + Math.random());

                            this.notify();

                            System.out.println("push=" + list.size());

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         synchronized public String pop() {

                   String returnValue = "";

                   try {

                            if (list.size() == 0) {

                                     System.out.println("pop中的:" + Thread.currentThread().getName() + "線程wait狀態");

                                     this.wait();

                            }

                            returnValue = "" + list.get(0);

                            list.remove(0);

                            this.notify();

                            System.out.println("pop=" + list.size());

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

                   return returnValue;

         }

 

}

 

一個生產者和多個消費者

 

多生產者和一個消費者:操做棧

建立多個生產者線程

多生產者與多消費者

 

 

經過管道進行線程間通訊:字節流

public class Run {

         public static void main(String[] args) {

                   try {

                            WriteData writeData = new WriteData();

                            ReadData readData = new ReadData();

                            PipedInputStream inputStream = new PipedInputStream();

 

                            PipedOutputStream outputStream = new PipedOutputStream();

                            outputStream.connect(inputStream);

                            ThreadRead threadRead = new ThreadRead(readData, inputStream);

                            threadRead.start();

                            Thread.sleep(2000);

                            ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);

                            threadWrite.start();

                   } catch (Exception e) {

                           

                   }

         }

}

public class ThreadRead extends Thread {

         public ThreadRead(ReadData read, PipedInputStream input) {

                   super();

                   this.read = read;

                   this.input = input;

         }

 

         private ReadData read;

         private PipedInputStream input;

 

         @Override

         public void run() {

                   read.readMethod(input);

         }

}

 

public class ThreadWrite extends Thread {

         private WriteData write;

         private PipedOutputStream out;

 

         public ThreadWrite(WriteData write, PipedOutputStream out) {

                   super();

                   this.write = write;

                   this.out = out;

         }

         @Override

         public void run() {

                   write.writeMethod(out);

         }

}

 

public class ReadData {

         synchronized public void readMethod(PipedInputStream input) {

                   try {

                            System.out.println("read:");

                            byte[] byteArray = new byte[20];

                            int readLength = input.read(byteArray);

                            while (readLength != -1) {

                                     String newData = new String(byteArray, 0, readLength);

                                     System.out.println(newData);

                                     readLength = input.read(byteArray);

 

                            }

                            System.out.println();

                            input.close();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class WriteData {

 

         synchronized public void writeMethod(PipedOutputStream out) {

                   try {

                            System.out.println("write:");

                           

                            for (int i = 0; i < 20; i++) {

                                     String outData = "" + (i + 1);

                                     out.write(outData.getBytes());

                                     System.out.println(outData);

                            }

                            System.out.println();

                            out.close();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

 

結果:兩個線程經過管道流成功進行了數據的傳輸。

 

經過管道進行線程間通訊:字符流

經過PipedReader,PipedWriter,char[]等實現字符流的讀寫。

 

 

 

等待通知交叉;

例子:

public class DBTools {

         volatile private boolean prevIsA = false;

 

         synchronized public void backupA() {

                   try {

                            while (prevIsA == true) {

                                     wait();

                            }

                            for (int i = 0; i < 1; i++) {

                                     System.out.println("★★★★★");

                            }

                            prevIsA = true;

                            notifyAll();

 

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         synchronized public void backupB() {

                   try {

                            while (prevIsA == false) {

                                     wait();

                            }

                            for (int i = 0; i < 1; i++) {

                                     System.out.println("☆☆☆☆☆");

                            }

                            prevIsA = false;

                            notifyAll();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

}

 

public class BackupA extends Thread {

         private DBTools dbtools;

 

         public BackupA(DBTools dbtools) {

                   super();

                   this.dbtools = dbtools;

         }

 

         @Override

         public void run() {

                   dbtools.backupA();

         }

 

}

 

public class BackupB extends Thread{

         private DBTools dbtools;

 

         public BackupB(DBTools dbtools) {

                   super();

                   this.dbtools = dbtools;

         }

 

         @Override

         public void run() {

                   dbtools.backupB();

         }

}

 

 

public class Run {

         public static void main(String[] args) {

                   DBTools dbtools = new DBTools();

                   for (int i = 0; i < 20; i++) {

                            BackupB output = new BackupB(dbtools);

                            output.start();

                            BackupA input = new BackupA(dbtools);

                            input.start();

                   }

         }

}

 

交叉打印內容

join方法的使用:

使所屬的線程對象正確的執行run中的方法,使當前線程處於無限期的阻塞,等待線程銷燬以後再繼執行線程後面的代碼。

 

具備使線程排隊運行的做用

 

join()方法和interrupt同時使用時的異常java.lang.InterruptedException

代碼:

public class ThreadA extends Thread {

         @Override

         public void run() {

                   for (int i = 0; i < Integer.MAX_VALUE; i++) {

                            String newString = new String();

                            Math.random();

                   }

         }

}

 

public class ThreadB extends Thread {

         @Override

         public void run() {

                   try {

                            ThreadA a = new ThreadA();

                            a.start();

                            a.join();

                            System.out.println("b線程在run end 處打印了");

                   } catch (Exception e) {

                            System.out.println("b線程在catch  處打印了");

                            e.printStackTrace();

                   }

         }

}

 

public class ThreadC extends Thread {

         private ThreadB threadB;

 

         public ThreadC(ThreadB threadB) {

                   super();

                   this.threadB = threadB;

         }

 

         @Override

         public void run() {

                   threadB.interrupt();

         }

}

 

public class Run {

         public static void main(String[] args) {

                   try {

                            ThreadB b = new ThreadB();

                            b.start();

                            Thread.sleep(500);

                            ThreadC c = new ThreadC(b);

                            c.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

方法join(long)的使用

設置等待的時間

public class MyThread extends Thread {

         @Override

         public void run() {

                   try {

                            System.out.println("begin Timer=" + System.currentTimeMillis());

                            Thread.sleep(5000);

 

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class Test {

         public static void main(String[] args) {

                   try {

                            MyThread threadTest = new MyThread();

                            threadTest.start();

                            threadTest.join(2000);

                            System.out.println("  end timer=" + System.currentTimeMillis());

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

結果:只等待了2秒,join(long time)

和sleep(long time)的區別:join(long)是在顳部使用wait(long)方法來實現的,具備釋放鎖的特色。Thread.sleep(long) 方法卻不釋放鎖。

 

public class JoinDemo {

         public final synchronized void join(long mills) {

                   long base = System.currentTimeMillis();

                   long now = 0;

                   if (mills < 0) {

                            throw new IllegalArgumentException("timeout value is negative");

                   }

                   if (mills == 0) {

                            while (isAlive()) {

                                     wait(0);

                            }

                   } else {

                            while (isAlive()) {

                                     long delay = mills - now;

                                     if (delay <= 0) {

                                               break;

                                     }

                                     wait(delay);

                                     now = System.currentTimeMillis() - base;

                            }

                   }

 

         }

}

 

join()後面的代碼提早運行:出現意外

解釋意外:

 

幾個線程搶鎖,根據不一樣的狀況搶到鎖的狀況是不同的,就會出現不一樣的結果。

 

ThreadLocal的使用:

變量值的共享能夠經過public static 變量的形式,全部的線程都是用同一個static 變量;如何讓每個線程都有本身的共享變量?ThreadLocal正式爲了解決這樣的問題。將每一個線程綁定本身的值

public class Run {

         public static ThreadLocal t1 = new ThreadLocal();

 

         public static void main(String[] args) {

                   if (t1.get() == null) {

                            System.out.println("從未放過值");

                            t1.set("個人值");

                   }

                   System.out.println(t1.get());

                   System.out.println(t1.get());

         }

 

}

 

驗證線程變量的隔離性

例子1

 

 

public class Tools {

         public static ThreadLocal t1 = new ThreadLocal();

}

public class ThreadB extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 100; i++) {

                                     Tools.t1.set("TrheadB" + (i + 1));

                                     System.out.println("ThreadB get Value=" + Tools.t1.get());

                                     Thread.sleep(200);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

public class ThreadA extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 100; i++) {

                                     Tools.t1.set("TrheadA" + (i + 1));

                                     System.out.println("ThreadA get Value=" + Tools.t1.get());

                                     Thread.sleep(200);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class Run {

         public static void main(String[] args) {

                   try {

                            ThreadA a = new ThreadA();

                            ThreadB b = new ThreadB();

                            a.start();

                            b.start();

                            for (int i = 0; i < 100; i++) {

                                     Tools.t1.set("main" + (i + 1));

                                     System.out.println("Main get value=" + Tools.t1.get());

                                     Thread.sleep(200);

 

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

結果:雖然3個線程都向t1對象中et()數據值,可是每一個線程仍是能取出本身的數據

例子2

public class Tools {

         public static ThreadLocal<Date> t1 = new ThreadLocal<Date>();

}

public class ThreadA extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 20; i++) {

                                     if (Tools.t1.get() == null) {

                                               Tools.t1.set(new Date());

                                     }

                                     System.out.println("A " + Tools.t1.get().getTime());

                                     Thread.sleep(100);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

public class ThreadB extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 20; i++) {

                                     if (Tools.t1.get() == null) {

                                               Tools.t1.set(new Date());

                                     }

                                     System.out.println("B " + Tools.t1.get().getTime());

                                     Thread.sleep(100);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class Run {

         public static void main(String[] args) {

                   try {

                            ThreadA a = new ThreadA();

                            a.start();

                            Thread.sleep(1000);

                            ThreadB b = new ThreadB();

                            b.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

 

給ThreadLocal賦初值,值再也不爲null,重寫方法InitialValue()

例子:

public class Run {

         public static ThreadLocalExt t1 = new ThreadLocalExt();

 

         public static void main(String[] args) {

                   if (t1.get() == null) {

                            System.out.println("我從未放過值");

                            t1.set("個人值");

                   }

                   System.out.println(t1.get());

                   System.out.println(t1.get());

         }

}

public class ThreadLocalExt extends ThreadLocal {

         @Override

         protected Object initialValue() {

                  

                   return "我是默認值,再也不爲null";

         }

}

結果,證實main線程中有本身的值,那麼其餘線程是否會有本身的初始值呢?

一樣能夠證實其餘線程也有本身獨立的值

再次驗證線程變量的隔離性:

類InheritableThreadLocal使用

讓子線程從父線程中取得值。

例子以下:public class Tools {

         public static InheritableThreadLocalExt t1 = new InheritableThreadLocalExt();

}

 

public class ThreadA extends Thread {

         @Override

         public void run() {

                   try {

                            for (int i = 0; i < 10; i++) {

                                     System.out.println("在ThreadA線程中取值" + Tools.t1.get());

                                     Thread.sleep(100);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

 

public class InheritableThreadLocalExt extends InheritableThreadLocal {

         @Override

         protected Object initialValue() {

                   return new Date().getTime();

         }

        

         @Override

         protected Object childValue(Object parentValue) {

                   return parentValue+"我在子線程加的~!";

         }

}

 

 

public class Run {

         public static void main(String[] args) {

                   try {

                            for (int i = 0; i < 10; i++) {

                                     System.out.println("在Main線程中取值:" + Tools.t1.get());

                                     Thread.sleep(100);

                            }

                            ThreadA a = new ThreadA();

                            a.start();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

}

相關文章
相關標籤/搜索