Java線程池管理及分佈式Hadoop調度框架搭建

【編者按】多線程是程序員面試時經常會面對的問題,對多線程概念的掌握和理解水平,也經常被用來衡量一我的的編程實力。不錯,普通的多線程已經不容易了,那麼當多線程碰到「大象」又會產生什麼樣的火花?這裏咱們爲你們分享上海創行科技技術總監嚴瀾的博文——Java線程池管理及分佈式Hadoop調度框架搭建。


如下爲原文
平時的開發中線程是個少不了的東西,好比tomcat裏的servlet就是線程,沒有線程咱們如何提供多用戶訪問呢?不過不少剛開始接觸線程的開發工程師卻在這個上面吃了很多苦頭。怎麼作一套簡便的線程開發模式框架讓你們從單線程開發快速轉入多線程開發,這確實是個比較難搞的工程。
那具體什麼是線程呢?首先看看進程是什麼,進程就是系統中執行的一個程序,這個程序可使用內存、處理器、文件系統等相關資源。例如QQ軟件、Eclipse、Tomcat等就是一個exe程序,運行啓動起來就是一個進程。爲何須要多線程?若是每一個進程都是單獨處理一件事情不能多個任務同時處理,好比咱們打開qq只能和一我的聊天,咱們用eclipse開發代碼的時候不能編譯代碼,咱們請求tomcat服務時只能服務一個用戶請求,那我想咱們還在原始社會。多線程的目的就是讓一個進程可以同時處理多件事情或者請求。好比如今咱們使用的QQ軟件能夠同時和多我的聊天,咱們用eclipse開發代碼時還能夠編譯代碼,tomcat能夠同時服務多個用戶請求。
線程這麼多好處,怎麼把單進程程序變成多線程程序呢?不一樣的語言有不一樣的實現,這裏說下java語言的實現多線程的兩種方式:擴展java.lang.Thread類、實現java.lang.Runnable接口。
先看個例子,假設有100個數據須要分發而且計算。看下單線程的處理速度:

java

  1.     package thread;  
  2.       
  3.     import java.util.Vector;  
  4.       
  5.     public class OneMain {  
  6.            public static void main(String[] args) throws InterruptedException {  
  7.                 Vector<Integer> list = new Vector<Integer>(100);  
  8.       
  9.                  for (int i = 0; i < 100; i++) {  
  10.                       list.add(i);  
  11.                 }  
  12.       
  13.                  long start = System.currentTimeMillis();  
  14.                  while (list.size() > 0) {  
  15.                        int val = list.remove(0);  
  16.                       Thread. sleep(100);//模擬處理  
  17.                       System. out.println(val);  
  18.                 }  
  19.                  long end = System.currentTimeMillis();  
  20.       
  21.                 System. out.println("消耗 " + (end - start) + " ms");  
  22.       
  23.           }  
  24.       
  25.            // 消耗 10063 ms  
  26.     }  

  27. 再看一下多線程的處理速度,採用了10個線程分別處理:

  28. [java] view plaincopy在CODE上查看代碼片派生到個人代碼片

  29.     package thread;  
  30.       
  31.     import java.util.Vector;  
  32.     import java.util.concurrent.CountDownLatch;  
  33.       
  34.     public class MultiThread extends Thread {  
  35.          static Vector<Integer> list = new Vector<Integer>(100);  
  36.          static CountDownLatch count = new CountDownLatch(10);  
  37.       
  38.          public void run() {  
  39.       
  40.               while (list.size() > 0) {  
  41.                    try {  
  42.                         int val = list.remove(0);  
  43.                         System.out.println(val);  
  44.                         Thread.sleep(100);//模擬處理  
  45.                    } catch (Exception e) {  
  46.                         // 可能數組越界,這個地方只是爲了說明問題,忽略錯誤  
  47.                    }  
  48.       
  49.               }  
  50.                
  51.               count.countDown(); // 刪除成功減一  
  52.       
  53.          }  
  54.       
  55.          public static void main(String[] args) throws InterruptedException {  
  56.                
  57.               for (int i = 0; i < 100; i++) {  
  58.                    list.add(i);  
  59.               }  
  60.                
  61.               long start = System.currentTimeMillis();  
  62.       
  63.               for (int i = 0; i < 10; i++) {  
  64.                    new MultiThread().start();  
  65.               }  
  66.       
  67.                
  68.       
  69.               count.await();  
  70.               long end = System.currentTimeMillis();  
  71.               System.out.println("消耗 " + (end - start) + " ms");  
  72.       
  73.          }  
  74.       
  75.          // 消耗 1001 ms  
  76.     }  
複製代碼



你們看到了線程的好處了吧!單線程須要10S,10個線程只須要1S。充分利用了系統資源實現並行計算。也許這裏會產生一個誤解,是否是增長的線程個數越多效率越高。線程越多處理性能越高這個是錯誤的,範式都要合適,過了就很差了。須要普及一下計算機硬件的一些知識。咱們的cpu是個運算器,線程執行就須要這個運算器來運行。不過這個資源只有一個,你們就會爭搶。通常經過如下幾種算法實現爭搶cpu的調度:
程序員

  • 隊列方式,先來先服務。不論是什麼任務來了都要按照隊列排隊先來後到。
  • 時間片輪轉,這也是最古老的cpu調度算法。設定一個時間片,每一個任務使用cpu的時間不能超過這個時間。若是超過了這個時間就把任務暫停保存狀態,放到隊列尾部繼續等待執行。
  • 優先級方式:給任務設定優先級,有優先級的先執行,沒有優先級的就等待執行。

這三種算法都有優缺點,實際操做系統是結合多種算法,保證優先級的可以先處理,可是也不能一直處理優先級的任務。硬件方面爲了提升效率也有多核cpu、多線程cpu等解決方案。目前看得出來線程增多了會帶來cpu調度的負載增長,cpu須要調度大量的線程,包括建立線程、銷燬線程、線程是否須要換出cpu、是否須要分配到cpu。這些都是須要消耗系統資源的,由此,咱們須要一個機制來統一管理這一堆線程資源。線程池的理念提出解決了頻繁建立、銷燬線程的代價。線程池指預先建立好必定大小的線程等待隨時服務用戶的任務處理,沒必要等到用戶須要的時候再去建立。特別是在java開發中,儘可能減小垃圾回收機制的消耗就要減小對象的頻繁建立和銷燬。
以前咱們都是本身實現的線程池,不過隨之jdk1.5的推出,jdk自帶了java.util.concurrent併發開發框架,解決了咱們大部分線程池框架的重複工做。可使用Executors來創建線程池,列出如下大概的,後面再介紹。
web

  • newCachedThreadPool創建具備緩存功能線程池
  • newFixedThreadPool創建固定數量的線程
  • newScheduledThreadPool創建具備時間調度的線程

有了線程池後有如下幾個問題須要考慮:
面試

  • 線程怎麼管理,好比新建任務線程。
  • 線程如何中止、啓動。
  • 線程除了scheduled模式的間隔時間定時外可否實現精確時間啓動。好比晚上1點啓動。
  • 線程如何監控,若是線程執行過程當中死掉了,異常終止咱們怎麼知道。

考慮到這幾點,咱們須要把線程集中管理起來,用java.util.concurrent是作不到的。須要作如下幾點:

算法

  • 將線程和業務分離,業務的配置單獨作成一個表。
  • 構建基於concurrent的線程調度框架,包括能夠管理線程的狀態、中止線程的接口、線程存活心跳機制、線程異常日誌記錄模塊。
  • 構建靈活的timer組件,添加quartz定時組件實現精準定時系統。
  • 和業務配置信息結合構建線程池任務調度系統。能夠經過配置管理、添加線程任務、監控、定時、管理等操做。

組件圖爲:



構建好線程調度框架是否是就能夠應對大量計算的需求了呢?答案是否認的。由於一個機器的資源是有限的,上面也提到了cpu是時間週期的,任務一多了也會排隊,就算增長cpu,一個機器能承載的cpu也是有限的。因此須要把整個線程池框架作成分佈式的任務調度框架才能應對橫向擴展,好比一個機器上的資源達到瓶頸了,立刻增長一臺機器部署調度框架和業務就能夠增長計算能力了。好了,如何搭建?以下圖:


基於jeeframework咱們封裝spring、ibatis、數據庫等操做,而且能夠調用業務方法完成業務處理。主要組件爲:
spring

  • 任務集中存儲到數據庫服務器
  • 控制中心負責管理集羣中的節點狀態,任務分發
  • 線程池調度集羣負責控制中心分發的任務執行
  • web服務器經過可視化操做任務的分派、管理、監控。

通常這個架構能夠應對經常使用的分佈式處理需求了,不過有個缺陷就是隨着開發人員的增多和業務模型的增多,單線程的編程模型也會變得複雜。好比須要對1000w數據進行分詞,若是這個放到一個線程裏來執行,不算計算時間消耗光是查詢數據庫就須要耗費很多時間。有人說,那我把1000w數據打散放到不一樣機器去運算,而後再合併不就好了嗎?由於這是個特例的模式,專爲了這個需求去開發相應的程序沒有問題,可是之後又有其餘的海量需求如何辦?好比把倒退3年的全部用戶發的帖子中發帖子最多的粉絲轉發的最高的用戶做息時間取出來。又得編一套程序實現,太麻煩!分佈式雲計算架構要解決的就是這些問題,減小開發複雜度而且要高性能,你們會不會想到一個最近很熱的一個框架,hadoop,沒錯就是這個玩意。hadoop解決的就是這個問題,把大的計算任務分解、計算、合併,這不就是咱們要的東西嗎?不過玩過這個的人都知道他是一個單獨的進程。不是!他是一堆進程,怎麼和咱們的調度框架結合起來?看圖說話:

基本前面的分佈式調度框架組件不變,增長以下組件和功能:
數據庫

  • 改造分佈式調度框架,能夠把自己線程任務變成mapreduce任務並提交到hadoop集羣。
  • hadoop集羣可以調用業務接口的spring、ibatis處理業務邏輯訪問數據庫。
  • hadoop須要的數據可以經過hive查詢。
  • hadoop能夠訪問hdfs/hbase讀寫操做。
  • 業務數據要及時加入hive倉庫。
  • hive處理離線型數據、hbase處理常常更新的數據、hdfs是hive和hbase的底層結構也能夠存放常規文件。

這樣,整個改造基本完成。不過須要注意的是架構設計必定要減小開發程序的複雜度。這裏雖然引入了hadoop模型,可是框架上開發者仍是隱藏的。業務處理類既能夠在單機模式下運行也能夠在hadoop上運行,而且能夠調用spring、ibatis。減小了開發的學習成本,在實戰中慢慢體會就學會了一項新技能。
界面截圖:

編程

相關文章
相關標籤/搜索