/** * 一個java程序包含的線程 */ public class ShowMainThread { public static void main(String[] args) { // java虛擬機的線程管理接口 ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); // 獲取線程信息的方法 ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); for (ThreadInfo threadInfo : threadInfos) { System.out.println(threadInfo.getThreadId() + ":" + threadInfo.getThreadName()); } } }
11:Monitor Ctrl-Break //監聽中斷信號 5:Attach Listener //獲取內存dump,線程dump 4:Signal Dispatcher //將信號分給jvm的線程 3:Finalizer //調用對象的finalizer 方法 2:Reference Handler //清除Reference 1:main //程序的主入口
一、 充分利用多處理核心;sql
二、 更快的響應時間(用戶訂單的場景,發送郵件等部分可由其餘線程執行)數據庫
/** * 如何建立一個線程 */ public class HowStartThread { // 繼承Thread private static class TestThread extends Thread { @Override public void run() { System.out.println("TestThread is runing"); } } // 實現Runnable 或者Callable 另外還有內部類線程 private static class TestRunable implements Runnable { @Override public void run() { System.out.println("TestRunable is runing"); } } public static void main(String[] args) { Thread t1 = new TestThread(); Thread t2 = new Thread(new TestRunable()); t1.start(); t2.start(); } }
/** * 使用自定義的取消標誌位中斷線程(不安全) */ public class FlagCancel { private static class TestRunable implements Runnable { // boolean的標誌位 volatile輕量的線程同步 private volatile boolean on = true; private long i = 0; @Override public void run() { while (on) { i++; // 阻塞方法,on不起做用 // wait,sleep,blockingqueue(put,take) try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("TestRunable is runing :" + i); } public void cancel() { on = false; } } }
使用線程的中斷 :
interrupt() 中斷線程,本質是將線程的中斷標誌位設爲true,其餘線程向須要中斷的線程打個招呼。是否真正進行中斷由線程本身決定。
isInterrupted() 線程檢查本身的中斷標誌位
靜態方法Thread.interrupted() 將中斷標誌位復位爲false
/** * 安全的中斷線程 */ public class SafeInterrupt implements Runnable { private volatile boolean on = true; private long i = 0; @Override public void run() { while (on && Thread.currentThread().isInterrupted()) { i++; } System.out.println("TestRunable is runing :" + i); } public void cancel() { on = false; Thread.currentThread().interrupt(); } }
IO通訊 inputstream read/write等阻塞方法,不會理會中斷,而關閉底層的套接字socket.close()會拋出socketException
NIO: selector.select()會阻塞,調用selector的wakeup和close方法會拋出ClosedSelectorException
/** * 調用阻塞方法時,如何中斷線程 */ public class BlockInterrupt { private static volatile boolean on = true; private static class WhenBlock implements Runnable { @Override public void run() { while (on && !Thread.currentThread().isInterrupted()) { try { // 拋出中斷異常的阻塞方法,拋出異常後,中斷標誌位改爲false 須要從新設置標誌位 Thread.sleep(100); } catch (InterruptedException e) { // 從新設置標誌位 Thread.currentThread().interrupt(); // do my work } // 清理工做結束線程 } } public void cancel() { on = false; Thread.currentThread().interrupt(); } } }
/** * 如何覆蓋線程的interrupt() 方法 */ public class OverrideInterrupt extends Thread { private final Socket socket; private final InputStream in; public OverrideInterrupt(Socket socket, InputStream in) { this.socket = socket; this.in = in; } private void t() { } @Override public void interrupt() { try { // 關閉底層的套接字 socket.close(); } catch (IOException e) { e.printStackTrace(); // ..... } finally { // 同時中斷線程 super.interrupt(); } } }
新建立 線程被建立,可是沒有調用start方法
可運行(RUNNABLE) 運行狀態,由cpu決定是否是正在運行
被阻塞(BLOCKING) 阻塞,線程被阻塞於鎖
等待/計時等待(WAITING) 等待某些條件成熟
被終止 線程執行完畢
public class SleepUtils { public static final void second(long seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { } } }
/** * 查看線程的狀態 */ public class ThreadState { private static Lock lock = new ReentrantLock(); public static void main(String[] args) { new Thread(new SleepAlways(), "SleepAlwaysThread").start(); new Thread(new Waiting(), "WaitingThread").start(); // 使用兩個Blocked線程,一個獲取鎖成功,另外一個被阻塞 new Thread(new Blocked(), "BlockedThread-1").start(); new Thread(new Blocked(), "BlockedThread-2").start(); new Thread(new Sync(), "SyncThread-1").start(); new Thread(new Sync(), "SyncThread-2").start(); } /** * 該線程不斷的進行睡眠 */ static class SleepAlways implements Runnable { @Override public void run() { while (true) { SleepUtils.second(100); } } } /** * 該線程在Waiting.class實例上等待 */ static class Waiting implements Runnable { @Override public void run() { while (true) { synchronized (Waiting.class) { try { Waiting.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } /** * 該線程在Blocked.class實例上加鎖後,不會釋放該鎖 */ static class Blocked implements Runnable { public void run() { synchronized (Blocked.class) { while (true) { SleepUtils.second(100); } } } } /** * 該線程得到鎖休眠後,又釋放鎖 */ static class Sync implements Runnable { @Override public void run() { lock.lock(); try { SleepUtils.second(3000); } finally { lock.unlock(); } } } }
/** * 守護線程 */ public class Daemon { public static void main(String[] args) { Thread thread = new Thread(new DaemonRunner()); thread.setDaemon(true);// 設置爲守護線程 thread.start(); } static class DaemonRunner implements Runnable { @Override public void run() { System.out.println("2"); try { System.out.println("23"); SleepUtils.second(100); } finally { System.out.println("DaemonThread finally run."); } } } } main運行結果:沒有打印任何東西
/** * Run和start方法辨析 */ public class RunAndStart { private static class TestThread extends Thread { private String name; public TestThread(String name) { this.name = name; } @Override public void run() { int i = 90; while (i > 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("I am " + name + " i= " + i); } } } public static void main(String[] args) { TestThread parent = new TestThread("beInvoked"); parent.start();// 是TestThread線程去執行 TestThread beInvoked = new TestThread("beInvoked_thread"); beInvoked.run();// 是main線程去執行 } } main運行結果: I am beInvoked i= 90 I am beInvoked_thread i= 90 I am beInvoked i= 90 I am beInvoked_thread i= 90 I am beInvoked i= 90 I am beInvoked_thread i= 90
/** * sleep方法是否會釋放鎖 */ public class SleepTest { // 鎖 private Object lock = new Object(); public static void main(String[] args) { SleepTest sleepTest = new SleepTest(); Thread threadA = sleepTest.new ThreadSleep(); threadA.setName("ThreadSleep"); Thread threadB = sleepTest.new ThreadNotSleep(); threadB.setName("ThreadNotSleep"); threadA.start(); try { Thread.sleep(1000); System.out.println(" RunTest slept!"); } catch (InterruptedException e) { e.printStackTrace(); } threadB.start(); } // 休眠的線程 private class ThreadSleep extends Thread { @Override public void run() { String threadName = Thread.currentThread().getName(); System.out.println(threadName + " will take the lock"); try { // 拿到鎖之後,休眠 synchronized (lock) { System.out.println(threadName + " taking the lock"); System.out.println("Finish the work: " + threadName); Thread.sleep(5000); } } catch (InterruptedException e) { // e.printStackTrace(); } } } // 不休眠的線程 private class ThreadNotSleep extends Thread { @Override public void run() { String threadName = Thread.currentThread().getName(); System.out.println(threadName + " will take the lock time=" + System.currentTimeMillis()); // 拿到鎖之後不休眠 synchronized (lock) { System.out.println(threadName + " taking the lock time=" + System.currentTimeMillis()); System.out.println("Finish the work: " + threadName); } } } } main運行結果: ThreadSleep will take the lock ThreadSleep taking the lock Finish the work: ThreadSleep RunTest slept! ThreadNotSleep will take the lock time=1526644256785 ThreadNotSleep taking the lock time=1526644260785 Finish the work: ThreadNotSleep
wait()和 notify()/notiyfAll()
調用之前,當前線程必需要持有鎖,調用了wait() notify()/notiyfAll()會釋放鎖。
線程 A調用了對象O的wait方法進入等待狀態,線程 B調用了對象O的notify方法進行喚醒,喚醒的是在對象O上wait的線程(好比線程A)
notify() 喚醒一個線程,喚醒哪個徹底看cpu的心情(謹慎使用)
notiyfAll() 全部在對象O上wait的線程所有喚醒(應該用notiyfAll())
/** * wait/notify/notifyAll的演示 */ public class User { private int age; private String city; public static final String CITY = "NewYork"; public User(int age, String city) { this.age = age; this.city = city; } public User() { } // 修改用戶的城市後,發出通知 public synchronized void changeCity() { this.city = "London"; notifyAll(); } // 修改用戶的年齡後,發出通知 public synchronized void changeAge() { this.age = 31; notifyAll(); } // 等待用戶的年齡變化的方法,接收到通知,檢查發現用戶年齡大於30時,進行業務工做,不然一直等待 // 阻塞方法 public synchronized void waitAge() { while (this.age <= 30) { try { wait(); System.out.println("wait age [" + Thread.currentThread().getId() + "] is notified!"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("the age is " + this.age);// 業務工做 } // 等待用戶的城市變化的方法,接收到通知,檢查發現用戶城市不是NewYork時,進行業務工做,不然一直等待 // 阻塞方法 public synchronized void waitCity() { while (this.city.equals(CITY)) { try { wait(); System.out.println("wait city [" + Thread.currentThread().getId() + "] is notified!"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("the city is " + this.city);// 業務工做 } }
/** * TestUser測試類 */ public class TestUser { private static User user = new User(30, User.CITY); private static class CheckAge extends Thread { @Override public void run() { user.waitAge(); } } private static class CheckCity extends Thread { @Override public void run() { user.waitCity(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { // 啓動三個等待用戶年齡變化的線程 new CheckAge().start(); } for (int i = 0; i < 3; i++) { // 啓動三個等待用戶城市變化的線程 new CheckCity().start(); } Thread.sleep(1000); user.changeCity();// 變更用戶的城市 } } main運行結果: wait city [16] is notified! the city is London wait city [15] is notified! the city is London wait city [14] is notified! the city is London wait age [13] is notified! wait age [12] is notified! wait age [11] is notified!
/** *調用阻塞方法時,如何中斷線程 */ public class BlockInterrupt { private static Object o = new Object(); /* while循環中包含try/catch塊 */ private static class WhileTryWhenBlock extends Thread { private volatile boolean on = true; private long i = 0; @Override public void run() { System.out.println("當前執行線程id:" + Thread.currentThread().getId()); while (on && !Thread.currentThread().isInterrupted()) { System.out.println("i=" + i++); try { // 拋出中斷異常的阻塞方法,拋出異常後,中斷標誌位會改爲false // 能夠理解爲這些方法會隱含調用Thread.interrputed()方法 synchronized (o) { o.wait(); } } catch (InterruptedException e) { System.out.println("當前執行線程的中斷標誌位:" + Thread.currentThread().getId() + ":" + Thread.currentThread().isInterrupted()); Thread.currentThread().interrupt();// 從新設置一下 System.out.println("被中斷的線程_" + getId() + ":" + isInterrupted()); // do my work } // 清理工做,準備結束線程 } } public void cancel() { // on = false; interrupt(); System.out.println("本方法所在線程實例:" + getId()); System.out.println("執行本方法的線程:" + Thread.currentThread().getId()); // Thread.currentThread().interrupt(); } } /* try/catch塊中包含while循環 */ private static class TryWhileWhenBlock extends Thread { private volatile boolean on = true; private long i = 0; @Override public void run() { try { while (on) { System.out.println(i++); // 拋出中斷異常的阻塞方法,拋出異常後,中斷標誌位改爲false synchronized (o) { o.wait(); } } } catch (InterruptedException e) { System.out.println("當前執行線程的中斷標誌位:" + Thread.currentThread().getId() + ":" + Thread.currentThread().isInterrupted()); } finally { // 清理工做結束線程 } } public void cancel() { on = false; interrupt(); } } public static void main(String[] args) throws InterruptedException { WhileTryWhenBlock whileTryWhenBlock = new WhileTryWhenBlock(); whileTryWhenBlock.start(); Thread.sleep(100); whileTryWhenBlock.cancel(); System.out.println("===================="); TryWhileWhenBlock tryWhileWhenBlock = new TryWhileWhenBlock(); tryWhileWhenBlock.start(); Thread.sleep(100); tryWhileWhenBlock.cancel(); } } 輸出結果 當前執行線程id:11 i=0 本方法所在線程實例:11 當前執行線程的中斷標誌位:11:false 執行本方法的線程:1 被中斷的線程_11:true ==================== 0 當前執行線程的中斷標誌位:11:false
/** * 測試Volatile型變量的操做原子性 */ public class VolatileThread implements Runnable { private volatile int a= 0; @Override public void run() { //synchronized (this){ a=a+1; System.out.println(Thread.currentThread().getName()+"----"+a); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } a=a+1; System.out.println(Thread.currentThread().getName()+"----"+a); //} } }
public class VolatileTest { public static void main(String[] args) { VolatileThread volatileThread = new VolatileThread(); Thread t1 = new Thread(volatileThread); Thread t2 = new Thread(volatileThread); Thread t3 = new Thread(volatileThread); Thread t4 = new Thread(volatileThread); t1.start(); t2.start(); t3.start(); t4.start(); } }
Synchronized(對象){ While(條件不知足){ 對象.wait() } 業務邏輯處理 }
一、 得到對象的鎖;
二、 改變條件;
三、 通知全部等待在對象的線程
Synchronized(對象){ 業務邏輯處理,改變條件 對象.notify/notifyAll }
/** * 有界阻塞隊列 */ public class BlockingQueueWN<T> { private List queue = new LinkedList<>(); private final int limit; // 大小限制 public BlockingQueueWN(int limit) { this.limit = limit; } // 入隊 public synchronized void enqueue(T item) throws InterruptedException { // 若是隊列滿了 等待 while (this.queue.size() == this.limit) { wait(); } // 若是隊列爲空 喚醒 if (this.queue.size() == 0) { System.out.println("enqueue notifyAll"); notifyAll(); } // 入列 this.queue.add(item); } // 出隊 public synchronized T dequeue() throws InterruptedException { // 若是隊列爲空 等待 while (this.queue.size() == 0) { System.out.println("dequeue wait"); wait(); } // 若是隊列滿了 喚醒 if (this.queue.size() == this.limit) { notifyAll(); } // 出列 return (T) this.queue.remove(0); } }
public class BqTest { public static void main(String[] args) { BlockingQueueWN bq = new BlockingQueueWN(10); Thread threadA = new ThreadPush(bq); threadA.setName("Push"); Thread threadB = new ThreadPop(bq); threadB.setName("Pop"); threadB.start(); threadA.start(); } // 數據入隊列線程 private static class ThreadPush extends Thread { BlockingQueueWN<Integer> bq; public ThreadPush(BlockingQueueWN<Integer> bq) { this.bq = bq; } @Override public void run() { String threadName = Thread.currentThread().getName(); int i = 20; // 循環20次入列 while (i > 0) { try { Thread.sleep(1000); System.out.println(" i=" + i + " will push"); bq.enqueue(i--); } catch (InterruptedException e) { // e.printStackTrace(); } } } } // 數據出隊列線程 private static class ThreadPop extends Thread { BlockingQueueWN<Integer> bq; public ThreadPop(BlockingQueueWN<Integer> bq) { this.bq = bq; } @Override public void run() { // 無限循環 有就取 while (true) { try { System.out.println(Thread.currentThread().getName() + " will pop....."); Integer i = bq.dequeue(); System.out.println(" i=" + i.intValue() + " alread pop"); } catch (InterruptedException e) { // e.printStackTrace(); } } } } }
運行結果 Pop will pop..... dequeue wait i=20 will push enqueue notifyAll i=20 alread pop Pop will pop..... dequeue wait i=19 will push enqueue notifyAll i=19 alread pop Pop will pop..... dequeue wait i=18 will push enqueue notifyAll i=18 alread pop Pop will pop..... dequeue wait i=17 will push enqueue notifyAll i=17 alread pop Pop will pop..... dequeue wait i=16 will push enqueue notifyAll i=16 alread pop Pop will pop..... dequeue wait i=15 will push enqueue notifyAll i=15 alread pop Pop will pop..... dequeue wait i=14 will push enqueue notifyAll i=14 alread pop Pop will pop..... dequeue wait i=13 will push enqueue notifyAll i=13 alread pop Pop will pop..... dequeue wait i=12 will push enqueue notifyAll i=12 alread pop Pop will pop..... dequeue wait i=11 will push enqueue notifyAll i=11 alread pop Pop will pop..... dequeue wait i=10 will push enqueue notifyAll i=10 alread pop Pop will pop..... dequeue wait i=9 will push enqueue notifyAll i=9 alread pop Pop will pop..... dequeue wait i=8 will push enqueue notifyAll i=8 alread pop Pop will pop..... dequeue wait i=7 will push enqueue notifyAll i=7 alread pop Pop will pop..... dequeue wait i=6 will push enqueue notifyAll i=6 alread pop Pop will pop..... dequeue wait i=5 will push enqueue notifyAll i=5 alread pop Pop will pop..... dequeue wait i=4 will push enqueue notifyAll i=4 alread pop Pop will pop..... dequeue wait i=3 will push enqueue notifyAll i=3 alread pop Pop will pop..... dequeue wait i=2 will push enqueue notifyAll i=2 alread pop Pop will pop..... dequeue wait i=1 will push enqueue notifyAll i=1 alread pop Pop will pop..... dequeue wait
pipedOutputStream/input 面向的字節
pipedReader/Writer 面向的是字符
public class PipeTransfer { private static class Print implements Runnable{ private PipedReader in; public Print(PipedReader in) { this.in = in; } @Override public void run() { int receive =0; try { while((receive=in.read())!=-1){ System.out.println((char) receive); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { PipedWriter out = new PipedWriter(); PipedReader in = new PipedReader(); //必須進行鏈接 out.connect(in); Thread t1 = new Thread(new Print(in),"PrintThread"); t1.start(); int receive =0; try { while((receive=System.in.read())!=-1){ out.write(receive); } } catch (IOException e) { e.printStackTrace(); }finally { out.close(); } } } 運行輸入 good 輸出 g o o d
public class JoinTes { public static void main(String[] args) throws InterruptedException { ThreadJoinTest t1 = new ThreadJoinTest("小明"); ThreadJoinTest t2 = new ThreadJoinTest("小東"); t1.start(); /** * Thread類中的join方法的主要做用就是同步,它可使得線程之間的並行執行變爲串行執行。 * * 1 join的意思是使得放棄當前線程的執行,並返回對應的線程,例以下面代碼的意思就是: * 程序在main線程中調用t1線程的join方法,則main線程放棄cpu控制權,並返回t1線程繼續執行直到線程t1執行完畢 * 因此結果是t1線程執行完後,纔到主線程執行,至關於在main線程中同步t1線程,t1執行完了,main線程纔有執行的機會 * * 2 join方法能夠傳遞參數,join(10)表示main線程會等待t1線程10毫秒,10毫秒過去後, * main線程和t1線程之間執行順序由串行執行變爲普通的並行執行 * 若是A線程中掉用B線程的join(10),則表示A線程會等待B線程執行10毫秒,10毫秒事後, * A、B線程並行執行。須要注意的是,jdk規定,join(0)的意思不是A線程等待B線程0秒, * 而是A線程等待B線程無限時間,直到B線程執行完畢,即join(0)等價於join() * * 3 join方法必須在線程start方法調用以後調用纔有意義。這個也很容易理解:若是一個線程都沒有start,那它也就沒法同步了。 * * 4 join方法的原理就是調用相應線程的wait方法進行等待操做的,例如A線程中調用了B線程的join方法, * 則至關於在A線程中調用了B線程的wait方法,當B線程執行完(或者到達等待時間), * B線程會自動調用自身的notifyAll方法喚醒A線程,從而達到同步的目的。 */ t1.join(); t2.start(); } } class ThreadJoinTest extends Thread { public ThreadJoinTest(String name) { super(name); } @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(this.getName() + ":" + i); try { sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } 運行結果 小明:0 小明:1 小明:2 小明:3 小明:4 小明:5 小明:6 小明:7 小明:8 小明:9 小東:0 小東:1 小東:2 小東:3 小東:4 小東:5 小東:6 小東:7 小東:8 小東:9
public class JoinTest { public static class CutInLine implements Runnable { private Thread thread; public CutInLine(Thread thread) { this.thread = thread; } @Override public void run() { try { // 在被插隊的線程裏,調用一下插隊線程的join方法 System.out.println(thread.getName() + " join " + Thread.currentThread().getName()); thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " will work"); } } public static void main(String[] args) throws InterruptedException { Thread previous = Thread.currentThread(); for (int i = 0; i < 10; i++) { Thread thread = new Thread(new CutInLine(previous), String.valueOf(i)); thread.start(); System.out.print(thread.getName() + " start "); previous = thread; } } } 打印結果 0 start main join 0 1 start 0 join 1 2 start 1 join 2 3 start 2 join 3 4 start 3 join 4 5 start 4 join 5 6 start 5 join 6 7 start 6 join 7 8 start 7 join 8 9 start 8 join 9 0 will work 1 will work 2 will work 3 will work 4 will work 5 will work 6 will work 7 will work 8 will work 9 will work
remove():將當前線程局部變量的值刪除,這個方法是JDK 5.0新增的方法。當線程結束後,對應該線程的局部變量將自動被垃圾回收,因此顯式調用該方法清除線程的局部變量並非必須的操做,但它能夠加快內存回收的速度。
public class ThreadLocalTest { static ThreadLocal<String> threadLocal = new ThreadLocal<String>(){ @Override protected String initialValue() { return "init"; } }; public void test(){ Thread[] runs = new Thread[3]; for(int i =0;i<runs.length;i++){ runs[i]=new Thread(new T1(i)); } for(int i =0;i<runs.length;i++){ runs[i].start(); } } private static class T1 implements Runnable{ private int id; public T1(int id) { this.id = id; } @Override public void run() { System.out.println(Thread.currentThread().getId()+" start"); String s = threadLocal.get(); s = s+"_"+id; threadLocal.set(s); System.out.println(Thread.currentThread().getId()+s); } } public static void main(String[] args) { ThreadLocalTest test = new ThreadLocalTest(); test.test(); } } 輸出結果 11 start 13 start 11init_0 12 start 13init_2 12init_1
public class PerfermenTest { /** 執行次數 */ private static final long count = 100000000; public static void main(String[] args) throws InterruptedException { //併發計算 concurrency(); //單線程計算 serial(); } private static void concurrency() throws InterruptedException { long start = System.currentTimeMillis(); Thread thread = new Thread(new Runnable() { @Override public void run() { int a = 0; for (long i = 0; i < count; i++) { a += 5; } System.out.println("a="+a); } }); thread.start(); int b = 0; for (long i = 0; i < count; i++) { b--; } thread.join(); long time = System.currentTimeMillis() - start; System.out.println("concurrency :" + time + "ms,b=" + b); } private static void serial() { long start = System.currentTimeMillis(); int a = 0; for (long i = 0; i < count; i++) { a += 5; } int b = 0; for (long i = 0; i < count; i++) { b--; } long time = System.currentTimeMillis() - start; System.out.println("serial:" + time + "ms,b=" + b + ",a=" + a); } } 輸出結果 a=500000000 concurrency :41ms,b=-100000000 serial:70ms,b=-100000000,a=500000000
假設等待時間段是T,那麼能夠推斷出在當前時間now+T以後就會超時 等待持續時間:REMAINING=T。 ·超時時間:FUTURE=now+T。 // 對當前對象加鎖 public synchronized Object get(long mills) throws InterruptedException { long future = System.currentTimeMillis() + mills; long remaining = mills; // 當超時大於0而且result返回值不知足要求 while ((result == null) && remaining > 0) { wait(remaining); remaining = future - System.currentTimeMillis(); } return result; }
public class ConnectionDriver { public static final Connection getConnectiong() { return new ConnectionImpl(); } private static class ConnectionImpl implements Connection { @Override public Statement createStatement() throws SQLException { System.out.println("建立SQL " + Thread.currentThread().getId()); return null; } @Override public void commit() throws SQLException { try { System.err.println(Thread.currentThread().getId() + "準備提交數據"); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql) throws SQLException { return null; } @Override public String nativeSQL(String sql) throws SQLException { return null; } @Override public void setAutoCommit(boolean autoCommit) throws SQLException { } @Override public boolean getAutoCommit() throws SQLException { return false; } @Override public void rollback() throws SQLException { } @Override public void close() throws SQLException { } @Override public boolean isClosed() throws SQLException { return false; } @Override public DatabaseMetaData getMetaData() throws SQLException { return null; } @Override public void setReadOnly(boolean readOnly) throws SQLException { } @Override public boolean isReadOnly() throws SQLException { return false; } @Override public void setCatalog(String catalog) throws SQLException { } @Override public String getCatalog() throws SQLException { return null; } @Override public void setTransactionIsolation(int level) throws SQLException { } @Override public int getTransactionIsolation() throws SQLException { return 0; } @Override public SQLWarning getWarnings() throws SQLException { return null; } @Override public void clearWarnings() throws SQLException { } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public Map<String, Class<?>> getTypeMap() throws SQLException { return null; } @Override public void setTypeMap(Map<String, Class<?>> map) throws SQLException { } @Override public void setHoldability(int holdability) throws SQLException { } @Override public int getHoldability() throws SQLException { return 0; } @Override public Savepoint setSavepoint() throws SQLException { return null; } @Override public Savepoint setSavepoint(String name) throws SQLException { return null; } @Override public void rollback(Savepoint savepoint) throws SQLException { } @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { return null; } @Override public Clob createClob() throws SQLException { return null; } @Override public Blob createBlob() throws SQLException { return null; } @Override public NClob createNClob() throws SQLException { return null; } @Override public SQLXML createSQLXML() throws SQLException { return null; } @Override public boolean isValid(int timeout) throws SQLException { return false; } @Override public void setClientInfo(String name, String value) throws SQLClientInfoException { } @Override public void setClientInfo(Properties properties) throws SQLClientInfoException { } @Override public String getClientInfo(String name) throws SQLException { return null; } @Override public Properties getClientInfo() throws SQLException { return null; } @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException { return null; } @Override public Struct createStruct(String typeName, Object[] attributes) throws SQLException { return null; } @Override public void setSchema(String schema) throws SQLException { } @Override public String getSchema() throws SQLException { return null; } @Override public void abort(Executor executor) throws SQLException { } @Override public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { } @Override public int getNetworkTimeout() throws SQLException { return 0; } @Override public <T> T unwrap(Class<T> iface) throws SQLException { return null; } @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { return false; } } }
/** * 數據庫鏈接池 * 從鏈接池中獲取、使用和釋放鏈接的過程,而客戶端獲取鏈接的過程被設定爲等待超時的模式, * 也就是在1000毫秒內若是沒法獲取到可用鏈接,將會返回給客戶端一個null。 * 設定鏈接池的大小爲10個,而後經過調節客戶端的線程數來模擬沒法獲取鏈接的場景。 * 鏈接池的定義。它經過構造函數初始化鏈接的最大上限,經過一個雙向隊列來維護鏈接, * 調用方須要先調用fetchConnection(long)方法來指定在多少毫秒內超時獲取鏈接,當鏈接使用完成後, * 須要調用releaseConnection(Connection)方法將鏈接放回線程池 */ public class ConnectionPool { // 存放鏈接的容器 private LinkedList<Connection> pool = new LinkedList<Connection>(); public ConnectionPool(int initialSize) { if (initialSize > 0) { for (int i = 0; i < initialSize; i++) { pool.addLast(ConnectionDriver.getConnectiong()); } } } /* 將鏈接放回線程池 */ public void releaseConnection(Connection connection) { if (connection != null) { synchronized (pool) { // 添加後須要進行通知,這樣其餘消費者可以感知到連接池中已經歸還了一個連接 pool.addLast(connection); pool.notifyAll(); } } } /* * 指定在多少毫秒內超時獲取鏈接,在指定時間內沒法獲取到鏈接,將會返回null */ public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool) { // 徹底超時 if (mills <= 0) { while (pool.isEmpty()) { pool.wait(); } return pool.removeFirst(); } else { long future = System.currentTimeMillis() + mills;// 何時超時 long remaining = mills;// 超時時長 while (pool.isEmpty() && remaining > 0) { pool.wait(remaining); remaining = future - System.currentTimeMillis();// 當前還須等待的時長 } Connection result = null; if (!pool.isEmpty()) { result = pool.removeFirst(); } return result; } } } }
public class ConnectionPoolTest { static ConnectionPool pool = new ConnectionPool(10); // 保證全部ConnectionRunner可以同時開始 static CountDownLatch start = new CountDownLatch(1); // main線程將會等待全部ConnectionRunner結束後才能繼續執行 static CountDownLatch end; public static void main(String[] args) throws Exception { // 線程數量,能夠線程數量進行觀察 int threadCount = 50; end = new CountDownLatch(threadCount); int count = 10;// 每一個線程循環取20次 AtomicInteger got = new AtomicInteger();// 獲取到數據庫鏈接的次數 AtomicInteger notGot = new AtomicInteger();// 沒有獲取到數據庫鏈接的次數 for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(new ConnetionRunner(count, got, notGot), "ConnectionRunnerThread"); thread.start(); } start.countDown(); end.await(); System.out.println("total invoke: " + (threadCount * count)); System.out.println("got connection: " + got); System.out.println("not got connection " + notGot); } static class ConnetionRunner implements Runnable { int count; AtomicInteger got; AtomicInteger notGot; public ConnetionRunner(int count, AtomicInteger got, AtomicInteger notGot) { this.count = count; this.got = got; this.notGot = notGot; } public void run() { try { start.await(); } catch (Exception ex) { } while (count > 0) { try { // 從線程池中獲取鏈接,若是1000ms內沒法獲取到,將會返回null // 分別統計鏈接獲取的數量got和未獲取到的數量notGot Connection connection = pool.fetchConnection(1000); if (connection != null) { try { connection.createStatement(); connection.commit(); } finally { pool.releaseConnection(connection); got.incrementAndGet(); } } else { notGot.incrementAndGet(); } } catch (Exception ex) { } finally { count--; } } end.countDown(); } } } 運行結果: 建立SQL 41 41準備提交數據 41準備提交數據 建立SQL 41 total invoke: 500 got connection: 432 not got connection 68