讀書筆記之《Java併發編程的藝術》-併發編程基礎

讀書筆記部分內容來源於已出版的書籍,版權歸本書作者所有,若有錯誤,請指正。

歡迎star、fork,讀書筆記系列會同步更新

git
github

https://github.com/xuminwlt/j360-jdk

module

j360-jdk-thread/me.j360.jdk.concurrent

本系列分4篇

一、讀書筆記之《Java併發編程的藝術》-併發編程基礎

二、讀書筆記之《Java併發編程的藝術》-java中的鎖

三、讀書筆記之《Java併發編程的藝術》-併發編程容器和框架(重要)

四、讀書筆記之《Java併發編程的藝術》-線程池和Executor的子孫們


本書前三章分別爲

  1. 併發編程的挑戰,也就是併發編程的原因所在

  2. 底層的實現原理

  3. java內存模型

分別從cpu x86,x64以及內存模型等概念中描述java對併發編程的實現和控制,概念較爲底層和基礎,讀書筆記略過前三章直接從第四章應用實現及原理基礎開始。

章節

  1. 併發編程基礎

  2. java中的鎖

  3. 併發容器和框架(重點)

  4. 13個原子操作類

  5. java併發工具類

  6. 線程池

  7. Executor框架

內容

  1. 併發編程基礎

  2. 多線程

    先看一段main方法

    /**
     * Lists every live thread of the current JVM, showing that even a plain
     * {@code main} program runs alongside several other threads.
     */
    public class MultThread {
        /**
         * Prints one line per live thread in the form {@code [id ] name}.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args){
            // Obtain the JVM's thread-management MXBean.
            ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();

            // Locked-monitor and locked-synchronizer details are not needed here.
            ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false,false);
            for(ThreadInfo threadInfo:threadInfos){
                // Use getThreadName(): getLockName() is the lock a thread is blocked on
                // and is null for any thread that is not blocked or waiting.
                System.out.println("[" + threadInfo.getThreadId() + " ] " + threadInfo.getThreadName());
            }
        }
    }

    輸出

    [9 ] Monitor Ctrl-Break
    [5 ] Attach Listener
    [4 ] Signal Dispatcher
    [3 ] Finalizer
    [2 ] Reference Handler
    [1 ] main

    解釋

    java程序的運行不僅僅是main方法的運行,而是main線程和多個其他線程的同時運行

    java天生就是多線程程序

    線程優先級

        線程的運行不能依賴於線程優先級

    線程的狀態

        線程在一個時刻,只能處於一種狀態

    NEW 初始狀態,線程構建尚未start
    RUNNABLE 運行狀態,就緒+運行
    BLOCKED 阻塞狀態,阻塞於鎖
    WAITING 等待狀態,需要等待其他線程做出一些動作(通知或中斷)
    TIMED_WAITING 超時等待,可以在指定的時間後自行返回
    TERMINATED 終止狀態,執行完畢
    /**
     * Parks four threads in different states (one sleeping, one waiting on a
     * monitor, two competing for the same lock) so that the states can be
     * inspected with a thread dump such as {@code jstack}.
     */
    public class ThreadState {
        public static void main(String[] args){
            launch(new TimeWaiting(),"TimeWaiting ");
            launch(new Waiting(),"Waiting");
            launch(new Blocked(),"Block-1");
            launch(new Blocked(),"Block-2");
        }

        /** Creates a thread with the given name for {@code task} and starts it. */
        private static void launch(Runnable task, String name){
            Thread worker = new Thread(task,name);
            worker.start();
        }

        /** Sleeps over and over, 100 seconds at a time. */
        static class TimeWaiting implements Runnable{

            @Override
            public void run() {
                for(;;){
                    SleepUtils.second(100);
                }
            }
        }

        /** Waits indefinitely on the monitor of {@code Waiting.class}. */
        static class Waiting implements Runnable{

            @Override
            public void run() {
                final Class<?> monitor = Waiting.class;
                for(;;){
                    synchronized (monitor){
                        try {
                            monitor.wait();
                        }catch (InterruptedException interrupted){
                            interrupted.printStackTrace();
                        }
                    }
                }
            }
        }

        /**
         * Takes the {@code Blocked.class} lock and never releases it, so a second
         * instance running in another thread stays blocked on that lock.
         */
        static class Blocked implements Runnable{

            @Override
            public void run() {
                synchronized (Blocked.class){
                    for(;;){
                        SleepUtils.second(100);
                    }
                }
            }
        }

        /** Sleep helper that hides the checked exception. */
        static class SleepUtils{
            /**
             * Sleeps for the given number of seconds; an interruption ends the
             * sleep early and is silently dropped.
             *
             * @param seconds how long to sleep, in seconds
             */
            public static final void second(long seconds){
                try {
                    TimeUnit.SECONDS.sleep(seconds);
                }catch (InterruptedException ignored){
                    // deliberately ignored by this demo helper
                }
            }
        }
    }

    cmd:jps

    D:\IdealProjects\j360-jdk>jps
    11476 Launcher
    13520
    7628 AppMain
    4480 Jps
    D:\IdealProjects\j360-jdk>jstack 7628
    2015-11-10 11:20:55
    Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.75-b04 mixed mode):
    
    "DestroyJavaVM" prio=6 tid=0x0000000000e6d800 nid=0x11b8 waiting on condition [0x0000000000000000]
       java.lang.Thread.State: RUNNABLE
    
    "Block-2" prio=6 tid=0x0000000011429000 nid=0x373c waiting for monitor entry [0x0000000011c9f000]
       java.lang.Thread.State: BLOCKED (on object monitor)
            at me.j360.jdk.concurrent._1_thread.ThreadState$Blocked.run(ThreadState.java:52)
            - waiting to lock <0x00000007ac3e23f0> (a java.lang.Class for me.j360.jdk.concurrent._1_thread.ThreadState$Blocked)
            at java.lang.Thread.run(Thread.java:745)
    
    "Block-1" prio=6 tid=0x0000000011420000 nid=0x35a0 waiting on condition [0x0000000011b9e000]
       java.lang.Thread.State: TIMED_WAITING (sleeping)
            at java.lang.Thread.sleep(Native Method)
            at java.lang.Thread.sleep(Thread.java:340)
            at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
            at me.j360.jdk.concurrent._1_thread.ThreadState$SleepUtils.second(ThreadState.java:61)
            at me.j360.jdk.concurrent._1_thread.ThreadState$Blocked.run(ThreadState.java:52)
            - locked <0x00000007ac3e23f0> (a java.lang.Class for me.j360.jdk.concurrent._1_thread.ThreadState$Blocked)
            at java.lang.Thread.run(Thread.java:745)
    
    "Waiting" prio=6 tid=0x000000001141d800 nid=0x2318 in Object.wait() [0x0000000011a9f000]
       java.lang.Thread.State: WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x00000007ac3df4e8> (a java.lang.Class for me.j360.jdk.concurrent._1_thread.ThreadState$Waiting)
            at java.lang.Object.wait(Object.java:503)
            at me.j360.jdk.concurrent._1_thread.ThreadState$Waiting.run(ThreadState.java:37)
            - locked <0x00000007ac3df4e8> (a java.lang.Class for me.j360.jdk.concurrent._1_thread.ThreadState$Waiting)
            at java.lang.Thread.run(Thread.java:745)
    
    "TimeWaiting " prio=6 tid=0x000000001141a800 nid=0x1124 waiting on condition [0x000000001199f000]
       java.lang.Thread.State: TIMED_WAITING (sleeping)
            at java.lang.Thread.sleep(Native Method)
            at java.lang.Thread.sleep(Thread.java:340)
            at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
            at me.j360.jdk.concurrent._1_thread.ThreadState$SleepUtils.second(ThreadState.java:61)
            at me.j360.jdk.concurrent._1_thread.ThreadState$TimeWaiting.run(ThreadState.java:25)
            at java.lang.Thread.run(Thread.java:745)
    
    "Monitor Ctrl-Break" daemon prio=6 tid=0x00000000113a7000 nid=0x3338 runnable [0x000000001180f000]
       java.lang.Thread.State: RUNNABLE
            at java.net.DualStackPlainSocketImpl.accept0(Native Method)
            at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
            at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
            at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
            - locked <0x00000007ac444038> (a java.net.SocksSocketImpl)
            at java.net.ServerSocket.implAccept(ServerSocket.java:530)
            at java.net.ServerSocket.accept(ServerSocket.java:498)
            at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:90)
            at java.lang.Thread.run(Thread.java:745)

    線程的狀態會隨着代碼的執行在不同的狀態間切換

    線程中斷

    中斷是線程的一個標識位屬性,表示運行中的線程被其他線程進行了中斷操作(其他線程調用了該線程的interrupt()方法),可以通過isInterrupted()方法判斷是否被中斷

    標記位清除的兩個場景:調用Thread.interrupted()復位、拋出InterruptedException異常

    安全終止線程

    利用boolean變量來控制線程

    /**
     * Demonstrates two cooperative ways to stop a running thread: interrupting
     * it, and flipping a {@code volatile boolean} flag.
     */
    public class Shutdown {
        /**
         * Runs two counting threads for about one second each; the first is
         * stopped by an interrupt, the second by {@link Runner#cancel()}.
         *
         * @param args command-line arguments (unused)
         * @throws InterruptedException if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws InterruptedException {
            // First runner: terminated through the interrupt flag.
            Runner one  = new Runner();
            Thread thread1 = new Thread(one,"CountThread");
            thread1.start();
            TimeUnit.SECONDS.sleep(1);
            thread1.interrupt();
            // Second runner: terminated only through the boolean flag. It must not
            // be interrupted as well, otherwise cancel() is never what stops it.
            Runner two = new Runner();
            Thread thread2 = new Thread(two,"CountThread");
            thread2.start();
            TimeUnit.SECONDS.sleep(1);
            two.cancel();
        }

        /** Counts upwards until it is cancelled or its thread is interrupted. */
        private static class Runner implements Runnable{

            private long i;
            // volatile so the write in cancel() is visible to the counting thread
            private volatile boolean on = true;
            @Override
            public void run() {
                while(on && !Thread.currentThread().isInterrupted()){
                    i++;
                }
                System.out.println("Count i = " + i);
            }

            /** Asks the loop in {@link #run()} to stop at its next check. */
            public void cancel(){
                on = false;
            }
        }
    }
    Count i = 365764392
    Count i = 226360860

    線程間通訊、等待/通知機制

    notify() 通知一個等待的線程,從wait方法返回,前提是線程獲取了對象的鎖
    notifyAll() 通知所有等待在該對象上的線程
    wait() 調用該方法的線程進入WAITING狀態,只有等待其他線程通知或被中斷才會返回,調用wait()後會釋放對象的鎖
    wait(long) 超時等待通知
    wait(long,int) 更精確的超時等待通知,精確到納秒

    Thread.join()的使用

    當前線程調用thread.join()後,要等到thread線程終止才會從thread.join()返回

    下面的例子中,每個線程終止的前提是其前驅線程的終止

    /**
     * Chains ten threads with {@link Thread#join()}: thread 0 joins the thread
     * that called {@code main}, and each later thread joins the one created
     * just before it.
     */
    public class Join {
        public static void main(String[] args){
            Thread predecessor = Thread.currentThread();
            int index = 0;
            while(index < 10){
                // every thread holds a reference to the thread created before it
                Thread current = new Thread(new Domino(predecessor),Integer.toString(index));
                current.start();
                predecessor = current;
                index++;
            }
        }

        /** Waits for another thread to finish, then reports its own termination. */
        static class Domino implements Runnable{

            private final Thread thread;

            public Domino(Thread thread){
                this.thread = thread;
            }

            @Override
            public void run() {
                try {
                    thread.join();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
                String self = Thread.currentThread().getName();
                System.out.println(self + " terminate");
            }
        }
    }
    0 terminate
    1 terminate
    2 terminate
    3 terminate
    4 terminate
    5 terminate
    6 terminate
    7 terminate
    8 terminate
    9 terminate


案例

    基於等待超時模式的數據庫連接池

    public class ConnectionPool {
        private LinkedList<Connection> pool = new LinkedList<Connection>();
    
        public ConnectionPool(int initialSize){
            if(initialSize > 0){
                for(int i=1;i<initialSize;i++){
                    pool.addLast(ConnectionDrive.createConnection());
                }
            }
        }
    
        public void releaseConnection(Connection connection){
            if(connection != null){
                synchronized (pool){
                    pool.addLast(connection);
                    pool.notifyAll();
                }
            }
        }
    
        public Connection fetchConnection(long mills) throws InterruptedException {
            synchronized (pool){
                if(mills <= 0){
                    while (pool.isEmpty()){
                        pool.wait();
                    }
                    return pool.removeFirst();
                }else{
                    long future = System.currentTimeMillis() + mills;
                    long remainning = mills;
                    while(pool.isEmpty() && remainning > 0){
                        pool.wait();
                        remainning = future - System.currentTimeMillis();
                    }
                    Connection result = null;
                    if(! pool.isEmpty()){
                        result = pool.removeFirst();
                    }
                    return result;
                }
            }
        }
    }
    /**
     * Produces fake {@link Connection} objects backed by a dynamic proxy, so the
     * pool example can run without a real database.
     */
    public class ConnectionDrive {
        /**
         * Handler behind every fake connection: a call to {@code commit} sleeps
         * for 100 milliseconds, and every call returns {@code null}.
         */
        static class ConnectionHandler implements InvocationHandler{

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                boolean isCommit = "commit".equals(method.getName());
                if(isCommit){
                    // simulate the cost of committing
                    TimeUnit.MILLISECONDS.sleep(100);
                }
                return null;
            }
        }

        /**
         * Creates a new proxy that implements {@link Connection}.
         *
         * @return a fake connection backed by a {@link ConnectionHandler}
         */
        public static final Connection createConnection(){
            ClassLoader loader = ConnectionDrive.class.getClassLoader();
            Class<?>[] contracts = { Connection.class };
            InvocationHandler handler = new ConnectionHandler();
            return (Connection) Proxy.newProxyInstance(loader, contracts, handler);
        }


    }
    public class ConnectionTest {
        static ConnectionPool pool = new ConnectionPool(10);
        static CountDownLatch start = new CountDownLatch(1);
    
        static CountDownLatch end;
    
        public static void main(String[] args) throws InterruptedException {
            int threadCount = 30;
            end = new CountDownLatch(threadCount);
            int count = 20;
            AtomicInteger got = new AtomicInteger();
            AtomicInteger notGot = new AtomicInteger();
            for(int i=0;i<threadCount;i++){
                Thread thread = new Thread(new ConnectionRunner(count,got,notGot),"Thread");
                thread.start();
            }
            start.countDown();
            end.await();
            System.out.println("total invoke:" + (threadCount *count));
            System.out.println("got " + got);
            System.out.println("notGot " + notGot);
        }
    
        static class ConnectionRunner implements Runnable{
            int count;
            AtomicInteger got;
            AtomicInteger notGot;
            public ConnectionRunner(int count,AtomicInteger got,AtomicInteger notGot){
                this.count = count;
                this.got = got;
                this.notGot = notGot;
            }
    
            @Override
            public void run() {
                try {
                    start.await();
                }catch (Exception e){
    
                }
                while (count > 0){
                    try {
                        Connection connection = pool.fetchConnection(1000);
                        if(connection != null){
                            try {
                                connection.createStatement();
                                connection.commit();
                            }finally {
                                pool.releaseConnection(connection);
                                got.incrementAndGet();
                            }
                        }else{
                            notGot.incrementAndGet();
                        }
                    }catch (Exception ex){
    
                    }finally {
                        count--;
                    }
                }
                end.countDown();
            }
        }
    }

    total invoke:600

    got 569

    notGot 31

    資源一定的情況下,隨着客戶端線程數的增加,客戶端出現超時無法獲取連接的比例不斷升高;超時後按時返回,告知客戶端獲取連接出現問題,是系統自我保護的機制。

    線程池技術

    /**
     * Contract of a simple pool of worker threads that runs jobs.
     *
     * <p>NOTE(review): the interface declares signatures only; the descriptions
     * below are inferred from the method names and should be confirmed against
     * the implementation.
     *
     * @param <Job> the kind of task accepted by the pool; must be a {@link Runnable}
     */
    public interface ThreadPool<Job extends Runnable> {
        /** Submits a job to the pool (presumably to be run by one of its workers). */
        void execute(Job job);
        /** Shuts the pool down. */
        void shutdown();
        /** Adds workers; {@code num} is presumably how many to add. */
        void addWorkers(int num);
        /** Removes workers; {@code num} is presumably how many to remove. */
        void removeWorker(int num);
        /** Returns a job count (presumably the number of jobs still waiting to run). */
        int getJobSize();
    }
    package me.j360.jdk.concurrent._1_thread.simplepool;
    
    import java.util.*;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * Created with j360-jdk -> me.j360.jdk.concurrent._1_thread.simplepool.
     * User: min_xu
     * Date: 2015/11/10
     * Time: 13:45
     * 說明:
     */
    public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    
        private static final int MAX_WORKER_NUMBERS = 10;
        private static final int DEFAULT_WORRER_NUMBERS = 5;
        private static final int MIN_WORKER_NUMBERS = 1;
        private final LinkedList<Job> jobs = new LinkedList<Job>();
        private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    
        private int workerNum = DEFAULT_WORRER_NUMBERS;
        private AtomicLong threadNum = new AtomicLong();
    
        public DefaultThreadPool(){
            initializerWorkers(DEFAULT_WORRER_NUMBERS);
        }
        public DefaultThreadPool(int num){
            workerNum = num  > MAX_WORKER_NUMBERS?MAX_WORKER_NUMBERS:num < MIN_WORKER_NUMBERS?MIN_WORKER_NUMBERS:num;
            initializerWorkers(workerNum);
        }
        @Override
        public void execute(Job job) {
            if(job != null){
                synchronized (jobs){
                    jobs.addLast(job);
                    jobs.notify();
                }
            }
        }
    
        @Override
        public void shutdown() {
            for(Worker worker:workers){
                worker.shutdown();
            }
        }
    
        @Override
        public void addWorkers(int num) {
            synchronized (jobs){
                if(num+this.workerNum >MAX_WORKER_NUMBERS){
                    num = MAX_WORKER_NUMBERS - this.workerNum;
                }
                initializerWorkers(num);
                this.workerNum = num;
            }
        }
    
        @Override
        public void removeWorker(int num) {
            synchronized(jobs){
                if(num>this.workerNum){
                    throw new IllegalArgumentException("beyond worknum");
                }
                int count = 0;
                while(count < num){
                    Worker worker = workers.get(count);
                    if(workers.remove(worker)){
                        worker.shutdown();
                        count++;
                    }
                }
                this.workerNum -= count;
            }
        }
    
        @Override
        public int getJobSize() {
            return jobs.size();
        }
    
        //初始化線程工做
        private void initializerWorkers(int num){
            for(int i = 0;i<num;i++){
                Worker worker = new Worker();
                Thread thread = new Thread(worker,"ThreadPool-Worker-" + threadNum.incrementAndGet());
                thread.start();
            }
        }
    
        class Worker implements Runnable{
    
            private volatile boolean running = true;
            @Override
            public void run() {
                while (running){
                    Job job = null;
                    synchronized (jobs){
                        while (jobs.isEmpty()){
                            try {
                                jobs.wait();
                            }catch (InterruptedException e){
                                Thread.currentThread().interrupt();
                                return;
                            }
    
                        }
                        job = jobs.removeFirst();
                    }
                    if(job != null){
                        try {
                            job.run();
                        }catch (Exception ex){
    
                        }
                    }
                }
            }
    
            public void shutdown(){
                running = false;
            }
        }
    
        static class JobJob implements Runnable{
    
            @Override
            public void run() {
                System.out.println("JobJob");
            }
        }
        public static void main(String[] args){
            DefaultThreadPool<JobJob> defaultThreadPool = new DefaultThreadPool<JobJob>(4);
            for(int i=0;i<40;i++){
                JobJob jobJob = new JobJob();
                defaultThreadPool.execute(jobJob);
    
                System.out.println(defaultThreadPool.getJobSize());
            }
    
            defaultThreadPool.shutdown();
        }
    }



下一篇將講述鎖的應用

相關文章
相關標籤/搜索