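Parts of these reading notes are drawn from the published book, and copyright belongs to the book's author. If you find any errors, please point them out.
Stars and forks are welcome; this reading-notes series is updated in step with the repository.
GitHub
https://github.com/xuminwlt/j360-jdk
module
j360-jdk-thread/me.j360.jdk.concurrent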
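This series has four parts. This post is the first; the other three are:
2. Reading notes on 《Java併發編程的藝術》 (The Art of Java Concurrency Programming): locks in Java
3. Reading notes on 《Java併發編程的藝術》: concurrent containers and frameworks (important)
4. Reading notes on 《Java併發編程的藝術》: thread pools and the descendants of Executor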
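The first three chapters of the book are:
The challenges of concurrent programming, and with them the reasons for doing concurrent programming at all
The underlying implementation principles
The Java memory model
They describe how Java implements and controls concurrency in terms of the CPU (x86, x64) and the memory model. These concepts are low-level and foundational, so the notes skip the first three chapters and start from chapter 4, which covers practical usage and the principles behind it.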
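Concurrent programming basics
Locks in Java
Concurrent containers and frameworks (key topic)
The 13 atomic operation classes
Java concurrency utility classes
Thread pools
The Executor framework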
Start with a simple main method:
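import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class MultThread {
    public static void main(String[] args) {
        // Obtain the MXBean that manages the JVM's threads
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        // Monitor and synchronizer details are not needed, so pass false for both
        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
        for (ThreadInfo threadInfo : threadInfos) {
            // Print each thread's id and name
            System.out.println("[" + threadInfo.getThreadId() + " ] " + threadInfo.getThreadName());
        }
    }
}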
Output:
[9 ] Monitor Ctrl-Break
[5 ] Attach Listener
[4 ] Signal Dispatcher
[3 ] Finalizer
[2 ] Reference Handler
[1 ] main
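Explanation:
Running a Java program is not just running the main method: the main thread runs alongside several other threads.
Java is multithreaded by nature.
A program's correctness must not depend on thread priority.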
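The point about priority can be tried out with a small experiment. The sketch below is only an illustration: the class name PrioritySketch and its helper race are hypothetical, not from the book. It runs one counter thread at Thread.MIN_PRIORITY and one at Thread.MAX_PRIORITY for a short time and prints how far each got. The operating system may honor or ignore the priority, so no particular ratio should be expected.

```java
import java.util.concurrent.TimeUnit;

class PrioritySketch {
    /** Spins until told to stop and records how many iterations it managed. */
    static class Counter implements Runnable {
        volatile boolean running = true;
        volatile long count;

        @Override
        public void run() {
            long local = 0;
            while (running) {
                local++;
            }
            count = local;
        }
    }

    /** Runs a low- and a high-priority counter for the given time; returns {low, high}. */
    static long[] race(long millis) {
        Counter low = new Counter();
        Counter high = new Counter();
        Thread lowThread = new Thread(low, "low");
        Thread highThread = new Thread(high, "high");
        // Priority is only a hint to the scheduler
        lowThread.setPriority(Thread.MIN_PRIORITY);
        highThread.setPriority(Thread.MAX_PRIORITY);
        lowThread.start();
        highThread.start();
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Always stop both counters, even if the sleep was interrupted
            low.running = false;
            high.running = false;
        }
        try {
            lowThread.join();
            highThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {low.count, high.count};
    }

    public static void main(String[] args) {
        long[] counts = race(200);
        System.out.println("low  priority count = " + counts[0]);
        System.out.println("high priority count = " + counts[1]);
    }
}
```

On many systems the two counts come out close to each other, which is why logic that relies on one thread "winning" because of its priority is fragile.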
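At any given moment a thread is in exactly one state:
NEW | Initial state: the thread has been constructed but start() has not been called |
RUNNABLE | Running state: covers both "ready" and "running" |
BLOCKED | Blocked state: the thread is blocked waiting for a lock |
WAITING | Waiting state: the thread needs another thread to take some action |
TIMED_WAITING | Timed waiting: the thread returns on its own after the specified time |
TERMINATED | Terminated state: the thread has finished executing |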
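These states can also be read programmatically with Thread.getState(). The following is a minimal sketch, with the hypothetical class name StateSketch, that observes three of them on one thread: NEW before start(), WAITING while the thread is parked on a CountDownLatch, and TERMINATED after join().

```java
import java.util.concurrent.CountDownLatch;

class StateSketch {
    /** Returns the worker's state before start, while it waits, and after it ends. */
    static Thread.State[] observe() {
        CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                release.await(); // the worker parks here until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker");

        Thread.State created = worker.getState(); // not started yet
        worker.start();

        // Poll until the worker has actually parked in await()
        Thread.State parked;
        do {
            Thread.yield();
            parked = worker.getState();
        } while (parked != Thread.State.WAITING);

        release.countDown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Thread.State finished = worker.getState();
        return new Thread.State[] {created, parked, finished};
    }

    public static void main(String[] args) {
        for (Thread.State state : observe()) {
            System.out.println(state);
        }
        // prints NEW, WAITING, TERMINATED (one per line)
    }
}
```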
import java.util.concurrent.TimeUnit;

public class ThreadState {
    public static void main(String[] args) {
        new Thread(new TimeWaiting(), "TimeWaiting ").start();
        new Thread(new Waiting(), "Waiting").start();
        // Two Blocked threads: one acquires the lock, the other blocks on it
        new Thread(new Blocked(), "Block-1").start();
        new Thread(new Blocked(), "Block-2").start();
    }

    // Sleeps in a loop, so the thread stays in TIMED_WAITING
    static class TimeWaiting implements Runnable {
        @Override
        public void run() {
            while (true) {
                SleepUtils.second(100);
            }
        }
    }

    // Waits on Waiting.class, so the thread stays in WAITING
    static class Waiting implements Runnable {
        @Override
        public void run() {
            while (true) {
                synchronized (Waiting.class) {
                    try {
                        Waiting.class.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    // Holds the Blocked.class lock forever once it has acquired it
    static class Blocked implements Runnable {
        @Override
        public void run() {
            synchronized (Blocked.class) {
                while (true) {
                    SleepUtils.second(100);
                }
            }
        }
    }

    static class SleepUtils {
        public static final void second(long seconds) {
            try {
                TimeUnit.SECONDS.sleep(seconds);
            } catch (InterruptedException e) {
                // ignored in this demo
            }
        }
    }
}
From the command line, run jps to find the process id:
D:\IdealProjects\j360-jdk>jps
11476 Launcher
13520
7628 AppMain
4480 Jps
D:\IdealProjects\j360-jdk>jstack 7628
2015-11-10 11:20:55
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.75-b04 mixed mode):

"DestroyJavaVM" prio=6 tid=0x0000000000e6d800 nid=0x11b8 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Block-2" prio=6 tid=0x0000000011429000 nid=0x373c waiting for monitor entry [0x0000000011c9f000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at me.j360.jdk.concurrent._1_thread.ThreadState$Blocked.run(ThreadState.java:52)
        - waiting to lock <0x00000007ac3e23f0> (a java.lang.Class for me.j360.jdk.concurrent._1_thread.ThreadState$Blocked)
        at java.lang.Thread.run(Thread.java:745)

"Block-1" prio=6 tid=0x0000000011420000 nid=0x35a0 waiting on condition [0x0000000011b9e000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
        at me.j360.jdk.concurrent._1_thread.ThreadState$SleepUtils.second(ThreadState.java:61)
        at me.j360.jdk.concurrent._1_thread.ThreadState$Blocked.run(ThreadState.java:52)
        - locked <0x00000007ac3e23f0> (a java.lang.Class for me.j360.jdk.concurrent._1_thread.ThreadState$Blocked)
        at java.lang.Thread.run(Thread.java:745)

"Waiting" prio=6 tid=0x000000001141d800 nid=0x2318 in Object.wait() [0x0000000011a9f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000007ac3df4e8> (a java.lang.Class for me.j360.jdk.concurrent._1_thread.ThreadState$Waiting)
        at java.lang.Object.wait(Object.java:503)
        at me.j360.jdk.concurrent._1_thread.ThreadState$Waiting.run(ThreadState.java:37)
        - locked <0x00000007ac3df4e8> (a java.lang.Class for me.j360.jdk.concurrent._1_thread.ThreadState$Waiting)
        at java.lang.Thread.run(Thread.java:745)

"TimeWaiting " prio=6 tid=0x000000001141a800 nid=0x1124 waiting on condition [0x000000001199f000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
        at me.j360.jdk.concurrent._1_thread.ThreadState$SleepUtils.second(ThreadState.java:61)
        at me.j360.jdk.concurrent._1_thread.ThreadState$TimeWaiting.run(ThreadState.java:25)
        at java.lang.Thread.run(Thread.java:745)

"Monitor Ctrl-Break" daemon prio=6 tid=0x00000000113a7000 nid=0x3338 runnable [0x000000001180f000]
   java.lang.Thread.State: RUNNABLE
        at java.net.DualStackPlainSocketImpl.accept0(Native Method)
        at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
        at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
        - locked <0x00000007ac444038> (a java.net.SocksSocketImpl)
        at java.net.ServerSocket.implAccept(ServerSocket.java:530)
        at java.net.ServerSocket.accept(ServerSocket.java:498)
        at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:90)
        at java.lang.Thread.run(Thread.java:745)
A thread moves between these states as the code executes.
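Interruption is a flag attribute of a thread. It records whether a running thread has been interrupted by another thread calling its interrupt() method, and the thread can check it with isInterrupted().
The flag is cleared in two situations: a call to the static Thread.interrupted(), which returns the flag and resets it, and a method throwing InterruptedException, which clears the flag before the exception is thrown.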
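Both ways of clearing the flag can be checked on the current thread. The sketch below uses the hypothetical class name InterruptFlagSketch and only calls methods of java.lang.Thread. It interrupts itself, reads the flag with isInterrupted(), clears it with Thread.interrupted(), then interrupts itself again and lets Thread.sleep throw InterruptedException.

```java
class InterruptFlagSketch {
    /** Returns the five observations described in the comments below, in order. */
    static boolean[] probe() {
        Thread self = Thread.currentThread();

        self.interrupt();
        boolean setAfterInterrupt = self.isInterrupted(); // reading does not clear the flag
        boolean reportedByStatic = Thread.interrupted();  // returns the flag and resets it
        boolean setAfterStatic = self.isInterrupted();    // now cleared

        self.interrupt();
        boolean thrown = false;
        try {
            Thread.sleep(1000); // throws at once because the flag is already set
        } catch (InterruptedException e) {
            thrown = true;
        }
        boolean setAfterException = self.isInterrupted(); // cleared when the exception was thrown

        return new boolean[] {
            setAfterInterrupt, reportedByStatic, setAfterStatic, thrown, setAfterException
        };
    }

    public static void main(String[] args) {
        boolean[] r = probe();
        System.out.println("isInterrupted() after interrupt():      " + r[0]); // true
        System.out.println("Thread.interrupted():                   " + r[1]); // true
        System.out.println("isInterrupted() after interrupted():    " + r[2]); // false
        System.out.println("sleep threw InterruptedException:       " + r[3]); // true
        System.out.println("isInterrupted() after the exception:    " + r[4]); // false
    }
}
```

This is why code that catches InterruptedException and wants callers to still see the interruption usually calls Thread.currentThread().interrupt() in the catch block.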
A boolean variable can also be used to stop a thread:
import java.util.concurrent.TimeUnit;

public class Shutdown {
    public static void main(String[] args) throws InterruptedException {
        // The first runner is stopped by interrupting its thread
        Runner one = new Runner();
        Thread thread1 = new Thread(one, "CountThread");
        thread1.start();
        TimeUnit.SECONDS.sleep(1);
        thread1.interrupt();

        // The second runner is stopped through the boolean flag only
        Runner two = new Runner();
        Thread thread2 = new Thread(two, "CountThread");
        thread2.start();
        TimeUnit.SECONDS.sleep(1);
        two.cancel();
    }

    private static class Runner implements Runnable {
        private long i;
        // volatile so that the write in cancel() is visible to the running thread
        private volatile boolean on = true;

        @Override
        public void run() {
            // Keep counting until cancelled or interrupted
            while (on && !Thread.currentThread().isInterrupted()) {
                i++;
            }
            System.out.println("Count i = " + i);
        }

        public void cancel() {
            on = false;
        }
    }
}
Count i = 365764392
Count i = 226360860
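notify() | Notifies one thread waiting on the object so that it can return from wait(); the thread returns only once it has re-acquired the object's lock |
notifyAll() | Notifies all threads waiting on the object |
wait() | The calling thread enters the WAITING state and returns only after another thread notifies it or it is interrupted; calling wait() releases the object's lock |
wait(long) | Waits for a notification with a timeout in milliseconds and returns when the timeout expires |
wait(long,int) | A finer-grained timeout, specified down to nanoseconds |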
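The methods in this table are normally used together with a condition that is checked in a loop while holding the object's lock. The following is a minimal sketch of that pattern; the class WaitNotifySketch and its members lock, ready, awaitReady and signalReady are hypothetical names chosen for the illustration.

```java
class WaitNotifySketch {
    private final Object lock = new Object();
    private boolean ready; // the condition, guarded by lock

    /** Waiting side: take the lock, wait while the condition is false, then proceed. */
    void awaitReady() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {
                lock.wait(); // releases the lock while waiting, re-acquires it before returning
            }
        }
    }

    /** Notifying side: take the lock, change the condition, notify the waiters. */
    void signalReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    /** Starts a waiter, signals it, and reports whether the waiter finished. */
    static boolean demo() {
        WaitNotifySketch gate = new WaitNotifySketch();
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitReady();
                System.out.println("waiter: condition holds, continuing");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "waiter");
        waiter.start();
        gate.signalReady();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + demo());
    }
}
```

Checking the condition in a while loop rather than an if matters for two reasons: wait() may return without a matching notification, and the signal may arrive before the waiter has started waiting, in which case the loop is skipped entirely.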
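A call to thread.join() returns only after the thread being waited on has terminated.
In the example below, each thread can terminate only once its predecessor has terminated.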
public class Join {
    public static void main(String[] args) {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // Each thread holds a reference to the previous one and waits for it to end
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
    }

    static class Domino implements Runnable {
        private Thread thread;

        public Domino(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " terminate");
        }
    }
}

Output:
0 terminate
1 terminate
2 terminate
3 terminate
4 terminate
5 terminate
6 terminate
7 terminate
8 terminate
9 terminate
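Thread also offers join(long millis), which stops waiting after the given time even if the target thread is still running. The sketch below illustrates the difference; the class name JoinTimeoutSketch is hypothetical, and the worker is kept alive with a CountDownLatch purely for the demonstration.

```java
import java.util.concurrent.CountDownLatch;

class JoinTimeoutSketch {
    /** Returns {worker alive after join(100), worker alive after join()}. */
    static boolean[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                release.await(); // stays blocked until main releases it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker");
        worker.start();

        boolean aliveAfterTimedJoin = false;
        boolean aliveAfterJoin = true;
        try {
            worker.join(100);                  // gives up after about 100 ms
            aliveAfterTimedJoin = worker.isAlive();
            release.countDown();               // let the worker finish
            worker.join();                     // returns only once the worker has terminated
            aliveAfterJoin = worker.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            release.countDown();               // do not leave the worker blocked
        }
        return new boolean[] {aliveAfterTimedJoin, aliveAfterJoin};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("alive after join(100): " + r[0]); // true
        System.out.println("alive after join():    " + r[1]); // false
    }
}
```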
import java.sql.Connection;
import java.util.LinkedList;

public class ConnectionPool {
    private LinkedList<Connection> pool = new LinkedList<Connection>();

    public ConnectionPool(int initialSize) {
        if (initialSize > 0) {
            // Create exactly initialSize connections
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(ConnectionDrive.createConnection());
            }
        }
    }

    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                // Return the connection and notify the threads waiting for one
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }

    // Returns null if no connection could be obtained within mills milliseconds
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool) {
            if (mills <= 0) {
                // No timeout: wait until a connection is available
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    // Wait at most for the time that is left
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}
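import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

public class ConnectionDrive {
    static class ConnectionHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Simulate the cost of a commit by sleeping for 100 ms
            if (method.getName().equals("commit")) {
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
        }
    }

    // Create a dynamic proxy for Connection whose only behavior is the slow commit
    public static final Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectionDrive.class.getClassLoader(),
                new Class[]{Connection.class}, new ConnectionHandler());
    }
}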
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionTest {
    static ConnectionPool pool = new ConnectionPool(10);
    // Makes all ConnectionRunner threads start at the same time
    static CountDownLatch start = new CountDownLatch(1);
    // main waits on this until every ConnectionRunner has finished
    static CountDownLatch end;

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 30;
        end = new CountDownLatch(threadCount);
        int count = 20;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "Thread");
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke:" + (threadCount * count));
        System.out.println("got " + got);
        System.out.println("notGot " + notGot);
    }

    static class ConnectionRunner implements Runnable {
        int count;
        AtomicInteger got;
        AtomicInteger notGot;

        public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        @Override
        public void run() {
            try {
                start.await();
            } catch (Exception e) {
                // ignored in this demo
            }
            while (count > 0) {
                try {
                    // Give up after 1000 ms; null means no connection was obtained
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                    }
                } catch (Exception ex) {
                    // ignored in this demo
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}
total invoke:600
got 569
notGot 31
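With a fixed amount of resources, the share of requests that time out without obtaining a connection keeps rising as more client threads compete for them. Returning on schedule when the timeout expires, and telling the client that no connection could be obtained, is a self-protection mechanism for the system.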
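The timeout logic used by fetchConnection is not specific to connection pools. The sketch below shows the same wait-with-deadline pattern on a single-slot holder; the class TimedBox and its methods put and take are hypothetical names used only for this illustration.

```java
class TimedBox<T> {
    private final Object lock = new Object();
    private T value; // guarded by lock; null means empty

    /** Stores a value and wakes up any thread waiting in take(). */
    void put(T newValue) {
        synchronized (lock) {
            value = newValue;
            lock.notifyAll();
        }
    }

    /** Waits up to millis for a value; returns null if none arrived in time. */
    T take(long millis) throws InterruptedException {
        synchronized (lock) {
            long deadline = System.currentTimeMillis() + millis;
            long remaining = millis;
            // Wait only while there is nothing to take and time is left
            while (value == null && remaining > 0) {
                lock.wait(remaining);
                // Recompute after every wake-up, since wait may return early
                remaining = deadline - System.currentTimeMillis();
            }
            T result = value;
            value = null;
            return result;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedBox<String> box = new TimedBox<>();
        System.out.println(box.take(100)); // null: nothing was put within 100 ms
        box.put("connection");
        System.out.println(box.take(100)); // connection
    }
}
```

Because the remaining time is recomputed after each wake-up, the caller never waits longer than roughly the requested timeout, however many times the wait is woken early.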
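public interface ThreadPool<Job extends Runnable> {
    // Submit a job for execution
    void execute(Job job);

    // Shut the pool down
    void shutdown();

    // Add worker threads
    void addWorkers(int num);

    // Remove worker threads
    void removeWorker(int num);

    // Number of jobs still waiting to run
    int getJobSize();
}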
package me.j360.jdk.concurrent._1_thread.simplepool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created with j360-jdk -> me.j360.jdk.concurrent._1_thread.simplepool.
 * User: min_xu
 * Date: 2015/11/10
 * Time: 13:45
 * Description:
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    private static final int MAX_WORKER_NUMBERS = 10;
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    private static final int MIN_WORKER_NUMBERS = 1;
    // Pending jobs; every access is guarded by synchronized (jobs)
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool() {
        initializerWorkers(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int num) {
        // Clamp the requested size to [MIN_WORKER_NUMBERS, MAX_WORKER_NUMBERS]
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS
                : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializerWorkers(workerNum);
    }

    @Override
    public void execute(Job job) {
        if (job != null) {
            synchronized (jobs) {
                // Queue the job and wake up one waiting worker
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    @Override
    public void shutdown() {
        // A synchronizedList must be locked manually while iterating
        synchronized (workers) {
            for (Worker worker : workers) {
                worker.shutdown();
            }
        }
        // Wake up idle workers so they can see that they should stop
        synchronized (jobs) {
            jobs.notifyAll();
        }
    }

    @Override
    public void addWorkers(int num) {
        synchronized (jobs) {
            // Never exceed the maximum number of workers
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializerWorkers(num);
            this.workerNum += num;
        }
    }

    @Override
    public void removeWorker(int num) {
        synchronized (jobs) {
            if (num > this.workerNum) {
                throw new IllegalArgumentException("beyond worknum");
            }
            int count = 0;
            while (count < num) {
                // Always take the first worker, because the list shrinks on each removal
                Worker worker = workers.get(0);
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
            // Wake up idle workers so the removed ones can exit
            jobs.notifyAll();
        }
    }

    @Override
    public int getJobSize() {
        synchronized (jobs) {
            return jobs.size();
        }
    }

    // Create and start the worker threads
    private void initializerWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            // Register the worker so that shutdown() and removeWorker() can reach it
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    class Worker implements Runnable {
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                Job job = null;
                synchronized (jobs) {
                    while (jobs.isEmpty()) {
                        // Stop instead of waiting if this worker has been shut down
                        if (!running) {
                            return;
                        }
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag and exit
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    job = jobs.removeFirst();
                }
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception ex) {
                        // Exceptions thrown by a job are ignored
                    }
                }
            }
        }

        public void shutdown() {
            running = false;
        }
    }

    static class JobJob implements Runnable {
        @Override
        public void run() {
            System.out.println("JobJob");
        }
    }

    public static void main(String[] args) {
        DefaultThreadPool<JobJob> defaultThreadPool = new DefaultThreadPool<JobJob>(4);
        for (int i = 0; i < 40; i++) {
            JobJob jobJob = new JobJob();
            defaultThreadPool.execute(jobJob);
            System.out.println(defaultThreadPool.getJobSize());
        }
        // Workers stop after their current job; jobs still in the queue are not run
        defaultThreadPool.shutdown();
    }
}
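For comparison, the JDK ships its own thread pools behind the java.util.concurrent.ExecutorService interface, which the Executor framework listed earlier covers. The sketch below is a rough counterpart to the main method above, not the book's code; the class name ExecutorSketch and the helper runJobs are hypothetical. One difference worth noting: ExecutorService.shutdown() stops accepting new tasks but still runs the tasks that were already submitted.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorSketch {
    /** Submits the given number of jobs to a fixed-size pool; returns how many completed. */
    static int runJobs(int workers, int jobs) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < jobs; i++) {
            pool.execute(done::incrementAndGet); // each job just counts itself
        }
        pool.shutdown(); // no new jobs are accepted; queued jobs still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed " + runJobs(4, 40) + " of 40");
    }
}
```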
The next post covers how locks are used.