多線程(一)

概念

運行程序會建立一個進程。但OS調度的最小單元是線程(輕量級進程)。java

普通的java程序包含的線程:node

/**
 * 一個java程序包含的線程
 */
public class ShowMainThread {

	public static void main(String[] args) {
		// java虛擬機的線程管理接口
		ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
		// 獲取線程信息的方法
		ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
		for (ThreadInfo threadInfo : threadInfos) {
			System.out.println(threadInfo.getThreadId() + ":"
					+ threadInfo.getThreadName());
		}
	}
}
11:Monitor Ctrl-Break  //監聽中斷信號

5:Attach Listener  //獲取內存dump,線程dump

4:Signal Dispatcher  //將信號分給jvm的線程

3:Finalizer  //調用對象的finalizer 方法

2:Reference Handler  //清除Reference

1:main //程序的主入口

 

爲何要用線程?

一、 充分利用多處理核心;sql

二、 更快的響應時間(用戶訂單的場景,發送郵件等部分可由其餘線程執行)數據庫

 

學習線程的難點

一、知識點多,相關的類和接口比較多,編程

二、學習原理,看源碼牽涉的知識點多,包括有設計模式,數據結構,操做系統,cpu相關的概念和定義;3,線程知識點自己的難度也高。設計模式

    學習路線緊緊記住相關的概念和定義-à多寫代碼,多用à瞭解原理->看看源碼api

 

啓動線程和退出線程

建立線程的方法安全

/**
 * 如何建立一個線程
 */
public class HowStartThread {

	// 繼承Thread
	private static class TestThread extends Thread {
		@Override
		public void run() {
			System.out.println("TestThread is runing");

		}
	}

	// 實現Runnable 或者Callable 另外還有內部類線程
	private static class TestRunable implements Runnable {

		@Override
		public void run() {
			System.out.println("TestRunable is runing");
		}
	}

	public static void main(String[] args) {
		Thread t1 = new TestThread();
		Thread t2 = new Thread(new TestRunable());
		t1.start();
		t2.start();

	}

}

啓動線程:threadl類的start()性能優化

線程完成:一、run()方法執行完成;二、拋出一個未處理的異常致使線程的提早結束網絡

 

取消和中斷

不安全的取消:

 單獨使用一個取消標誌位(boolean值判斷).

Stop(),suspend(),resume()是過時的api,很大的反作用,容易致使死鎖或者數據不一致

/**
 * 使用自定義的取消標誌位中斷線程(不安全)
 */
public class FlagCancel {

	private static class TestRunable implements Runnable {

		// boolean的標誌位 volatile輕量的線程同步
		private volatile boolean on = true;
		private long i = 0;

		@Override
		public void run() {
			while (on) {
				i++;
				// 阻塞方法,on不起做用
				// wait,sleep,blockingqueue(put,take)
				try {
					Thread.sleep(20000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			System.out.println("TestRunable is runing :" + i);
		}

		public void cancel() {
			on = false;
		}
	}

}

如何安全的終止線程

使用線程的中斷 : 

interrupt() 中斷線程,本質是將線程的中斷標誌位設爲true,其餘線程向須要中斷的線程打個招呼。是否真正進行中斷由線程本身決定。

isInterrupted() 線程檢查本身的中斷標誌位

靜態方法Thread.interrupted() 將中斷標誌位復位爲false

由上面的中斷機制可知Java裏是沒有搶佔式任務,只有協做式任務。

爲什麼要用中斷,線程處於阻塞(如調用了java的sleep,wait等等方法時)的時候,是不會理會咱們本身設置的取消標誌位的,可是這些阻塞方法都會檢查線程的中斷標誌位。

/**
 * 安全的中斷線程
 */
public class SafeInterrupt implements Runnable {

	private volatile boolean on = true;
	private long i = 0;

	@Override
	public void run() {
		while (on && Thread.currentThread().isInterrupted()) {
			i++;
		}
		System.out.println("TestRunable is runing :" + i);
	}

	public void cancel() {
		on = false;
		Thread.currentThread().interrupt();
	}
}

處理不可中斷的阻塞

IO通訊 inputstream read/write等阻塞方法,不會理會中斷,而關閉底層的套接字socket.close()會拋出socketException

NIO: selector.select()會阻塞,調用selector的wakeup和close方法會拋出ClosedSelectorException

死鎖狀態不響應中斷的請求,這個必須重啓程序,修改錯誤。

/**
 * 調用阻塞方法時,如何中斷線程
 */
public class BlockInterrupt {

	private static volatile boolean on = true;

	private static class WhenBlock implements Runnable {

		@Override
		public void run() {
			while (on && !Thread.currentThread().isInterrupted()) {
				try {
					// 拋出中斷異常的阻塞方法,拋出異常後,中斷標誌位改爲false 須要從新設置標誌位
					Thread.sleep(100);
				} catch (InterruptedException e) {
					// 從新設置標誌位
					Thread.currentThread().interrupt();
					// do my work
				}
				// 清理工做結束線程
			}
		}

		public void cancel() {
			on = false;
			Thread.currentThread().interrupt();
		}

	}
}

如何讓咱們的代碼既能夠響應普通的中斷,又能夠關閉底層的套接字呢?

覆蓋線程的interrupt方法,在處理套接字異常時,再用super.interrupt()自行中斷線程

/**
 * 如何覆蓋線程的interrupt() 方法
 */
public class OverrideInterrupt extends Thread {

	private final Socket socket;
	private final InputStream in;

	public OverrideInterrupt(Socket socket, InputStream in) {
		this.socket = socket;
		this.in = in;
	}

	private void t() {
	}

	@Override
	public void interrupt() {
		try {
			// 關閉底層的套接字
			socket.close();
		} catch (IOException e) {
			e.printStackTrace();
			// .....
		} finally {
			// 同時中斷線程
			super.interrupt();
		}

	}
}

線程的狀態

新建立   線程被建立,可是沒有調用start方法

可運行(RUNNABLE)  運行狀態,由cpu決定是否是正在運行

被阻塞(BLOCKING)  阻塞,線程被阻塞於鎖

等待/計時等待(WAITING) 等待某些條件成熟

被終止  線程執行完畢

public class SleepUtils {
	public static final void second(long seconds) {
		try {
			TimeUnit.SECONDS.sleep(seconds);
		} catch (InterruptedException e) {
		}
	}
}
/**
 * 查看線程的狀態
 */
public class ThreadState {
	private static Lock lock = new ReentrantLock();

	public static void main(String[] args) {
		new Thread(new SleepAlways(), "SleepAlwaysThread").start();
		new Thread(new Waiting(), "WaitingThread").start();
		// 使用兩個Blocked線程,一個獲取鎖成功,另外一個被阻塞
		new Thread(new Blocked(), "BlockedThread-1").start();
		new Thread(new Blocked(), "BlockedThread-2").start();
		new Thread(new Sync(), "SyncThread-1").start();
		new Thread(new Sync(), "SyncThread-2").start();
	}

	/**
	 * 該線程不斷的進行睡眠
	 */
	static class SleepAlways implements Runnable {
		@Override
		public void run() {
			while (true) {
				SleepUtils.second(100);
			}
		}
	}

	/**
	 * 該線程在Waiting.class實例上等待
	 */
	static class Waiting implements Runnable {
		@Override
		public void run() {
			while (true) {
				synchronized (Waiting.class) {
					try {
						Waiting.class.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}

	/**
	 * 該線程在Blocked.class實例上加鎖後,不會釋放該鎖
	 */
	static class Blocked implements Runnable {
		public void run() {
			synchronized (Blocked.class) {
				while (true) {
					SleepUtils.second(100);
				}
			}
		}
	}

	/**
	 * 該線程得到鎖休眠後,又釋放鎖
	 */
	static class Sync implements Runnable {

		@Override
		public void run() {
			lock.lock();
			try {
				SleepUtils.second(3000);
			} finally {
				lock.unlock();
			}

		}

	}
}

須要用ThreadState工具才能查看運行的進程狀態

線程的優先級:

成員變量priority控制優先級,範圍1-10之間,數字越高優先級越高,缺省爲5,建立線程時setPriotity()能夠設置優先級,不要期望他發揮做用。

Daemon線程

守護型線程(如GC線程),程序裏沒有非Daemon線程時,java程序就會退出。通常用不上,也不建議咱們平時開發時使用,由於Try/Finally裏的代碼不必定執行的。

/**
 * 守護線程
 */
public class Daemon {
	public static void main(String[] args) {
		Thread thread = new Thread(new DaemonRunner());
		thread.setDaemon(true);// 設置爲守護線程
		thread.start();
	}

	static class DaemonRunner implements Runnable {

		@Override
		public void run() {
			System.out.println("2");
			try {
				System.out.println("23");
				SleepUtils.second(100);
			} finally {
				System.out.println("DaemonThread finally run.");
			}
		}
	}
}

main運行結果:沒有打印任何東西

經常使用方法深刻理解

run()和start()

run就是一個普通的方法,跟其餘類的實例方法沒有任何區別。

/**
 * Run和start方法辨析
 */
public class RunAndStart {

	private static class TestThread extends Thread {

		private String name;

		public TestThread(String name) {
			this.name = name;
		}

		@Override
		public void run() {
			int i = 90;
			while (i > 0) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("I am " + name + " i= " + i);
			}

		}
	}

	public static void main(String[] args) {
		TestThread parent = new TestThread("beInvoked");
		parent.start();// 是TestThread線程去執行

		TestThread beInvoked = new TestThread("beInvoked_thread");
		beInvoked.run();// 是main線程去執行

	}

}

main運行結果:
I am beInvoked i= 90
I am beInvoked_thread i= 90
I am beInvoked i= 90
I am beInvoked_thread i= 90
I am beInvoked i= 90
I am beInvoked_thread i= 90

Sleep

不會釋放鎖,因此咱們在用sleep時,要把sleep放在同步代碼塊的外面。

/**
 * sleep方法是否會釋放鎖
 */
public class SleepTest {
	// 鎖
	private Object lock = new Object();

	public static void main(String[] args) {
		SleepTest sleepTest = new SleepTest();
		Thread threadA = sleepTest.new ThreadSleep();
		threadA.setName("ThreadSleep");
		Thread threadB = sleepTest.new ThreadNotSleep();
		threadB.setName("ThreadNotSleep");
		threadA.start();
		try {
			Thread.sleep(1000);
			System.out.println(" RunTest slept!");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		threadB.start();
	}

	// 休眠的線程
	private class ThreadSleep extends Thread {

		@Override
		public void run() {
			String threadName = Thread.currentThread().getName();
			System.out.println(threadName + " will take the lock");
			try {

				// 拿到鎖之後,休眠
				synchronized (lock) {
					System.out.println(threadName + " taking the lock");
					System.out.println("Finish the work: " + threadName);
					Thread.sleep(5000);
				}

			} catch (InterruptedException e) {
				// e.printStackTrace();
			}
		}
	}

	// 不休眠的線程
	private class ThreadNotSleep extends Thread {

		@Override
		public void run() {
			String threadName = Thread.currentThread().getName();
			System.out.println(threadName + " will take the lock time="
					+ System.currentTimeMillis());
			// 拿到鎖之後不休眠
			synchronized (lock) {
				System.out.println(threadName + " taking the lock time="
						+ System.currentTimeMillis());
				System.out.println("Finish the work: " + threadName);
			}
		}
	}
}

main運行結果:
ThreadSleep will take the lock
ThreadSleep taking the lock
Finish the work: ThreadSleep
 RunTest slept!
ThreadNotSleep will take the lock time=1526644256785
ThreadNotSleep taking the lock time=1526644260785
Finish the work: ThreadNotSleep

yield()

當前線程出讓cpu佔有權,當前線程變成了可運行狀態,下一時刻仍然可能被cpu選中,不會釋放鎖。

wait()和 notify()/notiyfAll()

調用之前,當前線程必需要持有鎖,調用了wait() notify()/notiyfAll()會釋放鎖。

等待通知機制:

線程 A調用了對象O的wait方法進入等待狀態,線程 B調用了對象O的notify方法進行喚醒,喚醒的是在對象O上wait的線程(好比線程A)

notify() 喚醒一個線程,喚醒哪個徹底看cpu的心情(謹慎使用)

notiyfAll() 全部在對象O上wait的線程所有喚醒(應該用notiyfAll())

 

/**
 * wait/notify/notifyAll的演示
 */
public class User {

	private int age;
	private String city;

	public static final String CITY = "NewYork";

	public User(int age, String city) {
		this.age = age;
		this.city = city;
	}

	public User() {
	}

	// 修改用戶的城市後,發出通知
	public synchronized void changeCity() {
		this.city = "London";
		notifyAll();
	}

	// 修改用戶的年齡後,發出通知
	public synchronized void changeAge() {
		this.age = 31;
		notifyAll();
	}

	// 等待用戶的年齡變化的方法,接收到通知,檢查發現用戶年齡大於30時,進行業務工做,不然一直等待
	// 阻塞方法
	public synchronized void waitAge() {
		while (this.age <= 30) {
			try {
				wait();
				System.out.println("wait age ["
						+ Thread.currentThread().getId() + "] is notified!");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("the age is " + this.age);// 業務工做
	}

	// 等待用戶的城市變化的方法,接收到通知,檢查發現用戶城市不是NewYork時,進行業務工做,不然一直等待
	// 阻塞方法
	public synchronized void waitCity() {
		while (this.city.equals(CITY)) {
			try {
				wait();
				System.out.println("wait city ["
						+ Thread.currentThread().getId() + "] is notified!");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("the city is " + this.city);// 業務工做
	}
}
/**
 * TestUser測試類
 */
public class TestUser {

	private static User user = new User(30, User.CITY);

	private static class CheckAge extends Thread {
		@Override
		public void run() {
			user.waitAge();
		}
	}

	private static class CheckCity extends Thread {
		@Override
		public void run() {
			user.waitCity();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 3; i++) {
			// 啓動三個等待用戶年齡變化的線程
			new CheckAge().start();
		}
		for (int i = 0; i < 3; i++) {
			// 啓動三個等待用戶城市變化的線程
			new CheckCity().start();
		}
		Thread.sleep(1000);
		user.changeCity();// 變更用戶的城市
	}

}
main運行結果:
wait city [16] is notified!
the city is London
wait city [15] is notified!
the city is London
wait city [14] is notified!
the city is London
wait age [13] is notified!
wait age [12] is notified!
wait age [11] is notified!
/**
 *調用阻塞方法時,如何中斷線程
 */
public class BlockInterrupt {

	private static Object o = new Object();

	/* while循環中包含try/catch塊 */
	private static class WhileTryWhenBlock extends Thread {
		private volatile boolean on = true;
		private long i = 0;

		@Override
		public void run() {
			System.out.println("當前執行線程id:" + Thread.currentThread().getId());
			while (on && !Thread.currentThread().isInterrupted()) {
				System.out.println("i=" + i++);
				try {

					// 拋出中斷異常的阻塞方法,拋出異常後,中斷標誌位會改爲false
					// 能夠理解爲這些方法會隱含調用Thread.interrputed()方法
					synchronized (o) {
						o.wait();
					}

				} catch (InterruptedException e) {
					System.out.println("當前執行線程的中斷標誌位:"
							+ Thread.currentThread().getId() + ":"
							+ Thread.currentThread().isInterrupted());
					Thread.currentThread().interrupt();// 從新設置一下
					System.out.println("被中斷的線程_" + getId() + ":"
							+ isInterrupted());
					// do my work
				}
				// 清理工做,準備結束線程
			}
		}

		public void cancel() {
			// on = false;
			interrupt();
			System.out.println("本方法所在線程實例:" + getId());
			System.out.println("執行本方法的線程:" + Thread.currentThread().getId());
			// Thread.currentThread().interrupt();
		}
	}

	/* try/catch塊中包含while循環 */
	private static class TryWhileWhenBlock extends Thread {
		private volatile boolean on = true;
		private long i = 0;

		@Override
		public void run() {
			try {
				while (on) {
					System.out.println(i++);
					// 拋出中斷異常的阻塞方法,拋出異常後,中斷標誌位改爲false
					synchronized (o) {
						o.wait();
					}
				}
			} catch (InterruptedException e) {
				System.out.println("當前執行線程的中斷標誌位:"
						+ Thread.currentThread().getId() + ":"
						+ Thread.currentThread().isInterrupted());
			} finally {
				// 清理工做結束線程
			}
		}

		public void cancel() {
			on = false;
			interrupt();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		WhileTryWhenBlock whileTryWhenBlock = new WhileTryWhenBlock();
		whileTryWhenBlock.start();
		Thread.sleep(100);
		whileTryWhenBlock.cancel();

		System.out.println("====================");
		TryWhileWhenBlock tryWhileWhenBlock = new TryWhileWhenBlock();
		tryWhileWhenBlock.start();
		Thread.sleep(100);
		tryWhileWhenBlock.cancel();
	}
}


輸出結果
當前執行線程id:11
i=0
本方法所在線程實例:11
當前執行線程的中斷標誌位:11:false
執行本方法的線程:1
被中斷的線程_11:true
====================
0
當前執行線程的中斷標誌位:11:false

線程間協做和通訊

每一個線程有本身棧空間,孤立運行,對咱們沒有價值。若是多個線程可以相互配合完成工做,這將會帶來巨大的價值。

volatile和synchronized

多個線程同時訪問一個共享的變量的時候,每一個線程的工做內存有這個變量的一個拷貝,變量自己仍是保存在共享內存中。

Violate修飾字段,對這個變量的訪問必需要從共享內存刷新一次。最新的修改寫回共享內存。能夠保證字段的可見性。絕對不是線程安全的,沒有操做的原子性。

適用場景:一、一個線程寫,多個線程讀;二、volatile變量的變化很固定

關鍵字synchronized能夠修飾方法或者以同步塊的形式來進行使用,它主要確保多個線程在同一個時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性,又稱爲內置鎖機制。

Synchronized的類鎖和對象鎖,本質上是兩把鎖,類鎖實際鎖的是每個類的class對象。對象鎖鎖的是當前對象實例。

/**
 * 測試Volatile型變量的操做原子性
 */
public class VolatileThread implements Runnable {

    private volatile  int a= 0;

    @Override
    public void run() {
        //synchronized (this){
            a=a+1;
            System.out.println(Thread.currentThread().getName()+"----"+a);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            a=a+1;
            System.out.println(Thread.currentThread().getName()+"----"+a);

        //}
    }
}
public class VolatileTest {
    public static void main(String[] args) {
        VolatileThread volatileThread = new VolatileThread();

        Thread t1 = new Thread(volatileThread);
        Thread t2 = new Thread(volatileThread);
        Thread t3 = new Thread(volatileThread);
        Thread t4 = new Thread(volatileThread);
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

等待和通知機制

等待方原則:

一、獲取對象鎖

二、若是條件不知足,調用對象的wait方法,被通知後依然要檢查條件是否知足

三、條件知足之後,才能執行相關的業務邏輯

Synchronized(對象){
	While(條件不知足){
	  對象.wait()
    }
    業務邏輯處理
}

通知方原則:

一、 得到對象的鎖;

二、 改變條件;

三、 通知全部等待在對象的線程

Synchronized(對象){
	業務邏輯處理,改變條件
	對象.notify/notifyAll
}

 

/**
 * 有界阻塞隊列
 */
public class BlockingQueueWN<T> {

	private List queue = new LinkedList<>();
	private final int limit; // 大小限制

	public BlockingQueueWN(int limit) {
		this.limit = limit;
	}

	// 入隊
	public synchronized void enqueue(T item) throws InterruptedException {
		// 若是隊列滿了 等待
		while (this.queue.size() == this.limit) {
			wait();
		}
		// 若是隊列爲空 喚醒
		if (this.queue.size() == 0) {
			System.out.println("enqueue notifyAll");
			notifyAll();
		}
		// 入列
		this.queue.add(item);
	}

	// 出隊
	public synchronized T dequeue() throws InterruptedException {
		// 若是隊列爲空 等待
		while (this.queue.size() == 0) {
			System.out.println("dequeue wait");
			wait();
		}
		// 若是隊列滿了 喚醒
		if (this.queue.size() == this.limit) {
			notifyAll();
		}
		// 出列
		return (T) this.queue.remove(0);
	}
}
public class BqTest {
	public static void main(String[] args) {
		BlockingQueueWN bq = new BlockingQueueWN(10);
		Thread threadA = new ThreadPush(bq);
		threadA.setName("Push");
		Thread threadB = new ThreadPop(bq);
		threadB.setName("Pop");
		threadB.start();
		threadA.start();
	}

	// 數據入隊列線程
	private static class ThreadPush extends Thread {
		BlockingQueueWN<Integer> bq;

		public ThreadPush(BlockingQueueWN<Integer> bq) {
			this.bq = bq;
		}

		@Override
		public void run() {
			String threadName = Thread.currentThread().getName();
			int i = 20;
			// 循環20次入列
			while (i > 0) {
				try {
					Thread.sleep(1000);
					System.out.println(" i=" + i + " will push");
					bq.enqueue(i--);
				} catch (InterruptedException e) {
					// e.printStackTrace();
				}

			}
		}
	}

	// 數據出隊列線程
	private static class ThreadPop extends Thread {
		BlockingQueueWN<Integer> bq;

		public ThreadPop(BlockingQueueWN<Integer> bq) {
			this.bq = bq;
		}

		@Override
		public void run() {
			// 無限循環 有就取
			while (true) {
				try {
					System.out.println(Thread.currentThread().getName()
							+ " will pop.....");
					Integer i = bq.dequeue();
					System.out.println(" i=" + i.intValue() + " alread pop");
				} catch (InterruptedException e) {
					// e.printStackTrace();
				}
			}

		}
	}
}
運行結果
Pop will pop.....
dequeue wait
 i=20 will push
enqueue notifyAll
 i=20 alread pop
Pop will pop.....
dequeue wait
 i=19 will push
enqueue notifyAll
 i=19 alread pop
Pop will pop.....
dequeue wait
 i=18 will push
enqueue notifyAll
 i=18 alread pop
Pop will pop.....
dequeue wait
 i=17 will push
enqueue notifyAll
 i=17 alread pop
Pop will pop.....
dequeue wait
 i=16 will push
enqueue notifyAll
 i=16 alread pop
Pop will pop.....
dequeue wait
 i=15 will push
enqueue notifyAll
 i=15 alread pop
Pop will pop.....
dequeue wait
 i=14 will push
enqueue notifyAll
 i=14 alread pop
Pop will pop.....
dequeue wait
 i=13 will push
enqueue notifyAll
 i=13 alread pop
Pop will pop.....
dequeue wait
 i=12 will push
enqueue notifyAll
 i=12 alread pop
Pop will pop.....
dequeue wait
 i=11 will push
enqueue notifyAll
 i=11 alread pop
Pop will pop.....
dequeue wait
 i=10 will push
enqueue notifyAll
 i=10 alread pop
Pop will pop.....
dequeue wait
 i=9 will push
enqueue notifyAll
 i=9 alread pop
Pop will pop.....
dequeue wait
 i=8 will push
enqueue notifyAll
 i=8 alread pop
Pop will pop.....
dequeue wait
 i=7 will push
enqueue notifyAll
 i=7 alread pop
Pop will pop.....
dequeue wait
 i=6 will push
enqueue notifyAll
 i=6 alread pop
Pop will pop.....
dequeue wait
 i=5 will push
enqueue notifyAll
 i=5 alread pop
Pop will pop.....
dequeue wait
 i=4 will push
enqueue notifyAll
 i=4 alread pop
Pop will pop.....
dequeue wait
 i=3 will push
enqueue notifyAll
 i=3 alread pop
Pop will pop.....
dequeue wait
 i=2 will push
enqueue notifyAll
 i=2 alread pop
Pop will pop.....
dequeue wait
 i=1 will push
enqueue notifyAll
 i=1 alread pop
Pop will pop.....
dequeue wait

管道輸入輸出流

文件輸入輸出,網絡輸入輸出,管道輸入輸出流用於線程中間的數據傳遞,傳輸媒介的內存

管道是在線程間進行傳送

四種實現

pipedOutputStream/input 面向的字節

pipedReader/Writer 面向的是字符

只適合線程間一對一的通訊,適用範圍較狹窄。

public class PipeTransfer {

    private static class Print implements Runnable{
        private PipedReader in;

        public Print(PipedReader in) {
            this.in = in;
        }

        @Override
        public void run() {
           int receive =0;
            try {
                while((receive=in.read())!=-1){
                    System.out.println((char) receive);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader();
        //必須進行鏈接
        out.connect(in);

        Thread t1 = new Thread(new Print(in),"PrintThread");
        t1.start();
        int receive =0;
        try {
            while((receive=System.in.read())!=-1){
                out.write(receive);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            out.close();
        }
    }

}

運行輸入 good  
輸出
g
o
o
d

join方法

線程A,執行了thread.join(),線程A等待thread線程終止了之後,A在join後面的語句纔會繼續執行

public class JoinTes {

	public static void main(String[] args) throws InterruptedException {
		ThreadJoinTest t1 = new ThreadJoinTest("小明");
		ThreadJoinTest t2 = new ThreadJoinTest("小東");
		t1.start();
		/**
		 * Thread類中的join方法的主要做用就是同步,它可使得線程之間的並行執行變爲串行執行。
		 * 
		 * 1 join的意思是使得放棄當前線程的執行,並返回對應的線程,例以下面代碼的意思就是:
		 * 程序在main線程中調用t1線程的join方法,則main線程放棄cpu控制權,並返回t1線程繼續執行直到線程t1執行完畢
		 * 因此結果是t1線程執行完後,纔到主線程執行,至關於在main線程中同步t1線程,t1執行完了,main線程纔有執行的機會
		 * 
		 * 2 join方法能夠傳遞參數,join(10)表示main線程會等待t1線程10毫秒,10毫秒過去後,
		 * main線程和t1線程之間執行順序由串行執行變爲普通的並行執行
		 * 若是A線程中掉用B線程的join(10),則表示A線程會等待B線程執行10毫秒,10毫秒事後,
		 * A、B線程並行執行。須要注意的是,jdk規定,join(0)的意思不是A線程等待B線程0秒,
		 * 而是A線程等待B線程無限時間,直到B線程執行完畢,即join(0)等價於join()
		 * 
		 * 3 join方法必須在線程start方法調用以後調用纔有意義。這個也很容易理解:若是一個線程都沒有start,那它也就沒法同步了。
		 * 
		 * 4 join方法的原理就是調用相應線程的wait方法進行等待操做的,例如A線程中調用了B線程的join方法,
		 * 則至關於在A線程中調用了B線程的wait方法,當B線程執行完(或者到達等待時間),
		 * B線程會自動調用自身的notifyAll方法喚醒A線程,從而達到同步的目的。
		 */
		t1.join();
		t2.start();
	}

}

class ThreadJoinTest extends Thread {
	public ThreadJoinTest(String name) {
		super(name);
	}

	@Override
	public void run() {
		for (int i = 0; i < 10; i++) {
			System.out.println(this.getName() + ":" + i);
			try {
				sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

運行結果
小明:0
小明:1
小明:2
小明:3
小明:4
小明:5
小明:6
小明:7
小明:8
小明:9
小東:0
小東:1
小東:2
小東:3
小東:4
小東:5
小東:6
小東:7
小東:8
小東:9
public class JoinTest {

	public static class CutInLine implements Runnable {

		private Thread thread;

		public CutInLine(Thread thread) {
			this.thread = thread;
		}

		@Override
		public void run() {

			try {
				// 在被插隊的線程裏,調用一下插隊線程的join方法
				System.out.println(thread.getName() + " join "
						+ Thread.currentThread().getName());
				thread.join();

			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName() + " will work");

		}
	}

	public static void main(String[] args) throws InterruptedException {
		Thread previous = Thread.currentThread();
		for (int i = 0; i < 10; i++) {
			Thread thread = new Thread(new CutInLine(previous),
					String.valueOf(i));
			thread.start();
			System.out.print(thread.getName() + " start ");
			previous = thread;
		}

	}

}

打印結果
0 start main join 0
1 start 0 join 1
2 start 1 join 2
3 start 2 join 3
4 start 3 join 4
5 start 4 join 5
6 start 5 join 6
7 start 6 join 7
8 start 7 join 8
9 start 8 join 9
0 will work
1 will work
2 will work
3 will work
4 will work
5 will work
6 will work
7 will work
8 will work
9 will work

ThreadLocal

本質是個map,map的鍵就是每一個線程對象,值就是每一個線程所擁有的值

經常使用方法:

initialValue()

get()

set()

remove():將當前線程局部變量的值刪除,這個方法是JDK 5.0新增的方法。當線程結束後,對應該線程的局部變量將自動被垃圾回收,因此顯式調用該方法清除線程的局部變量並非必須的操做,但它能夠加快內存回收的速度。

ThreadLocal擁有的這個變量,在線程之間很獨立的,相互之間沒有聯繫。內存佔用相對來講比較大。

public class ThreadLocalTest {

    static ThreadLocal<String> threadLocal = new ThreadLocal<String>(){
        @Override
        protected String initialValue() {
            return "init";
        }
    };

    public void test(){
        Thread[] runs = new Thread[3];
        for(int i =0;i<runs.length;i++){
            runs[i]=new Thread(new T1(i));
        }
        for(int i =0;i<runs.length;i++){
            runs[i].start();
        }
    }

    private static class T1 implements Runnable{

        private int id;

        public T1(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getId()+" start");
            String s = threadLocal.get();
            s = s+"_"+id;
            threadLocal.set(s);
            System.out.println(Thread.currentThread().getId()+s);
        }
    }

    public static void main(String[] args) {
        ThreadLocalTest test = new ThreadLocalTest();
        test.test();
    }
}

輸出結果
11 start
13 start
11init_0
12 start
13init_2
12init_1

性能問題

串行化、無鎖化、異步化編程是趨勢之一,好比node.js,Vert.x。

黃金原則:編碼時候不要考慮性能優化的事情,先正確實現業務,發現性能不行,這個時候再來考慮性能優化。

public class PerfermenTest {

    /** 執行次數 */
    private static final long count = 100000000;

    public static void main(String[] args) throws InterruptedException {
        //併發計算
        concurrency();
        //單線程計算
        serial();
    }

    private static void concurrency() throws InterruptedException {
        long start = System.currentTimeMillis();
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = 0;
                for (long i = 0; i < count; i++) {
                    a += 5;
                }
                System.out.println("a="+a);
            }
        });
        thread.start();
        int b = 0;
        for (long i = 0; i < count; i++) {
            b--;
        }
        thread.join();
        long time = System.currentTimeMillis() - start;
        System.out.println("concurrency :" + time + "ms,b=" + b);
    }

    private static void serial() {
        long start = System.currentTimeMillis();
        int a = 0;
        for (long i = 0; i < count; i++) {
            a += 5;
        }
        int b = 0;
        for (long i = 0; i < count; i++) {
            b--;
        }
        long time = System.currentTimeMillis() - start;
        System.out.println("serial:" + time + "ms,b=" + b + ",a=" + a);
    }

}

輸出結果
a=500000000
concurrency :41ms,b=-100000000
serial:70ms,b=-100000000,a=500000000

等待超時模式

調用場景:調用一個方法時等待一段時間(通常來講是給定一個時間段),若是該方法可以在給定的時間段以內獲得結果,那麼將結果馬上返回,反之,超時返回默認結果。

假設等待時間段是T,那麼能夠推斷出在當前時間now+T以後就會超時
等待持續時間:REMAINING=T。
·超時時間:FUTURE=now+T。
// 對當前對象加鎖
public synchronized Object get(long mills) throws InterruptedException {
   long future = System.currentTimeMillis() + mills;
   long remaining = mills;
   // 當超時大於0而且result返回值不知足要求
   while ((result == null) && remaining > 0) {
      wait(remaining);
      remaining = future - System.currentTimeMillis();
   }
   return result;
}
public class ConnectionDriver {

	public static final Connection getConnectiong() {
		return new ConnectionImpl();
	}

	private static class ConnectionImpl implements Connection {

		@Override
		public Statement createStatement() throws SQLException {
			System.out.println("建立SQL " + Thread.currentThread().getId());
			return null;
		}

		@Override
		public void commit() throws SQLException {
			try {
				System.err.println(Thread.currentThread().getId() + "準備提交數據");
				TimeUnit.MILLISECONDS.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		@Override
		public PreparedStatement prepareStatement(String sql)
				throws SQLException {
			return null;
		}

		@Override
		public CallableStatement prepareCall(String sql) throws SQLException {
			return null;
		}

		@Override
		public String nativeSQL(String sql) throws SQLException {
			return null;
		}

		@Override
		public void setAutoCommit(boolean autoCommit) throws SQLException {

		}

		@Override
		public boolean getAutoCommit() throws SQLException {
			return false;
		}

		@Override
		public void rollback() throws SQLException {

		}

		@Override
		public void close() throws SQLException {

		}

		@Override
		public boolean isClosed() throws SQLException {
			return false;
		}

		@Override
		public DatabaseMetaData getMetaData() throws SQLException {
			return null;
		}

		@Override
		public void setReadOnly(boolean readOnly) throws SQLException {

		}

		@Override
		public boolean isReadOnly() throws SQLException {
			return false;
		}

		@Override
		public void setCatalog(String catalog) throws SQLException {

		}

		@Override
		public String getCatalog() throws SQLException {
			return null;
		}

		@Override
		public void setTransactionIsolation(int level) throws SQLException {

		}

		@Override
		public int getTransactionIsolation() throws SQLException {
			return 0;
		}

		@Override
		public SQLWarning getWarnings() throws SQLException {
			return null;
		}

		@Override
		public void clearWarnings() throws SQLException {

		}

		@Override
		public Statement createStatement(int resultSetType,
				int resultSetConcurrency) throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				int resultSetType, int resultSetConcurrency)
				throws SQLException {
			return null;
		}

		@Override
		public CallableStatement prepareCall(String sql, int resultSetType,
				int resultSetConcurrency) throws SQLException {
			return null;
		}

		@Override
		public Map<String, Class<?>> getTypeMap() throws SQLException {
			return null;
		}

		@Override
		public void setTypeMap(Map<String, Class<?>> map) throws SQLException {

		}

		@Override
		public void setHoldability(int holdability) throws SQLException {

		}

		@Override
		public int getHoldability() throws SQLException {
			return 0;
		}

		@Override
		public Savepoint setSavepoint() throws SQLException {
			return null;
		}

		@Override
		public Savepoint setSavepoint(String name) throws SQLException {
			return null;
		}

		@Override
		public void rollback(Savepoint savepoint) throws SQLException {

		}

		@Override
		public void releaseSavepoint(Savepoint savepoint) throws SQLException {

		}

		@Override
		public Statement createStatement(int resultSetType,
				int resultSetConcurrency, int resultSetHoldability)
				throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				int resultSetType, int resultSetConcurrency,
				int resultSetHoldability) throws SQLException {
			return null;
		}

		@Override
		public CallableStatement prepareCall(String sql, int resultSetType,
				int resultSetConcurrency, int resultSetHoldability)
				throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				int autoGeneratedKeys) throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				int[] columnIndexes) throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				String[] columnNames) throws SQLException {
			return null;
		}

		@Override
		public Clob createClob() throws SQLException {
			return null;
		}

		@Override
		public Blob createBlob() throws SQLException {
			return null;
		}

		@Override
		public NClob createNClob() throws SQLException {
			return null;
		}

		@Override
		public SQLXML createSQLXML() throws SQLException {
			return null;
		}

		@Override
		public boolean isValid(int timeout) throws SQLException {
			return false;
		}

		@Override
		public void setClientInfo(String name, String value)
				throws SQLClientInfoException {

		}

		@Override
		public void setClientInfo(Properties properties)
				throws SQLClientInfoException {

		}

		@Override
		public String getClientInfo(String name) throws SQLException {
			return null;
		}

		@Override
		public Properties getClientInfo() throws SQLException {
			return null;
		}

		@Override
		public Array createArrayOf(String typeName, Object[] elements)
				throws SQLException {
			return null;
		}

		@Override
		public Struct createStruct(String typeName, Object[] attributes)
				throws SQLException {
			return null;
		}

		@Override
		public void setSchema(String schema) throws SQLException {

		}

		@Override
		public String getSchema() throws SQLException {
			return null;
		}

		@Override
		public void abort(Executor executor) throws SQLException {

		}

		@Override
		public void setNetworkTimeout(Executor executor, int milliseconds)
				throws SQLException {

		}

		@Override
		public int getNetworkTimeout() throws SQLException {
			return 0;
		}

		@Override
		public <T> T unwrap(Class<T> iface) throws SQLException {
			return null;
		}

		@Override
		public boolean isWrapperFor(Class<?> iface) throws SQLException {
			return false;
		}
	}

}
/**
 * 數據庫鏈接池
 * 從鏈接池中獲取、使用和釋放鏈接的過程,而客戶端獲取鏈接的過程被設定爲等待超時的模式,
 * 也就是在1000毫秒內若是沒法獲取到可用鏈接,將會返回給客戶端一個null。
 * 設定鏈接池的大小爲10個,而後經過調節客戶端的線程數來模擬沒法獲取鏈接的場景。
 * 鏈接池的定義。它經過構造函數初始化鏈接的最大上限,經過一個雙向隊列來維護鏈接,
 * 調用方須要先調用fetchConnection(long)方法來指定在多少毫秒內超時獲取鏈接,當鏈接使用完成後,
 * 須要調用releaseConnection(Connection)方法將鏈接放回線程池
 */
public class ConnectionPool {

	// 存放鏈接的容器
	private LinkedList<Connection> pool = new LinkedList<Connection>();

	public ConnectionPool(int initialSize) {
		if (initialSize > 0) {
			for (int i = 0; i < initialSize; i++) {
				pool.addLast(ConnectionDriver.getConnectiong());
			}
		}
	}

	/* 將鏈接放回線程池 */
	public void releaseConnection(Connection connection) {
		if (connection != null) {
			synchronized (pool) {
				// 添加後須要進行通知,這樣其餘消費者可以感知到連接池中已經歸還了一個連接
				pool.addLast(connection);
				pool.notifyAll();
			}
		}
	}

	/*
	 * 指定在多少毫秒內超時獲取鏈接,在指定時間內沒法獲取到鏈接,將會返回null
	 */
	public Connection fetchConnection(long mills) throws InterruptedException {
		synchronized (pool) {
			// 徹底超時
			if (mills <= 0) {
				while (pool.isEmpty()) {
					pool.wait();
				}
				return pool.removeFirst();
			} else {
				long future = System.currentTimeMillis() + mills;// 何時超時
				long remaining = mills;// 超時時長
				while (pool.isEmpty() && remaining > 0) {
					pool.wait(remaining);
					remaining = future - System.currentTimeMillis();// 當前還須等待的時長
				}
				Connection result = null;
				if (!pool.isEmpty()) {
					result = pool.removeFirst();
				}
				return result;
			}
		}
	}
}
public class ConnectionPoolTest {
	static ConnectionPool pool = new ConnectionPool(10);
	// 保證全部ConnectionRunner可以同時開始
	static CountDownLatch start = new CountDownLatch(1);
	// main線程將會等待全部ConnectionRunner結束後才能繼續執行
	static CountDownLatch end;

	public static void main(String[] args) throws Exception {
		// 線程數量,能夠線程數量進行觀察
		int threadCount = 50;
		end = new CountDownLatch(threadCount);
		int count = 10;// 每一個線程循環取20次
		AtomicInteger got = new AtomicInteger();// 獲取到數據庫鏈接的次數
		AtomicInteger notGot = new AtomicInteger();// 沒有獲取到數據庫鏈接的次數
		for (int i = 0; i < threadCount; i++) {
			Thread thread = new Thread(new ConnetionRunner(count, got, notGot),
					"ConnectionRunnerThread");
			thread.start();
		}
		start.countDown();
		end.await();
		System.out.println("total invoke: " + (threadCount * count));
		System.out.println("got connection:  " + got);
		System.out.println("not got connection " + notGot);
	}

	static class ConnetionRunner implements Runnable {
		int count;
		AtomicInteger got;
		AtomicInteger notGot;

		public ConnetionRunner(int count, AtomicInteger got,
				AtomicInteger notGot) {
			this.count = count;
			this.got = got;
			this.notGot = notGot;
		}

		public void run() {
			try {
				start.await();
			} catch (Exception ex) {

			}
			while (count > 0) {
				try {
					// 從線程池中獲取鏈接,若是1000ms內沒法獲取到,將會返回null
					// 分別統計鏈接獲取的數量got和未獲取到的數量notGot
					Connection connection = pool.fetchConnection(1000);
					if (connection != null) {
						try {
							connection.createStatement();
							connection.commit();
						} finally {
							pool.releaseConnection(connection);
							got.incrementAndGet();
						}
					} else {
						notGot.incrementAndGet();
					}
				} catch (Exception ex) {
				} finally {
					count--;
				}
			}
			end.countDown();
		}
	}
}

運行結果:
建立SQL 41
41準備提交數據
41準備提交數據
建立SQL 41
total invoke: 500
got connection:  432
not got connection 68
相關文章
相關標籤/搜索