多線程(四)

1.等待/通知機制(wait/notify)

wait使線程暫停運行(進入等待),notify使等待中的線程繼續運行.

當方法wait()執行後,鎖被自動釋放,但執行完notify()方法,鎖卻不自動釋放.

驗證:當方法wait()執行後,鎖被自動釋放

/**
 * Demo service whose single method parks the calling thread on a
 * caller-supplied monitor.
 */
public class Service {
    /**
     * Enters the monitor of {@code lock}, prints a start marker and then
     * blocks in {@code lock.wait()}. The end marker is printed only if the
     * wait returns normally; an interrupt is reported via its stack trace.
     *
     * @param lock object used both as the monitor and as the wait target
     */
    public void testMethod(Object lock){
        synchronized (lock) {
            System.out.println("begin wait()");
            try {
                lock.wait();
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
                // Skip the end marker, exactly as when the exception escapes the wait.
                return;
            }
            System.out.println(" end wait()");
        }
    }
}
public class ThreadA extends Thread{
    private Object lock;
    public ThreadA(Object lock){
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.testMethod(lock);
    }
}
/**
 * Second worker thread; like {@code ThreadA} it hands its monitor to a
 * freshly created {@code Service}.
 */
public class ThreadB extends Thread{
    /** Monitor object forwarded to the service call. */
    private final Object sharedMonitor;

    /**
     * @param lock monitor object to pass to {@code Service.testMethod}
     */
    public ThreadB(Object lock){
        super();
        sharedMonitor = lock;
    }

    @Override
    public void run() {
        Service worker = new Service();
        worker.testMethod(sharedMonitor);
    }
}
/**
 * Launcher for the first experiment: two threads are given the same monitor
 * object and both call {@code Service.testMethod} on it.
 */
public class Test {
    public static void main(String[] args) {
        final Object sharedLock = new Object();
        // Each thread is created and started before the next one is constructed.
        new ThreadA(sharedLock).start();
        new ThreadB(sharedLock).start();
    }
/* Recorded output (both threads got past the synchronized entry):
begin wait()
begin wait()
*/
}

總結:線程a wait()以後釋放鎖,線程b進入,驗證成立

/**
 * Demo service with a waiting method and a notifying method that keeps the
 * monitor for five more seconds after calling {@code notify()}.
 */
public class Service {
    /** Name of the thread executing the current call. */
    private static String callerName() {
        return Thread.currentThread().getName();
    }

    /**
     * Blocks in {@code lock.wait()} inside a block synchronized on
     * {@code lock}, printing the thread name before and after the wait.
     *
     * @param lock object used both as the monitor and as the wait target
     */
    public void testMethod(Object lock){
        synchronized (lock) {
            System.out.println("begin wait() ThreadName=" + callerName());
            try {
                lock.wait();
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
                return;
            }
            System.out.println("end wait() ThreadName=" + callerName());
        }
    }

    /**
     * Calls {@code lock.notify()} and then sleeps for 5000 ms while still
     * inside the synchronized block, printing a timestamp before and after.
     *
     * @param lock object used both as the monitor and as the notify target
     */
    public void synNotifyMethod(Object lock){
        try {
            synchronized (lock) {
                System.out.println("begin notify ThreadName=" + callerName()
                        + " time=" + System.currentTimeMillis());
                lock.notify();
                // The sleep happens while the monitor is still held.
                Thread.sleep(5000);
                System.out.println("end notify ThreadName=" + callerName()
                        + " time=" + System.currentTimeMillis());
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
public class ThreadA extends Thread{
    private Object lock;
    public ThreadA(Object lock){
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.testMethod(lock);
    }
}
public class NotifyThread extends Thread{
    private Object lock;
    public NotifyThread(Object lock){
        super();
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.synNotifyMethod(lock);
    }
}
/**
 * Second notifier thread; it performs the same
 * {@code Service.synNotifyMethod} call as {@code NotifyThread}.
 */
public class SynNotifyThread extends Thread{
    /** Monitor object forwarded to the service call. */
    private final Object sharedMonitor;

    /**
     * @param lock monitor object to pass to {@code Service.synNotifyMethod}
     */
    public SynNotifyThread(Object lock){
        sharedMonitor = lock;
    }

    @Override
    public void run() {
        Service notifier = new Service();
        notifier.synNotifyMethod(sharedMonitor);
    }
}
/**
 * Launcher for the second experiment: one waiting thread and two notifying
 * threads, all sharing a single monitor object.
 */
public class Test {
    public static void main(String[] args) {
        final Object sharedLock = new Object();
        // Construction order fixes the default names Thread-0, Thread-1, Thread-2.
        new ThreadA(sharedLock).start();
        new NotifyThread(sharedLock).start();
        new SynNotifyThread(sharedLock).start();
    }
/* Recorded output ("end wait()" appears only after Thread-1's "end notify"):
begin wait() ThreadName=Thread-0
begin notify ThreadName=Thread-1 time=1515776621424
end notify ThreadName=Thread-1 time=1515776626424
end wait() ThreadName=Thread-0
begin notify ThreadName=Thread-2 time=1515776626424
end notify ThreadName=Thread-2 time=1515776631425
*/
}

總結:線程a wait()以後被線程notifyThread 和synNotifyThread notify(),可是線程Thread-1沒有立刻釋放鎖,運行完synchronized代碼塊才釋放鎖.

2.當interrupt()遇到wait()對象

當線程呈wait()狀態時,調用線程對象的interrupt()方法會拋出InterruptedException異常.

3.notify()一次只隨機通知一個線程進行喚醒

/**
 * Demo service whose single method waits on a caller-supplied monitor and
 * reports the calling thread's name around the wait.
 */
public class Service {
    /**
     * Prints a start marker, blocks in {@code lock.wait()} and prints an end
     * marker once the wait returns. An interrupt skips the end marker and is
     * reported via its stack trace.
     *
     * @param lock object used both as the monitor and as the wait target
     */
    public void testMethod(Object lock){
        try {
            waitOn(lock);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }

    /** Performs the synchronized wait and the two progress prints. */
    private void waitOn(Object monitor) throws InterruptedException {
        synchronized (monitor) {
            System.out.println("begin wait() ThreadName=" + Thread.currentThread().getName());
            monitor.wait();
            System.out.println("end wait() ThreadName=" + Thread.currentThread().getName());
        }
    }
}
public class ThreadA extends Thread{
    private Object lock;
    public ThreadA(Object lock){
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.testMethod(lock);
    }
}
/**
 * Second of three identical waiter threads; calls
 * {@code Service.testMethod} with the monitor it was given.
 */
public class ThreadB extends Thread{
    /** Monitor object forwarded to the service call. */
    private final Object monitor;

    /**
     * @param lock monitor object to pass to {@code Service.testMethod}
     */
    public ThreadB(Object lock){
        super();
        monitor = lock;
    }

    @Override
    public void run() {
        Service waiterService = new Service();
        waiterService.testMethod(monitor);
    }
}
/**
 * Third of three identical waiter threads; calls
 * {@code Service.testMethod} with the monitor it was given.
 */
public class ThreadC extends Thread{
    /** Monitor object forwarded to the service call. */
    private final Object monitor;

    /**
     * @param lock monitor object to pass to {@code Service.testMethod}
     */
    public ThreadC(Object lock){
        super();
        monitor = lock;
    }

    @Override
    public void run() {
        Service waiterService = new Service();
        waiterService.testMethod(monitor);
    }
}
/**
 * Thread that issues a single {@code notifyAll()} on the monitor it was
 * constructed with.
 *
 * NOTE(review): the surrounding article section talks about notify(), but
 * this class calls notifyAll() - confirm which variant was intended.
 */
public class NotifyThread extends Thread{
    /** Monitor whose waiters are woken. */
    private final Object monitor;

    /**
     * @param lock monitor object on which {@code notifyAll()} is invoked
     */
    public NotifyThread(Object lock){
        monitor = lock;
    }

    @Override
    public void run() {
        wakeWaiters();
    }

    /** Acquires the monitor and calls {@code notifyAll()} on it. */
    private void wakeWaiters() {
        synchronized (monitor) {
            monitor.notifyAll();
        }
    }
}
/**
 * Launcher for the third experiment: three waiter threads are started, the
 * main thread pauses for one second, then a notifier thread is started on
 * the same monitor.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        final Object sharedLock = new Object();
        // Construction order fixes the default names Thread-0 .. Thread-2.
        new ThreadA(sharedLock).start();
        new ThreadB(sharedLock).start();
        new ThreadC(sharedLock).start();
        // Pause before starting the notifier.
        Thread.sleep(1000);
        new NotifyThread(sharedLock).start();
    }
/* Recorded output.
   NOTE(review): only two "end wait()" lines are shown although NotifyThread
   calls notifyAll(); the transcript presumably comes from a different
   variant of the notifier - confirm.
begin wait() ThreadName=Thread-0
begin wait() ThreadName=Thread-2
begin wait() ThreadName=Thread-1
end wait() ThreadName=Thread-0
end wait() ThreadName=Thread-2
*/
}

總結:notify()方法隨機通知一個線程進行喚醒

notifyAll()是喚醒全部線程.

4.方法wait(long)
等待某一時間內是否有線程對鎖進行喚醒,若是超過這個時間則自動喚醒

/**
 * Demonstrates {@code wait(long)}: one task waits on a shared monitor with a
 * 5000 ms timeout while a second task, started three seconds later, calls
 * {@code notify()} on the same monitor.
 */
public class MyRunnable {
    /** Monitor shared by the waiting and the notifying task. */
    private static final Object lock = new Object();

    /** Waits on {@link #lock} for at most 5000 ms and prints a timestamp before and after. */
    private static final Runnable waitingTask = new Runnable() {
        @Override
        public void run() {
            try {
                synchronized (lock) {
                    System.out.println("wait begin timer=" + System.currentTimeMillis());
                    lock.wait(5000);
                    System.out.println("wait end timer=" + System.currentTimeMillis());
                }
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        }
    };

    /** Calls {@code notify()} on {@link #lock} and prints a timestamp before and after. */
    private static final Runnable notifyingTask = new Runnable() {
        @Override
        public void run() {
            synchronized (lock) {
                System.out.println("notify begin timer=" + System.currentTimeMillis());
                lock.notify();
                System.out.println("notify end timer=" + System.currentTimeMillis());
            }
        }
    };

    public static void main(String[] args) throws InterruptedException {
        new Thread(waitingTask).start();
        // Start the notifier 3 s into the 5 s timed wait.
        Thread.sleep(3000);
        new Thread(notifyingTask).start();
    }
/* Recorded output:
wait begin timer=1516010021128
notify begin timer=1516010024127   // about 3 s later: main slept 3 s first
notify end timer=1516010024128
wait end timer=1516010024128       // resumed right after the notify, before the 5 s timeout
*/
}

5.生產者/消費者模式實現

//生產者
/**
 * Producer: stores a new value in {@code ValueObject.value} whenever that
 * field holds the empty string, waiting on the shared lock otherwise.
 */
public class P {
    /** Object used as monitor and as wait/notify target. */
    private final String monitor;

    /**
     * @param lock object to synchronize, wait and notify on
     */
    public P(String lock){
        monitor = lock;
    }

    /**
     * Waits while {@code ValueObject.value} is non-empty, then writes a value
     * built from the current time and calls {@code notify()} once.
     * An interrupt during the wait is reported via its stack trace.
     */
    public void setValue(){
        try {
            synchronized (monitor) {
                // Re-check the condition after every wake-up.
                while (!"".equals(ValueObject.value) || ValueObject.value == null) {
                    if (ValueObject.value == null) {
                        // Preserve the original NullPointerException on a null value.
                        ValueObject.value.equals("");
                    }
                    System.out.println("生產者" + Thread.currentThread().getName() + " waiting了");
                    monitor.wait();
                }
                System.out.println("生產者" + Thread.currentThread().getName() + " running了");
                String produced = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println("set的值是" + produced);
                ValueObject.value = produced;
                // Single notify(): wakes one arbitrary waiter on the monitor.
                monitor.notify();
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
//消費者
/**
 * Consumer: reads and clears {@code ValueObject.value} whenever that field
 * is non-empty, waiting on the shared lock otherwise.
 */
public class C {
    /** Object used as monitor and as wait/notify target. */
    private final String monitor;

    /**
     * @param lock object to synchronize, wait and notify on
     */
    public C(String lock){
        monitor = lock;
    }

    /**
     * Waits while {@code ValueObject.value} equals the empty string, then
     * prints the value, resets it to {@code ""} and calls {@code notify()}
     * once. An interrupt during the wait is reported via its stack trace.
     */
    public void getValue(){
        try {
            synchronized (monitor) {
                // Re-check the condition after every wake-up.
                for (;;) {
                    boolean empty = ValueObject.value.equals("");
                    if (!empty) {
                        break;
                    }
                    System.out.println("消費者" + Thread.currentThread().getName() + " waiting了");
                    monitor.wait();
                }
                System.out.println("消費者" + Thread.currentThread().getName() + " running了");
                System.out.println("get的值是" + ValueObject.value);
                ValueObject.value = "";
                // Single notify(): wakes one arbitrary waiter on the monitor.
                monitor.notify();
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
//生產線程
/**
 * Producer thread: calls {@code P.setValue()} in an endless loop.
 */
public class ThreadP extends Thread{
    /** Producer whose {@code setValue()} is invoked repeatedly. */
    private final P producer;

    /**
     * @param p producer to drive
     */
    public ThreadP(P p){
        producer = p;
    }

    @Override
    public void run() {
        // Never terminates on its own.
        for (;;) {
            producer.setValue();
        }
    }
}
//消費線程
/**
 * Consumer thread: calls {@code C.getValue()} in an endless loop.
 */
public class ThreadC extends Thread{
    /** Consumer whose {@code getValue()} is invoked repeatedly. */
    private final C consumer;

    /**
     * @param c consumer to drive
     */
    public ThreadC(C c){
        consumer = c;
    }

    @Override
    public void run() {
        // Never terminates on its own.
        for (;;) {
            consumer.getValue();
        }
    }
}
/**
 * Launcher for the producer/consumer experiment: starts two producer and two
 * consumer threads on one shared lock, waits five seconds and then prints the
 * name and state of every thread in the current thread group.
 */
public class Run {
    public static void main(String[] args) throws InterruptedException {
        // new String("") yields a distinct instance rather than the interned "" literal.
        String lock=new String("");
        P p=new P(lock);
        C c=new C(lock);
        ThreadP[] tp=new ThreadP[2];
        ThreadC[] tc=new ThreadC[2];
        for (int i = 0; i <2 ; i++) {
            tp[i]=new ThreadP(p);
            tp[i].setName("生產者"+(i+1));
            tc[i]=new ThreadC(c);
            tc[i].setName("消費者"+(i+1));
            tp[i].start();
            tc[i].start();
        }
        Thread.sleep(5000);
        ThreadGroup group=Thread.currentThread().getThreadGroup();
        // activeCount() is only an estimate, so the array may be larger than needed.
        Thread[] threadArray=new Thread[group.activeCount()];
        // enumerate() copies the active threads into the array and returns how many it copied;
        // slots beyond that count stay null and must not be dereferenced.
        int copied=group.enumerate(threadArray);
        for (int i = 0; i <copied ; i++) {
            System.out.println(threadArray[i].getName()+" "+threadArray[i].getState());
        }
    }
/* Recorded output (all four worker threads end up WAITING):
生產者生產者2 running了
set的值是1516022907331_19063484002988
生產者生產者2 waiting了
生產者生產者1 waiting了
消費者消費者1 running了
get的值是1516022907331_19063484002988
消費者消費者1 waiting了
消費者消費者2 waiting了
main RUNNABLE
Monitor Ctrl-Break RUNNABLE
生產者1 WAITING
消費者1 WAITING
生產者2 WAITING
消費者2 WAITING
*/
}

總結:產生緣由是由於notify()是隨機通知,致使生產類沒有被通知,陷入假死狀態

解決方法:使用notifyAll()

一個生產者對應多個消費者:

/**
 * Single-slot stack used for the one-producer / many-consumers experiment.
 *
 * This version guards each wait with a plain {@code if}, so a woken thread
 * continues without re-checking the list size. The article uses it to
 * reproduce the IndexOutOfBoundsException shown in its transcript; the guard
 * is intentionally left as is.
 */
public class MyStack {
    /** Backing storage. */
    private final List<String> list = new ArrayList<String>();

    /**
     * Waits once if the list is non-empty, then appends one element,
     * calls {@code notify()} and prints the new size.
     */
    public synchronized void push(){
        try {
            // Deliberately "if", not "while" (see class comment).
            if (!list.isEmpty()) {
                wait();
            }
            list.add("anything " + Math.random());
            notify();
            System.out.println("push=" + list.size());
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }

    /**
     * Waits once if the list is empty, then removes the element at index 0,
     * calls {@code notify()} and prints the new size.
     *
     * @return the removed element, or {@code ""} if the wait was interrupted
     */
    public synchronized String pop(){
        String result = "";
        try {
            // Deliberately "if", not "while" (see class comment).
            if (list.isEmpty()) {
                System.out.println("pop操做中的:" + Thread.currentThread().getName() + " 線程呈wait狀態");
                wait();
            }
            // get() first, then remove(), so an empty list fails inside get().
            result = String.valueOf(list.get(0));
            list.remove(0);
            notify();
            System.out.println("pop=" + list.size());
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
        return result;
    }
}
/**
 * Producer thread: calls {@code P.pushService()} in an endless loop.
 */
public class P_Thread extends Thread{
    /** Producer whose {@code pushService()} is invoked repeatedly. */
    private final P producer;

    /**
     * @param p producer to drive
     */
    public P_Thread(P p){
        producer = p;
    }

    @Override
    public void run() {
        // Never terminates on its own.
        for (;;) {
            producer.pushService();
        }
    }
}
/**
 * Consumer thread: calls {@code C.popService()} in an endless loop.
 */
public class C_Thread extends Thread{
    /** Consumer whose {@code popService()} is invoked repeatedly. */
    private final C consumer;

    /**
     * @param c consumer to drive
     */
    public C_Thread(C c){
        consumer = c;
    }

    @Override
    public void run() {
        // Never terminates on its own.
        for (;;) {
            consumer.popService();
        }
    }
}
/**
 * Producer facade: forwards each request to {@code MyStack.push()}.
 */
public class P {
    /** Stack that receives the pushed elements. */
    private final MyStack target;

    /**
     * @param stack stack to push onto
     */
    public P(MyStack stack){
        target = stack;
    }

    /** Performs one {@code push()} on the stack. */
    public void pushService(){
        target.push();
    }
}
/**
 * Consumer facade: forwards each request to {@code MyStack.pop()}.
 */
public class C {
    /** Stack from which elements are popped. */
    private final MyStack source;

    /**
     * @param stack stack to pop from
     */
    public C(MyStack stack){
        source = stack;
    }

    /** Performs one {@code pop()} on the stack; the popped value is discarded. */
    public void popService(){
        source.pop();
    }
}
/**
 * Launcher for the one-producer / five-consumers experiment on a shared
 * {@code MyStack}.
 */
public class Run {
    public static void main(String[] args) {
        final MyStack stack = new MyStack();
        final int consumerCount = 5;

        P producer = new P(stack);
        C[] consumers = new C[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new C(stack);
        }

        // Threads are constructed producer-first so default names stay
        // Thread-0 (producer) and Thread-1 .. Thread-5 (consumers).
        Thread producerThread = new P_Thread(producer);
        Thread[] consumerThreads = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            consumerThreads[i] = new C_Thread(consumers[i]);
        }

        producerThread.start();
        for (Thread consumerThread : consumerThreads) {
            consumerThread.start();
        }
    }
/* Recorded output (a consumer fails in list.get(0) on an empty list):
pop操做中的:Thread-2 線程呈wait狀態
pop操做中的:Thread-3 線程呈wait狀態
push=1
pop=0
pop操做中的:Thread-2 線程呈wait狀態
pop操做中的:Thread-4 線程呈wait狀態
pop操做中的:Thread-1 線程呈wait狀態
pop操做中的:Thread-5 線程呈wait狀態
Exception in thread "Thread-3" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:604)
    at java.util.ArrayList.get(ArrayList.java:382)
    at mutithreading.MyStack.pop(MyStack.java:33)
    at mutithreading.C.popService(C.java:13)
    at mutithreading.C_Thread.run(C_Thread.java:16)
*/
}

總結: 產生問題的緣由,條件改變時(list.size())並無獲得及時的響應,因此多個呈wait狀態的線程被喚醒,繼而執行list.remove(0)代碼而出現異常.

這邊我試了一下用volatile關鍵字修飾list,思路是,將list的修改強行操做到內存,讓全部的線程共享list,可是實際操做結果仍是同樣.緣由是volatile只保證list引用的可見性,而被喚醒的線程在if以後不會從新檢查list.size(),因此問題依舊.

解決方法:

/**
 * Single-slot stack, revised so that every wait sits inside a {@code while}
 * loop and the list size is re-checked after each wake-up.
 *
 * Wake-ups still use {@code notify()}; the transcript at the end of the
 * class shows the run stalling with this version.
 */
public class MyStack {
    /** Backing storage. */
    private final List<String> list = new ArrayList<String>();

    /**
     * Waits while the list is non-empty, then appends one element, prints the
     * size, calls {@code notify()} and prints the size again.
     */
    public synchronized void push(){
        try {
            while (!list.isEmpty()) {
                wait();
            }
            list.add("anything " + Math.random());
            System.out.println("list的size=" + list.size());
            notify();
            System.out.println("push=" + list.size());
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }

    /**
     * Waits while the list is empty, then removes the element at index 0,
     * calls {@code notify()} and prints the new size.
     *
     * @return the removed element, or {@code ""} if the wait was interrupted
     */
    public synchronized String pop(){
        String result = "";
        try {
            while (list.isEmpty()) {
                System.out.println("pop操做中的:" + Thread.currentThread().getName() + " 線程呈wait狀態");
                wait();
            }
            result = String.valueOf(list.get(0));
            list.remove(0);
            notify();
            System.out.println("pop=" + list.size());
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
        return result;
    }
/* Recorded output (no exception any more, but the run stops making progress):
pop操做中的:Thread-5 線程呈wait狀態
pop操做中的:Thread-3 線程呈wait狀態
pop操做中的:Thread-2 線程呈wait狀態
pop操做中的:Thread-4 線程呈wait狀態
list的size=1
push=1
pop=0
pop操做中的:Thread-1 線程呈wait狀態
pop操做中的:Thread-5 線程呈wait狀態
*/
}

陷入假死狀態,老辦法,改成notifyAll(),順利解決.

6.經過管道進行線程間通訊:字節流

/**
 * Writer side of the byte-stream pipe demo.
 */
public class WriteData {
    /**
     * Writes the decimal strings "1" .. "300" to {@code out}, echoing each one
     * to standard output, and then closes the stream.
     *
     * The stream is closed in a {@code finally} block so that it is closed
     * even when a write fails part-way through. I/O errors are reported via
     * their stack trace and not rethrown.
     *
     * @param out connected pipe output stream to write to and close
     */
    public void writeMethod(PipedOutputStream out){
        try {
            System.out.println("write:");
            for (int i = 0; i <300 ; i++) {
                String outData=""+(i+1);
                out.write(outData.getBytes());
                System.out.println(outData);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Reader side of the byte-stream pipe demo.
 */
public class ReadData {
    /**
     * Reads {@code in} in chunks of up to 20 bytes until end of stream,
     * printing each chunk as a string, and then closes the stream.
     *
     * The stream is closed in a {@code finally} block so that it is closed
     * even when a read fails.
     *
     * @param in connected pipe input stream to drain and close
     * @throws IOException if reading from or closing the stream fails
     */
    public void readMethod(PipedInputStream in) throws IOException {
        try {
            System.out.println("read: ");
            byte[] byteArray=new byte[20];
            int readLength=in.read(byteArray);
            while(readLength!=-1){
                String newData=new String(byteArray,0,readLength);
                System.out.println(newData);
                readLength=in.read(byteArray);
            }
        } finally {
            in.close();
        }
    }
}
/**
 * Thread that runs {@code WriteData.writeMethod} against a given pipe
 * output stream.
 */
public class ThreadWrite extends Thread{
    /** Writer helper that produces the data. */
    private final WriteData writer;
    /** Pipe end the data is written to. */
    private final PipedOutputStream sink;

    /**
     * @param write helper whose {@code writeMethod} is invoked
     * @param out   pipe output stream handed to the helper
     */
    public ThreadWrite(WriteData write,PipedOutputStream out){
        writer = write;
        sink = out;
    }

    @Override
    public void run() {
        writer.writeMethod(sink);
    }
}
/**
 * Thread that runs {@code ReadData.readMethod} against a given pipe input
 * stream and reports any I/O error via its stack trace.
 */
public class ThreadRead extends Thread{
    /** Reader helper that consumes the data. */
    private final ReadData reader;
    /** Pipe end the data is read from. */
    private final PipedInputStream source;

    /**
     * @param read helper whose {@code readMethod} is invoked
     * @param in   pipe input stream handed to the helper
     */
    public ThreadRead(ReadData read, PipedInputStream in){
        reader = read;
        source = in;
    }

    @Override
    public void run() {
        try {
            reader.readMethod(source);
        } catch (IOException failure) {
            failure.printStackTrace();
        }
    }
}
/**
 * Launcher for the byte-stream pipe demo: connects a pipe, starts the reader
 * thread, pauses two seconds and then starts the writer thread.
 */
public class Run {
    public static void main(String[] args) throws IOException {
        WriteData writer = new WriteData();
        ReadData reader = new ReadData();
        PipedOutputStream pipeOut = new PipedOutputStream();
        PipedInputStream pipeIn = new PipedInputStream();
        // Link the two ends so bytes written to pipeOut become readable from pipeIn.
        pipeOut.connect(pipeIn);

        new ThreadRead(reader, pipeIn).start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
            // As before, the writer is not started if the pause is interrupted.
            return;
        }
        new ThreadWrite(writer, pipeOut).start();
    }
}

字符流的用法差不多.

/**
 * Reader side of the character-stream pipe demo.
 */
public class ReadData {
    /**
     * Reads {@code in} in chunks of up to 20 characters until end of stream,
     * printing each chunk, and then closes the reader.
     *
     * The reader is closed in a {@code finally} block so that it is closed
     * even when a read fails.
     *
     * @param in connected pipe reader to drain and close
     * @throws IOException if reading from or closing the reader fails
     */
    public void readMethod(PipedReader in) throws IOException {
        try {
            System.out.println("read: ");
            char[] byteArray=new char[20];
            int readLength=in.read(byteArray);
            while(readLength!=-1){
                String newData=new String(byteArray,0,readLength);
                System.out.println(newData);
                readLength=in.read(byteArray);
            }
        } finally {
            in.close();
        }
    }
}
/**
 * Writer side of the character-stream pipe demo.
 */
public class WriteData {
    /**
     * Writes the decimal strings "1" .. "300" to {@code out}, echoing each one
     * to standard output, and then closes the writer.
     *
     * The writer is closed in a {@code finally} block so that it is closed
     * even when a write fails part-way through. I/O errors are reported via
     * their stack trace and not rethrown.
     *
     * @param out connected pipe writer to write to and close
     */
    public void writeMethod(PipedWriter out){
        try {
            System.out.println("write:");
            for (int i = 0; i <300 ; i++) {
                String outData=""+(i+1);
                out.write(outData);
                System.out.println(outData);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
相關文章
相關標籤/搜索