前言:首先簡單模擬一個場景,前端有一個輸入框,有一個按鈕,點擊這個按鈕能夠實現搜索輸入框中的相關的文本和圖片(相似於百度、谷歌搜索).看似一個簡單的功能,後端處理也不難,前端發起一個請求,後端接受到這個請求,獲取前端輸入的內容,而後用搜索服務查找相關的數據返回給前端。可是問題來了,可能不是一個用戶在搜索,假若有一萬個用戶同時發起請求呢?後端如何處理?若是按照單機的 處理方式,很容易線程堵死,程序崩潰、數據庫崩塌。本文來介紹一下如何經過線程池來處理前端的請求。html
本篇博客的目錄前端
一:線程池的優勢java
二:定義一個線程池web
三: 線程池實現類數據庫
四:執行任務編程
五:總結後端
本篇博客技術總架構圖:安全
一:線程池的好處服務器
1.1:好處多線程
1.1.1 線程池能夠異步的執行任務,當任務進來的時候線程池首先會判斷當前是否有存活可用的線程,若是有的話,線程會執行這個任務。可是任務此時能夠馬上返回,並不必定必須等待任務執行完畢纔會返回。假如是同步阻塞的話,當一個線程遇到Exception的時候,假如這個線程沒有獲得處理,那麼就會形成線程堵塞,資源囤積,最終的結果只能是cpu資源耗盡,全部的任務沒法處理。以前咱們的線上就出現了不少dubbo服務訪問超時問題,最後發現就是cpu資源耗盡,報了一個unable to create new Thread,這樣就沒法處理任務(最後咱們進行了物理擴容而且合理限定了線程池的最大線程數量才解決這個問題)
1.1.2:線程池能夠集中管理線程,能夠控制線程的運行週期,這裏包括動態添加線程或者移除線程。有一個很重要的點是這樣的:線程的上下文切換是很是消耗性能的;假如來了一個任務,線程執行一次,而後馬上銷燬;再來一個任務,再建立一個任務,用完再銷燬這個線程。那麼爲何不能對這個線程進行復用呢?
1.1.3:線程池的優點只有在高請求量纔會體現出來,若是請求量比較好,須要處理的任務不多,那麼使用線程池的做用並不明顯。可是並非線程數量越多越好,具體的數量須要評估每一個任務的處理時間以及當前計算機的處理能力和數量,這個是有具體的數據體現的,咱們來看一下實際數據比較:
二:定義一個線程池
2.1:首先咱們來定義一個線程池的接口,其中包含線程池開始任務、關閉線程池,增長線程、減小線程,線程池的使命就是管理線程的生命週期,包括加、減小和刪除線程,還有讓線程開始執行任務,自身的關閉和開啓!
public interface ThreadPool<Job extends Runnable> { /** * 線程池開始 */ void execute(Job job); /** * 關閉線程池 */ void shutDown(); /** * 添加線程 * @param num */ void addWorkers(int num); /** * 減小線程 * @param num */ void removeWorker(int num); /** * 獲取正在等待的線程數量 * @return */ int getJobSize(); }
三:線程池實現類
解釋: 線程池的實現類,首先就是定義線程池的默認數量,爲2*cpu核心數+1,這是比較合理的計算公式。最小數量定義爲1,最大數量定義爲10。還用一個LinkedList來做爲工做線程的集合容器。這裏爲何要用linkedList而不是ArrayList呢?由於linkedList是一個雙向鏈表,雙向鏈表能夠實現先進先出或者後進先出等集合。而後咱們定義了worker來封裝具體執行任務的線程,用Job來封裝要執行的任務。而後在構造方法裏用initWorkers方法來初始化線程池,建立指定的默認數量的線程,指定名稱(用AtomicLong:原子線程安全的)並添加到管理線程的集合workers中(這個list通過synchronizedList修飾它已經成爲了一個同步的集合,所作的操做都是線程安全的)。在execute中,首先獲取須要指定的任務(Job),爲了保證線程安全,會鎖住全部的任務集合(放心synchronized這個關鍵字的做用,它通過jdk1.7已經優化過了,性能消耗有質的提高)。這裏爲何要鎖住jobs這個集合呢,答案是:爲了防止在多線程環境下,有多個job同時添到這個jobs裏面,任務要一個個的執行,防止沒法執行任務。接着再用addLast方法將任務添加到鏈表的最後一個,這裏就是一個先進先出的隊列(先進入的線程會優先被執行)再調用jobs的notify方法喚醒其餘job。而在下面的添加線程或者移除線程的方法,都必需要鎖住整個工做隊列,這裏爲了防止,執行的時候忽然發現job不見了,或者添加的時候取不到最新的job等多線程下的安全問題,而且在worker線程中增長了一個running字段,用於控制線程的運行或者中止(run方法是否執行的控制條件)
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { private static final int MAX_WORKER_NUMBERS = 10; private static final int DEFAULT_WORKERS_NUMBERS = 2 * (Runtime.getRuntime().availableProcessors()) + 1; private static final int MIN_WORDER_NUMBERS = 1; private final LinkedList<Job> jobs = new LinkedList<Job>(); //管理工做線程的集合 private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>()); private int workerNum = DEFAULT_WORKERS_NUMBERS; private AtomicLong threadNum = new AtomicLong(); /** * 線程開始運行 */ public DefaultThreadPool() { initWorkers(DEFAULT_WORKERS_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORDER_NUMBERS ? MIN_WORDER_NUMBERS : num; } /** * 初始化線程 * * @param defaultWorkersNumbers */ private void initWorkers(int defaultWorkersNumbers) { for (int i = 0; i < defaultWorkersNumbers; i++) { Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker, "ThreadPool-worker" + threadNum.incrementAndGet()); thread.start(); } } /** * 執行任務 * * @param job */ @Override public void execute(Job job) { if (job != null) { synchronized (jobs) { jobs.addLast(job); jobs.notify(); } } } /** * 關閉線程 */ @Override public void shutDown() { for (Worker worker : workers) { if (worker != null) { worker.shutDown(); } } } /** * 添加線程 * * @param num */ @Override public void addWorkers(int num) { synchronized (jobs) { if (num + this.workerNum > MAX_WORKER_NUMBERS) { num = MAX_WORKER_NUMBERS - this.workerNum; } initWorkers(num); this.workerNum += num; } } /** * 移除線程 * * @param num */ @Override public void removeWorker(int num) { synchronized (jobs) { if (num > workerNum) { throw new IllegalArgumentException("much workNum"); } int count = 0; while (count < num) { Worker worker = workers.get(count); if (workers.remove(worker)) { worker.shutDown(); count++; } } this.workerNum -= count; } } /** * 獲取工做線程的數量 * * @return */ @Override public int getJobSize() { return jobs.size(); } /** * 工做線程 */ public class Worker implements Runnable { private volatile boolean running = false; @Override public void run() { while (running) { Job job = null; synchronized (jobs) { while (jobs.isEmpty()) { try { jobs.wait(); } catch (InterruptedException ex) { ex.printStackTrace(); Thread.currentThread().interrupt(); return; } } job = jobs.removeFirst(); } if (job != null) { try { job.run(); } catch (Exception ex) { ex.printStackTrace(); } } } } public void shutDown() { this.running = false; } } }
四:簡易的web http處理線程池
4.1:定義一個類,叫作SimlpeHttpHandler,其中維護着一個叫作HttpRequestHandler的job,這個job的做用就是經過socket監聽固定的端口(8080),而後經過流讀取web目錄中的文件,根據不一樣的文件格式封裝打印返回
public class SimleHttpHandler { static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<HttpRequestHandler>(1); public String basePath; private ServerSocket serverSocket; @Resource private HttpRequestHandler httpRequstHandler; int port = 8080; /** * 設置端口 * * @param port */ public void setPort(int port) { if (port > 0) { this.port = port; } } /** * 設置基本路徑 * * @param basePath */ public void setBasePath(String basePath) { if (basePath != null) { boolean exist = new File(basePath).exists(); boolean directory = new File(basePath).isDirectory(); if (exist && directory) { this.basePath = basePath; } } } /** * 開始線程 * * @throws Exception */ public void start() throws Exception { serverSocket = new ServerSocket(port); Socket socket = null; while ((socket = serverSocket.accept()) != null) { try { threadPool.execute(new HttpRequestHandler(socket, basePath)); } catch (Exception ex) { ex.printStackTrace(); } finally { serverSocket.close(); } } } }
4.2:定義一個類叫作HttpRequestHandler實現Runnable接口,而後構造進入socket和路徑,在run方法中調用具體的處理方法:我將具體的業務封裝到
ServerRequestManager中,而後調用它的dealRequest方法進行具體的業務處理:
@Component public class HttpRequestHandler implements Runnable { private Socket socket; private String basePath; @Resource private ServerRequestManager serverRequestManager; public HttpRequestHandler(Socket socket, String basePath) { this.basePath = basePath; this.socket = socket; } @Override public void run() { serverRequestManager.dealRequest(basePath); } }
4.3:服務器的具體處理邏輯,這裏就是根據當前的路徑用流讀取路徑中的文件,一旦檢測到文件的後綴是.jpg或者ico,就將其輸出爲http的內容類型爲img類型的圖片,不然輸出爲text類型。最後用colse方法來關閉流
*/ @Component public class ServerRequestManager { private Socket socket; public static final String httpOK = "HTTP/1.1 200 ok"; public static final String molly = "Server:Molly"; public static final String contentType = "Content-Type:"; /** * 處理請求 * * @param basePath */ public void dealRequest(String basePath) { String content = null; BufferedReader br = null; BufferedReader reader = null; PrintWriter out = null; InputStream in = null; try { reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String header = reader.readLine(); String filePath = basePath + header.split(" ")[1]; out = new PrintWriter(socket.getOutputStream()); if (filePath.endsWith("jpg") || filePath.endsWith("ico")) { in = new FileInputStream(filePath); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); int index = 0; while ((index = in.read()) != -1) { byteArrayOutputStream.write(index); } byte[] array = byteArrayOutputStream.toByteArray(); out.print(httpOK); out.print(molly); out.println(contentType + " image/jpeg"); out.println("Content-Length" + array.length); out.print(""); socket.getOutputStream().write(array, 0, array.length); } else { br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); out = new PrintWriter(socket.getOutputStream()); out.print(httpOK); out.print(molly); out.print(contentType + "text/html; Charset =UTF-8"); out.print(""); while ((content = br.readLine()) != null) { out.print(content); } } out.flush(); } catch (final Exception ex) { ex.printStackTrace(); out.println("HTTP/1.1 500"); out.println(""); out.flush(); } finally { close(br, in, out, socket); } } /** * 關閉流 * * @param closeables */ public static void close(Closeable... closeables) { if (closeables != null) { for (Closeable closeable : closeables) { try { closeable.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
五:總結
本篇博客總結了如何開發一個簡單的線程池,固然功能不夠齊全,比不上jdk的線程池,沒有阻塞隊列和超時時間和拒絕策略等;而後會用socket監聽8080端口,獲取web根目錄讀取目錄下的文件,而後輸出對應的格式內容。實現的功能很簡單,沒有什麼複雜的,不過我覺的這篇這篇博客能讓我學習的地方就是線程池的使用方法,在處理高併發的請求時,線程池技術基本是必不可少的。
參考資料《java併發編程的藝術》
*假如你想學習java,或者看本篇博客有任務問題,能夠添加java羣:618626589