02.第二階段、實戰Java高併發程序設計模式-5.JDK併發包2

6.JDK併發包2 ......................................................................................................................1

1.線程池的基本使用 ......................................................................................................2

  1. 1.1.  爲什麼需要線程池...........................................................................................2

  2. 1.2.  JDK爲我們提供了哪些支持 .................................................................................2

  3. 1.3.  線程池的使用.......................................................................................................2

    1. 1.3.1.  線程池的種類 ...............................................................................................2

    2. 1.3.2.  不同線程池的共同性 .....................................................................................2

1.4. 線程池使用的小例子...........................................................................................2

  1. 1.4.1.  簡單線程池 ...................................................................................................3

  2. 1.4.2.  ScheduledThreadPool ....................................................................................3

擴展和加強線程池 .....................................................................................................3

  1. 2.1.  回調接口...............................................................................................................3

  2. 2.2.  拒絕策略...............................................................................................................3

  3. 2.3.  自定義ThreadFactory ...........................................................................................3

線程池及其核心代碼分析 .........................................................................................

 ForkJoin ........................................................................................................................3

4.1. 思想.......................................................................................................................3

4.2. 使用接口...............................................................................................................4

  1. 4.2.1.  RecursiveAction .............................................................................................4

  2. 4.2.2.  RecursiveTask ................................................................................................4

4.3. 簡單例子...............................................................................................................4

4.4. 實現要素...............................................................................................................4

4.4.1. 工作竊取 .......................................................................................................

  1. 簡單例子

  2. newCachedThreadPool 創建一個可緩存的線程池。默認情況下,如果線程池的大小超過了處理任務所需要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程;當任務數增加時,此線程池又可以智能地添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程數。

  3. newFixedThreadPool 創建固定大小的線程池。每次提交一個任務就創建一個線程,線程可複用,直到線程數達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因爲執行異常而結束,那麼線程池會補充一個新線程。

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Fixed-size thread pool demo: one shared task is submitted ten times to a
     * pool of five worker threads, then the pool is shut down.
     */
    public class ThreadPollDemo {

    	/** Prints the current time and the executing thread's id, then sleeps for one second. */
    	public static class MyTask implements Runnable {
    		@Override
    		public void run() {
    			System.out.println(System.currentTimeMillis() + ":Thread ID:" +
    					Thread.currentThread().getId());
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt status when it throws; set it
    				// again so the thread running this task can still see the request.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Submits the same task ten times to a five-thread pool.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		MyTask task = new MyTask();
    		// Create a pool that reuses a fixed number of threads.
    		ExecutorService pool = Executors.newFixedThreadPool(5);

    		for (int i = 0; i < 10; i++) {
    			pool.submit(task);
    		}
    		// Stop accepting new tasks; the tasks already submitted still run.
    		pool.shutdown();
    	}
    }
  4. newSingleThreadExecutor 創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因爲異常結束,那麼會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

  5. newScheduledThreadPool 創建一個可調度的線程池,核心線程數由參數指定。此線程池支持定時以及週期性執行任務的需求。

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Scheduled thread pool demo: one task is re-run with a fixed delay between
     * the end of one run and the start of the next.
     */
    public class ThreadPollDemo {

    	/** Prints the current time and the executing thread's id, then sleeps for one second. */
    	public static class MyTask implements Runnable {
    		@Override
    		public void run() {
    			System.out.println(System.currentTimeMillis() + ":Thread ID:" +
    					Thread.currentThread().getId());
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt status when it throws; set it
    				// again so the thread running this task can still see the request.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Schedules the task on a five-thread scheduled pool. The pool is never
    	 * shut down here, so the program keeps running until it is killed.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		MyTask task = new MyTask();
    		ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
    		// The next run is not started while the previous one is still in progress:
    		// start immediately, then wait 3 seconds after each run finishes.
    		pool.scheduleWithFixedDelay(task, 0, 3, TimeUnit.SECONDS);
    		// Alternative: run the task once, 3 seconds from now, then shut down.
    //        pool.schedule(task, 3, TimeUnit.SECONDS);
    //        pool.shutdown();
    	}
    }

    擴展和加強線程池

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demo of the ThreadPoolExecutor extension hooks: beforeExecute,
     * afterExecute and terminated are overridden to log the task life cycle.
     */
    public class ThreadPollDemo {

    	/** A named task that prints the executing thread's id and then sleeps for 100 ms. */
    	public static class MyTask implements Runnable {

    		public String name;

    		public MyTask(String name) {
    			this.name = name;
    		}

    		@Override
    		public void run() {
    			System.out.println("正在執行" + ":Thread ID:" +
    					Thread.currentThread().getId() + "task name= " + name);
    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt status when it throws; set it
    				// again so the thread running this task can still see the request.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Returns the label used by the logging hooks.
    	 *
    	 * @param r the runnable handed to a hook
    	 * @return the task's name when r is a MyTask, otherwise r's string form;
    	 *         the fallback avoids a ClassCastException if some other Runnable
    	 *         type ever reaches the pool
    	 */
    	private static String taskLabel(Runnable r) {
    		if (r instanceof MyTask) {
    			return ((MyTask) r).name;
    		}
    		return String.valueOf(r);
    	}

    	/**
    	 * Runs five named tasks on a five-thread pool whose hooks log each stage.
    	 *
    	 * @param args unused
    	 * @throws InterruptedException if the main thread is interrupted while pausing between tasks
    	 */
    	public static void main(String[] args) throws InterruptedException {
    		ExecutorService pool = new ThreadPoolExecutor(5,
    				5,
    				0L,
    				TimeUnit.MILLISECONDS,
    				new LinkedBlockingQueue<Runnable>()
    		) {
    			@Override
    			protected void beforeExecute(Thread t, Runnable r) {
    				System.out.println("準備執行" + taskLabel(r));
    				super.beforeExecute(t, r);
    			}

    			@Override
    			protected void afterExecute(Runnable r, Throwable t) {
    				super.afterExecute(r, t);
    				System.out.println("執行完成" + taskLabel(r));
    			}

    			@Override
    			protected void terminated() {
    				super.terminated();
    				System.out.println("線程池退出");
    			}
    		};
    		for (int i = 0; i < 5; i++) {
    			MyTask task = new MyTask("TASK-GEYM" + i);
    			// execute() hands the MyTask itself to the hooks, so its name is available there.
    			pool.execute(task);
    			Thread.sleep(10);
    		}
    		pool.shutdown();
    	}
    }
    準備執行TASK-GEYM0
    正在執行:Thread ID:10task name= TASK-GEYM0
    準備執行TASK-GEYM1
    正在執行:Thread ID:11task name= TASK-GEYM1
    準備執行TASK-GEYM2
    正在執行:Thread ID:12task name= TASK-GEYM2
    準備執行TASK-GEYM3
    正在執行:Thread ID:13task name= TASK-GEYM3
    準備執行TASK-GEYM4
    正在執行:Thread ID:14task name= TASK-GEYM4
    執行完成TASK-GEYM0
    執行完成TASK-GEYM1
    執行完成TASK-GEYM2
    執行完成TASK-GEYM3
    執行完成TASK-GEYM4
    線程池退出

    拒絕策略

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Rejection policy demo: a five-thread pool with a SynchronousQueue cannot
     * hold waiting tasks, so submissions made while all workers are busy are
     * passed to a custom RejectedExecutionHandler that only logs them.
     */
    public class ThreadPollDemo {

    	/** A named task that prints the executing thread's id and then sleeps for 100 ms. */
    	public static class MyTask implements Runnable {

    		public String name;

    		public MyTask(String name) {
    			this.name = name;
    		}

    		@Override
    		public void run() {
    			System.out.println("正在執行" + ":Thread ID:" +
    					Thread.currentThread().getId() + "task name= " + name);
    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt status when it throws; set it
    				// again so the thread running this task can still see the request.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Submits a new task every 10 ms. The loop is effectively endless and the
    	 * pool is never shut down, so the demo runs until the process is killed.
    	 *
    	 * @param args unused
    	 * @throws InterruptedException if the main thread is interrupted while pausing between submissions
    	 */
    	public static void main(String[] args) throws InterruptedException {
    		ExecutorService pool = new ThreadPoolExecutor(5,
    				5,
    				0L,
    				TimeUnit.MILLISECONDS,
    				new SynchronousQueue<Runnable>(),
    				Executors.defaultThreadFactory(),
    				new RejectedExecutionHandler() {
    					@Override
    					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    						// Tasks arrive here wrapped by submit(), which is why the sample
    						// output shows FutureTask@... instead of the task name.
    						System.out.println(r.toString()+" is discard");
    					}
    				}
    		) ;
    		for (int i = 0; i < Integer.MAX_VALUE; i++) {
    			MyTask task = new MyTask("TASK-GEYM" + i);
    			pool.submit(task);
    			Thread.sleep(10);
    		}
    //        pool.shutdown();
    	}
    }
    正在執行:Thread ID:10task name= TASK-GEYM0
    正在執行:Thread ID:11task name= TASK-GEYM1
    正在執行:Thread ID:12task name= TASK-GEYM2
    正在執行:Thread ID:13task name= TASK-GEYM3
    正在執行:Thread ID:14task name= TASK-GEYM4
    java.util.concurrent.FutureTask@7b23ec81 is discard
    java.util.concurrent.FutureTask@6acbcfc0 is discard
    java.util.concurrent.FutureTask@5f184fc6 is discard
    java.util.concurrent.FutureTask@3feba861 is discard
    正在執行:Thread ID:10task name= TASK-GEYM9
    正在執行:Thread ID:11task name= TASK-GEYM10
    正在執行:Thread ID:12task name= TASK-GEYM11
    正在執行:Thread ID:13task name= TASK-GEYM12
    正在執行:Thread ID:14task name= TASK-GEYM13
    java.util.concurrent.FutureTask@5b480cf9 is discard
    java.util.concurrent.FutureTask@6f496d9f is discard
    java.util.concurrent.FutureTask@723279cf is discard
    java.util.concurrent.FutureTask@10f87f48 is discard
    正在執行:Thread ID:10task name= TASK-GEYM18
    正在執行:Thread ID:11task name= TASK-GEYM19

    自定義線程池名稱

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Custom thread factory demo: same setup as the rejection policy example,
     * but worker threads are created (and named) by MssThreadFactory, and each
     * task also prints the name of the thread it runs on.
     */
    public class ThreadPollDemo {

    	/** A named task that prints the executing thread's id and name, then sleeps for 100 ms. */
    	public static class MyTask implements Runnable {

    		public String name;

    		public MyTask(String name) {
    			this.name = name;
    		}

    		@Override
    		public void run() {
    			System.out.println("正在執行" + ":Thread ID:" +
    					Thread.currentThread().getId() + "task name= " + name);
    			System.out.println(Thread.currentThread().getName());

    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt status when it throws; set it
    				// again so the thread running this task can still see the request.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Submits a new task every 10 ms. The loop is effectively endless and the
    	 * pool is never shut down, so the demo runs until the process is killed.
    	 *
    	 * @param args unused
    	 * @throws InterruptedException if the main thread is interrupted while pausing between submissions
    	 */
    	public static void main(String[] args) throws InterruptedException {

    		ExecutorService pool = new ThreadPoolExecutor(5,
    				5,
    				0L,
    				TimeUnit.MILLISECONDS,
    				new SynchronousQueue<Runnable>(),
    				new MssThreadFactory("個人專屬線程池"),
    //                Executors.defaultThreadFactory(),
    				new RejectedExecutionHandler() {
    					@Override
    					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    						// Log the rejected submission and drop it.
    						System.out.println(r.toString() + " is discard");
    					}
    				}
    		);
    		for (int i = 0; i < Integer.MAX_VALUE; i++) {
    			MyTask task = new MyTask("TASK-GEYM" + i);
    			pool.submit(task);
    			Thread.sleep(10);
    		}
    //        pool.shutdown();
    	}
    }
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Thread factory that names each thread "&lt;prefix&gt;-&lt;n&gt;", with n
     * counting up from 1, and normalizes every new thread to non-daemon status
     * and normal priority.
     *
     * @author: dawn.he QQ:       905845006
     * @email: dawn.he@cloudwise.com
     * @email: 905845006@qq.com
     * @date: 2019/10/4    5:00 PM
     */
    public class MssThreadFactory implements ThreadFactory {
    	private final AtomicInteger threadNumber = new AtomicInteger(1);
    	private final String namePrefix;

    	/**
    	 * @param namePrefix prefix for thread names; a "-" separator is appended to it
    	 */
    	MssThreadFactory(String namePrefix) {
    		this.namePrefix = namePrefix + "-";
    	}

    	/**
    	 * Creates a thread for the given runnable.
    	 *
    	 * @param r the work the new thread will run
    	 * @return an unstarted, non-daemon, normal-priority thread with the next sequential name
    	 */
    	@Override
    	public Thread newThread(Runnable r) {
    		Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
    		// A new thread inherits daemon status and priority from the thread that
    		// creates it, so reset both instead of leaving them to the caller's state.
    		if (t.isDaemon()) {
    			t.setDaemon(false);
    		}
    		if (t.getPriority() != Thread.NORM_PRIORITY) {
    			t.setPriority(Thread.NORM_PRIORITY);
    		}
    		return t;
    	}
    }

    ForkJoin

    發送消息 RecursiveAction 無返回值

    /**
     * description: 發送消息
     *
     * @author: dawn.he QQ:       905845006
     * @email: dawn.he@cloudwise.com
     * @email: 905845006@qq.com
     * @date: 2019/10/4    5:12 PM
     */
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Fork/Join demo for a task without a result: prints every element of a
     * list by recursively splitting the index range into small slices.
     */
    public class ForkJoinPoolDemo {

    	/** Prints list elements in the half-open index range [start, end). */
    	class SendMsgTask extends RecursiveAction {

    		/** Ranges of at most this many elements are printed directly instead of being split. */
    		private static final int THRESHOLD = 10;

    		private final int start;
    		private final int end;
    		private final List<String> list;

    		/**
    		 * @param start first index to print (inclusive)
    		 * @param end   index at which to stop (exclusive)
    		 * @param list  the messages; only read, never modified
    		 */
    		public SendMsgTask(int start, int end, List<String> list) {
    			this.start = start;
    			this.end = end;
    			this.list = list;
    		}

    		@Override
    		protected void compute() {
    			if ((end - start) <= THRESHOLD) {
    				for (int i = start; i < end; i++) {
    					System.out.println(Thread.currentThread().getName() + ": " + list.get(i));
    				}
    				return;
    			}
    			// Written as start + half-width so the midpoint cannot overflow int.
    			int middle = start + (end - start) / 2;
    			// Run both halves and return only once both have completed.
    			invokeAll(new SendMsgTask(start, middle, list), new SendMsgTask(middle, end, list));
    		}

    	}

    	/**
    	 * Prints the numbers 1..123 through a ForkJoinPool.
    	 *
    	 * @param args unused
    	 * @throws InterruptedException if interrupted while waiting for the pool to terminate
    	 */
    	public static void main(String[] args) throws InterruptedException {
    		List<String> list = new ArrayList<>();
    		for (int i = 0; i < 123; i++) {
    			list.add(String.valueOf(i + 1));
    		}

    		ForkJoinPool pool = new ForkJoinPool();
    		try {
    			// invoke() blocks until the whole task tree has finished.
    			pool.invoke(new ForkJoinPoolDemo().new SendMsgTask(0, list.size(), list));
    		} finally {
    			// Shut down first: awaitTermination on a pool that was never shut down
    			// can only return by timing out.
    			pool.shutdown();
    		}
    		pool.awaitTermination(10, TimeUnit.SECONDS);
    	}

    }

    計算求和

    /**
     * description: 求和 RecursiveTask 有返回值
     *
     * @author: dawn.he QQ:       905845006
     * @email: dawn.he@cloudwise.com
     * @email: 905845006@qq.com
     * @date: 2019/10/4    5:25 PM
     */
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * Fork/Join demo for a task with a result: sums an int array by recursively
     * splitting the index range and adding up the partial sums.
     */
    public class ForkJoinTaskDemo {

    	/** Sums arr over the half-open index range [start, end). */
    	private static class SumTask extends RecursiveTask<Integer> {

    		/** Ranges of at most this many elements are summed directly instead of being split. */
    		private static final int THRESHOLD = 1000;

    		private final int[] arr;
    		private final int start;
    		private final int end;

    		/**
    		 * @param arr   the values to add up; only read, never modified
    		 * @param start first index to include
    		 * @param end   index at which to stop (exclusive)
    		 */
    		public SumTask(int[] arr, int start, int end) {
    			this.arr = arr;
    			this.start = start;
    			this.end = end;
    		}

    		/**
    		 * Adds up this task's range sequentially and prints the subtotal.
    		 *
    		 * @return the sum of arr[start..end)
    		 */
    		private Integer subtotal() {
    			// Accumulate in a primitive so the loop does not box on every step.
    			int sum = 0;
    			for (int i = start; i < end; i++) {
    				sum += arr[i];
    			}
    			System.out.println(Thread.currentThread().getName() + ": ∑(" + start + "~" + end + ")=" + sum);
    			return sum;
    		}

    		@Override
    		protected Integer compute() {
    			if ((end - start) <= THRESHOLD) {
    				return subtotal();
    			}
    			// Written as start + half-width so the midpoint cannot overflow int.
    			int middle = start + (end - start) / 2;
    			SumTask left = new SumTask(arr, start, middle);
    			SumTask right = new SumTask(arr, middle, end);
    			// Fork one half and compute the other on the current thread, so this
    			// thread does useful work instead of only waiting on two joins.
    			left.fork();
    			int rightSum = right.compute();
    			return left.join() + rightSum;
    		}
    	}

    	/**
    	 * Sums the numbers 1..10000 through a ForkJoinPool and prints the total.
    	 *
    	 * @param args unused
    	 * @throws ExecutionException   if the computation fails
    	 * @throws InterruptedException if interrupted while waiting for the result
    	 */
    	public static void main(String[] args) throws ExecutionException, InterruptedException {
    		int[] arr = new int[10000];
    		for (int i = 0; i < arr.length; i++) {
    			arr[i] = i + 1;
    		}

    		ForkJoinPool pool = new ForkJoinPool();
    		try {
    			// Submit the task; the pool's workers run it.
    			ForkJoinTask<Integer> result = pool.submit(new SumTask(arr, 0, arr.length));
    			// Wait for the submitted task with get(). Calling invoke() here would
    			// start the same computation a second time on the main thread.
    			System.out.println("最終計算結果: " + result.get());
    		} finally {
    			pool.shutdown();
    		}
    	}

    }

相關文章
相關標籤/搜索