java多線程三之線程協做與通訊實例

多線程的難點主要就是多線程通訊協做這一塊了,前面筆記二中提到了常見的同步方法,這裏主要是進行實例學習了,今天總結了一下3個實例:html

一、銀行存款與提款多線程實現,使用Lock鎖和條件Condition。     附加 : 用監視器進行線程間通訊java

二、生產者消費者實現,使用LinkedList自寫緩衝區。linux

三、多線程之阻塞隊列學習,用阻塞隊列快速實現生產者消費者模型。    附加:用布爾變量關閉線程數組

       在三種線程同步方法中,咱們這裏的實例用Lock鎖來實現變量同步,由於它比較靈活直觀。緩存

       實現了變量的同步,咱們還要讓多個線程之間進行「通話」,就是一個線程完成了某個條件以後,告訴其餘線程我完成了這個條件,大家能夠行動了。下面就是java提供的條件接口Condition定義的同步方法:安全

image

       很方便的是,java的Lock鎖裏面提供了newConditon()方法能夠,該方法返回:一個綁定了lock鎖的Condition實例,有點抽象,其實把它看做一個能夠發信息的鎖就能夠了,看後面的代碼,應該就能理解了。多線程

一、銀行存款與提款多線程實現。

咱們模擬ATM機器存款與提款,建立一個帳戶類Account(),該類包含同步方法併發

     存款方法:deposit()dom

     提款方法:withdraw()ide

     以及一個普通的查詢餘額的方法getbalance().

咱們建立兩個任務線程,分別調用兩個同步方法,進行模擬操做,看代碼:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadCooperation {
	private static Account account = new Account();   
	public static void main(String[] args)
	{
		//建立線程池
		ExecutorService executor = Executors.newFixedThreadPool(2);
		executor.execute(new DepositTask());
		executor.execute(new WithdrawTask());
	}
	//存錢
	public static class DepositTask implements Runnable
	{
		@Override
		public void run() {
			try {
				while(true)
				{
					account.deposit((int)(Math.random()*1000)+1);
					Thread.sleep(1000);
				}
				} catch (InterruptedException e) {
					e.printStackTrace();
			}
		}
	}
	public static class WithdrawTask implements Runnable
	{
		@Override
		public void run() {
			try{
				while(true)
				{
					account.withdraw((int)(Math.random()*1000)+1);
					Thread.sleep(500);
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	public static class Account
	{
		//一個鎖是一個Lock接口的實例   它定義了加鎖和釋放鎖的方法     ReentrantLock是爲建立相互排斥的鎖的Lock的具體實現
		private static Lock lock = new ReentrantLock();
		//建立一個condition,具備發通知功能的鎖,前提是要實現了lock接口
		private static Condition  newDeposit = lock.newCondition();
		private int balance = 0;
		public int getBalance()
		{
			return balance;
		}
		public void withdraw(int amount)
		{
			lock.lock();
			try {
				while(balance < amount)
				{
				System.out.println("\t\t錢不夠,等待存錢");
					newDeposit.await();
				}
				balance -= amount;
				System.out.println("\t\t取出"+amount+"塊錢\t剩餘"+getBalance());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}finally{
				lock.unlock();
			}
		}
		
		public void deposit(int amount)
		{
			lock.lock();
			try{
				balance+=amount;
				System.out.println("存入"+amount+"塊錢");
				newDeposit.signalAll();   //發信息喚醒全部的線程
			}
			finally{
				lock.unlock();
			}
		}
	}
}

運行截圖C1]I1L)1D0(F)I4MCX3)4$I

分析:

      一、程序中須要注意的:建立一個condition,具備發通知功能的鎖,前提是要實現了lock接口。

      二、while(balance < amount)不能改用if判斷,用if會使得線程不安全,使用if會不會進行循環驗證,而while會,咱們常常看到while(true),可是不會常常看到if(true).

      三、調用了await方法後,要記得使用signalAll()或者signal()將線程喚醒,不然線程永久等待。

       最後再來分析一下這個類的結構,有3個類,兩個靜態任務類實現了Runnable接口,是線程類,而另一個類則是普通的任務類,包含了線程類所用到的方法。咱們的主類在main方法前面就實例化一個Account類,以供線程類調用該類裏面的同步方法。

      這種構造方式是多線程經常使用到的一種構造方式吧。不難發現後面要手寫的生產者消費者模型也是這樣子構造的。這至關因而一個多線程模板。也是咱們學習這個例子最重要的收穫吧。

 

 

用監視器進行線程之間的通訊

       還有一點,接口Lock與Condition都是在java5以後出現的,在這以前,線程通訊是經過內置的監視器(monitor)實現的。

      監視器是一個相互排斥且具備同步能力的對象,任意對象都有可能成爲一個monitor。監視器是經過synchronized關鍵字來對本身加鎖(加鎖解鎖是解決線程同步最基本的思想),使用wait()方法時線程暫停並 等待條件發生,發通知則是經過notify()和notifyAll()方法。大致的模板是這樣子的:

image

不難看出await()、signal()、signally()是wait()、notify()、notifyAll()的進化形態,因此不建議使用監視器。

 

二、生產者消費者實現,使用LinkedList自寫緩衝區

         這個模型一直很經典,學操做系統的時候還學過,記得linux還用PV操做去實現它,不過這東西是跨學科的。

考慮緩存區buffer的使用者,生產者和消費者,他們都能識別緩衝區是否滿的,且兩種各只能發出一種信號:

       生產者:它能發出notEmpty()信號,即緩衝區非空信號,當它看到緩衝區滿的時候,它就調用await等待。

       消費者:它能發出notFull()信號,即緩衝區未滿的信號,當它看到緩衝區空的時候,它也調用await等待。

看代碼:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//生產者消費者
public class ConsumerProducer {
	private static Buffer  buffer= new Buffer();
	public static void main(String[] args)
	{
		ExecutorService executor = Executors.newFixedThreadPool(2);
		executor.execute(new ProducerTask());
		executor.execute(new ConsumerTask());
		executor.shutdown();
	}
	
	public static class ProducerTask implements Runnable
	{
		@Override
		public void run() {
			int i=1;
			try {
				while(true)
				{
					System.out.println("生產者寫入數據"+i);	
					buffer.write(i++);
					Thread.sleep((int)(Math.random()*80));
					
				} 
			}catch (InterruptedException e) {
					e.printStackTrace();
			}
		}
	}
	
	public static class ConsumerTask implements Runnable
	{
		public void run() {
			try {
				while(true){
					System.out.println("\t\t消費讀出數據"+buffer.read());
					Thread.sleep((int)(Math.random()*100));
					}
				} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	public static class Buffer
	{
		private static final int CAPACTIY = 4;  //緩衝區容量
		private java.util.LinkedList<Integer> queue = new java.util.LinkedList<Integer>();
		
		private static Lock lock = new ReentrantLock();
		
		private static Condition  notEmpty = lock.newCondition();
		private static Condition notFull = lock.newCondition();
		
		public void write(int value)
		{
			lock.lock();
			try{
				while(queue.size()==CAPACTIY)
				{
					System.out.println("緩衝區爆滿");
					notFull.await();
				}
				queue.offer(value);
				notEmpty.signalAll();  //通知全部的緩衝區未空的狀況
			}catch(InterruptedException ex){
				ex.printStackTrace();
			}finally{
				lock.unlock();
			}
		}
		
		@SuppressWarnings("finally")
		public int read()
		{
			int value = 0;
			lock.lock();
			try{
				while(queue.isEmpty())
				{
					System.out.println("\t\t緩衝區是空的,等待緩衝區非空的狀況");
					notEmpty.await();
				}
				value = queue.remove();
				notFull.signal();
			}catch(InterruptedException ex){
				ex.printStackTrace();
			}finally{
				lock.unlock();
				return value;
			}
		}
	}
}

 

運行截圖image

程序運行正常,不過稍微延長一下讀取時間,就會出現這樣的狀況image

     

        程序裏面設置的容量是4,但是這裏卻能夠存入最多5個數據,並且更合理的狀況應該是初始緩衝區是空的,後面找了下這個小bug,原來是調用offer()函數應該放在檢測語句以前,若是但願一開始就調用ConsumerTask,在main方法裏面調換二者的順序便可。

三、用阻塞隊列快速實現生產者消費者模型

        java的強大之處是它有着豐富的類庫,咱們學習java在某種程度上就是學習這些類庫。

 

        阻塞隊列是這樣的一種隊列:當試圖向一個滿隊列裏添加元素  或者 從空隊列裏刪除元素時,隊列會讓線程自動阻塞,且當隊列滿時,隊列會繼續存儲元素,供喚醒後的線程使用。這應該說是專門爲消費者生產者模型而設計的一種隊列吧,它實現了Queue接口,主要方法是put()和take()方法。

                                                                         EU7H[HB1$E{SB3MZ)@DT89B

java支持三個具體的阻塞隊列ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。都在java.util.concurrent包中。

      

簡單描述上面三個阻塞隊列:

          ArrayBlockingQueue: 該阻塞用數組實現,按照FIFO,即先進先出的原則對數據進行排序,和數組的使用有點類似,它事先須要指定一個容量,不過即使隊列超出這個容量,也是不會報錯滴。

          LinkeddBlockingQueue:用鏈表實現,默認隊列大小是Integer.MAX_VALUE,也是按照先進先出的方法對數據排序,性能可能比ArrayBlockingQueue,有待研究。

          PriorityBlockingQueue:用優先隊列實現的阻塞隊列,會對元素按照大小進行排序,也能夠建立不受限制的隊列,put方法永不阻塞

 

ok,看代碼:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerProducerUsingBlockQueue {
	private static ArrayBlockingQueue<Integer>  buffer = new ArrayBlockingQueue<Integer>(2);
	public static void main(String[] args)
	{
		ExecutorService executor = Executors.newFixedThreadPool(2);
		executor.execute(new Consumer());
		executor.execute(new Producer());
		
		try {
			Thread.sleep(100);
			executor.shutdownNow();     //暴力關閉,會報錯,不推薦
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
	
	public static class Consumer implements Runnable
	{
		@Override
		public void run() {
			try{
				int i=1;
				while(true){
					System.out.println("生成者寫入:"+i);
					buffer.put(i++);
					Thread.sleep((int)(Math.random())*1000);
				}
			}catch(InterruptedException ex){
				ex.printStackTrace();
			}
		}
	}
	public static class Producer implements Runnable
	{
		@Override
		public void run() {
			try{
				while(true)
				{
					System.out.println("\t\t消費者取出"+buffer.take());
					Thread.sleep((int)(Math.random())*10000);
				}
			}catch(InterruptedException ex){
				ex.printStackTrace();
			}
		}
	}
}

 

運行截圖:1cd4147312c04552aded4cc98ba76a10

        沒啥大的問題,就是在關閉線程的時候太過暴力了,會報錯,線程裏面的每個函數都彷佛值得研究,以前想經過Interrupt暫停,不過失敗了,就直接使用線程池執行器的shoutdownNow方法來的。後面本身又用了另一種關閉線程的方法,見下面代碼

 

使用LinkedBlockingQueue實現消費者生產者且使用布爾變量控制線程關閉

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class A_Control_stop {
	private static LinkedBlockingQueue<String>  buffer = new LinkedBlockingQueue<String>();

	public static void main(String[] args)
	{
		ExecutorService executor = Executors.newFixedThreadPool(2);
		executor.execute(new Consumer());
		executor.execute(new Producer());
		executor.shutdown();

		while(!executor.isTerminated()){}
		System.out.println("全部的的線程都正常結束");
	}
	
	public static class Consumer implements Runnable
	{
		private volatile boolean exit = false;
		@Override
		public void run() {
			try{
				int i=0;
				String[] str ={"as","d","sd","ew","sdfg","esfr"};
				while(!exit){
					System.out.println("生成者寫入:"+str[i]);
					buffer.put(str[i++]);
					Thread.sleep((int)(Math.random())*10);
					if(5==i)
					{
						exit=true;
					}
				}
			}catch(InterruptedException ex){
				ex.printStackTrace();
			}
		}
	}
	public static class Producer implements Runnable
	{
		private volatile boolean exit = false;
		@Override
		public void run() {
			try{
				int i=0;
				while(!exit)
				{
					System.out.println("\t\t消費者取出"+buffer.take());
					i++;
					Thread.sleep((int)(Math.random())*10);
					if(5==i)
					{
						exit=true;
					}
				}
			}catch(InterruptedException ex){
				ex.printStackTrace();
			}
		}
	}
}

 

截圖image

         關於阻塞隊列,以爲這篇文章講的不錯,推薦你們看看  聊聊併發----Java中的阻塞隊列

        用了幾天,多線程算是學了點皮毛,附註一下:這幾天文章主要是參考了《java程序語言設計進階篇第8版》,老外寫的書講的真心不錯,只不過如今java都已經更新到java8了。在其餘一些網站上看到本身的文章,沒有說明轉載什麼的,估計是直接「被採集」過去了。

        本文出自於博客園蘭幽,轉載請說明出處。

相關文章
相關標籤/搜索