HTTP Client MultiThreadedHttpConnectionManager線程安全鏈接管理類源碼解析

MultiThreadedHttpConnectionManager 是HTTP Client中用來複用鏈接的鏈接管理類,能夠經過多線程

      MultiThreadedHttpConnectionManager n =  new MultiThreadedHttpConnectionManager();併發

      HttpClient client = new HttpClient(n);socket

這樣的方式去 建立一個Client 實例,分佈式

建立後,每當執行post

int statusCode = client.executeMethod(postMethod);時 性能

http client 委託ConnectionManager建立鏈接,實際上是先委託HttpMethodDirector 執行excute方法,測試

再經過它委託ConnectionManager 建立鏈接,HttpMethodDirector 中包含了一下host,請求參數等信息。this

在建立鏈接時,HttpMethodDirector 中有以下代碼:url

  
1 if ( this .conn == null ) { 2 3 this .conn = connectionManager.getConnectionWithTimeout( hostConfiguration, this .params.getConnectionManagerTimeout() 4 ); 5 ...... 6 } 7

ConnectionManager 使用了經常使用的多態的方式將鏈接的獲取交給子類完成。 加強其擴展性。spa

ConnectionManager 有三個子類:

1

對應於:

1. 一次性的鏈接:

Image(1)

2. 線程池中獲取鏈接:

Image(2)

3. 複用當前SimpleHttpConnectionManager中的一個成員變量,策略是沒有則建立,有則覆蓋後返回

Image(3)

重點說下MultiThreadedHttpConnectionManager   中鏈接的獲取:

在使用 MultiThreadedHttpConnectionManager  獲取鏈接的時候,MultiThreadedHttpConnectionManager  使用了鏈接池的概念針對每一個
HostConfiguration 作了鏈接的管理,即 HostConfiguration 做爲Key ,鏈接池(HostConnectionPool)做爲value去管理當前host下的全部鏈接,
HostConfiguration的實例以下: HostConfiguration[host=http://www.taobao.com]

HostConnectionPool 中使用鏈表 管理了 空閒的鏈接和等待鏈接的線程隊列。

每次獲取鏈接的時候 根據參數(後面會提到)決定是直接從池中獲取一個空閒鏈接,建立一個鏈接,仍是計算出一個等待時間後 將當前線程沉睡這麼久。然後再檢查。

Http Client 經過協議對應的ProtocolSocketFactory去建立一個socket鏈接來發送請求和接受響應

使用注意事項:

1. MultiThreadedHttpConnectionManager  中有如下兩個變量,分別解釋:

     a. 每一個host最大同時能夠獲取的鏈接數, 大於這個數字後, (1,2號線程正在使用鏈接)3號線程會wait 沉睡住 直到到達時間或者被打斷或者1,2號中有人release這個connection,拋出異常。

          注意,若是是HTTP client 來調用接口的話 這個例如(http://www.taobao.com 那他的host是 www.taobao.com) 這個值應該設置大一點 不然不少線程調用這個接口的時候會阻塞住。

     b. 同一時間MultiThreadedHttpConnectionManager  容許的最大鏈接數,超過這個數字,鏈接的創建將會阻塞。直到有空閒鏈接釋放。

Image(4)

使用注意事項測試代碼:  下劃線的兩個方法能夠調整後觀察結果

  
1 public static void main(String[] sadfasd) throws HttpException, IOException, InterruptedException{ 2 final String url = " http://www.taobao.com " ; 3 final HttpClient client = new HttpClient(); 4 final MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager(); 5 connectionManager.setMaxTotalConnections ( 1 ); // 總的鏈接數 6 connectionManager.setMaxConnectionsPerHost ( 2 ); // 每一個host的最大鏈接數 7 client.setHttpConnectionManager( connectionManager ); 8 9 Runnable r = new Runnable(){ 10 public void run(){ 11 int statusCode = 0 ; 12 PostMethod postMethod = new PostMethod(url); 13 try { 14 statusCode = client.executeMethod(postMethod); 15 System. out.println( " sleep " + statusCode ); 16 Thread. sleep( 3000 ); // 10s 17 postMethod.releaseConnection(); 18 } catch (HttpException e) { 19 e.printStackTrace(); 20 } catch (IOException e) { 21 e.printStackTrace(); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } 25 }; 26 }; 27 Runnable r1 = new Runnable(){ 28 public void run(){ 29 int statusCode = 0 ; 30 PostMethod postMethod = new PostMethod(url); 31 try { 32 statusCode = client.executeMethod(postMethod); 33 } catch (HttpException e) { 34 e.printStackTrace(); 35 } catch (IOException e) { 36 e.printStackTrace(); 37 } 38 System. out.println( statusCode ); 39 postMethod.releaseConnection(); 40 }; 41 }; 42 Runnable r2 = new Runnable(){ 43 public void run(){ 44 int statusCode = 0 ; 45 PostMethod postMethod = new PostMethod(url); 46 try { 47 statusCode = client.executeMethod(postMethod); 48 } catch (HttpException e) { 49 e.printStackTrace(); 50 } catch (IOException e) { 51 e.printStackTrace(); 52 } 53 System. out.println( statusCode ); 54 postMethod.releaseConnection(); 55 }; 56 }; 57 Runnable r3 = new Runnable(){ 58 public void run(){ 59 int statusCode = 0 ; 60 PostMethod postMethod = new PostMethod(url); 61 try { 62 statusCode = client.executeMethod(postMethod); 63 } catch (HttpException e) { 64 e.printStackTrace(); 65 } catch (IOException e) { 66 e.printStackTrace(); 67 } 68 System. out.println( statusCode ); 69 postMethod.releaseConnection(); 70 }; 71 }; 72 new Thread(r).start(); 73 Thread. sleep( 1000 ); 74 new Thread(r1).start(); 75 new Thread(r2).start(); 76 new Thread(r3).start(); 77 78 } 79

 

釋放鏈接:

在咱們調用postMethod.releaseConnection()時, 會調用connectionManager的releaseConnection方法。
注意:進入這個方法後會首先同步整個connectionPool(鏈接池)對象,這意味着,在多鏈接複用的時候頻繁的釋放鏈接,也是會有性能損耗的,同步整個connectionPool後鏈接的建立都會受影響。
而後開始歸還鏈接,歸還的方式很清晰:

1. 將Connection放到基於host的鏈接池的空閒鏈表中
    hostPool. freeConnections .add(conn);
2.將Connection放到整個全局的connectionPool的空閒鏈表中
3. 將Connection從Reference Map中移除(Reference Map 後面單獨講解)
4. 將Connection加入到超時管理中去。
5. 將hostPool(host鏈接池)裏等待隊列的頭元素拿出來 發送interrupt的信號量。目的是 喚醒等待鏈接的線程。

到目前爲止,有兩個點能夠詳細說下
1. Reference Map的做用。
2.  等待鏈接的線程的處理方式。

首先說Reference Map,這個名字是我本身取的。它在MultiThreadedHttpConnectionManager  中的名字叫作:

Image(5)

在每次獲取鏈接和釋放鏈接的時候會將」鏈接「存入和移除。

注意: 這裏的」鏈接「 已經不是Connection 而是用 WeakReference包裝過的Connection。

爲何用WeakReference?

這裏的概念和ThreadLocal 中用WeakReference 包裝ThreadLocalMap中的Key同樣。

目的是爲了 在鏈接丟失時,HTTP client 失去了對「鏈接」(Connection)的強引用,該鏈接對象變成了弱引用對象,能夠被GC掉。

因此,每次在獲取鏈接的時候 要將鏈接用WeakReference 包裝後放到REFERENCE_TO_CONNECTION_SOURCE 這個Map中,

每次釋放鏈接時,將它從REFERENCE_TO_CONNECTION_SOURCE 中移除,由於這個時候鏈接的管理由線程池使用強引用管理。

再說,等待鏈接的線程的處理方式

先看 獲取鏈接時的代碼 和註釋  大部分代碼被精簡了。 因此邏輯不通,看流程便可。

  
1 synchronized (connectionPool) { 2 while (connection == null ) { 3 if (hostPool.freeConnections.size() > 0 ) { 4 // 有線程池中有空閒的鏈接 5 connection = connectionPool.getFreeConnection(hostConfiguration); 6 7 } else if ((hostPool.numConnections < maxHostConnections) && (connectionPool.numConnections < maxTotalConnections)) { 8 // 沒有空閒鏈接,可是知足前文的兩個條件 能夠建立新的鏈接 9 connection = connectionPool.createConnection(hostConfiguration); 10 11 } else if ((hostPool.numConnections < maxHostConnections) && (connectionPool.freeConnections.size() > 0 )) { 12 13 // 整個鏈接數 沒有到達最大,而且有空閒鏈接(其餘host池中) 則刪除掉其餘host中的鏈接,而且在當前host池子中建立新鏈接 14 connectionPool.deleteLeastUsedConnection(); 15 connection = connectionPool.createConnection(hostConfiguration); 16 } else { 17 // 以上條件都不知足, 只能將當前線程睡眠 18 try { 19 waitingThread = new WaitingThread(); // 建立一個線程包裝類 20 waitingThread.hostConnectionPool = hostPool; // 指定所屬的host鏈接池 21 waitingThread.thread = Thread.currentThread(); // 將當前線程賦值 22 startWait = System.currentTimeMillis (); 23 24 hostPool.waitingThreads.addLast(waitingThread); // 將線程包裝類 添加到host鏈接池的 等待列表中 25 connectionPool.waitingThreads.addLast(waitingThread); // 將線程包裝類 添加到全局鏈接池的 等待列表中 26 connectionPool.wait(timeToWait); // 沉睡 27 } catch (InterruptedException e) { 28 // 被打斷是檢查 布爾變量interruptedByConnectionPool 肯定是 HTTP 釋放鏈接後 主動打斷的,仍是其餘異常緣由打斷 29 // 是本身打斷的 catch住異常後什麼也不作,從新進入while循環中,嘗試獲取鏈接 30 if ( ! waitingThread.interruptedByConnectionPool) { 31 throw new IllegalThreadStateException( " Interrupted while waiting in MultiThreadedHttpConnectionManager " ); 32 } 33 } finally { 34 if ( ! waitingThread.interruptedByConnectionPool) { 35 hostPool.waitingThreads.remove(waitingThread); 36 connectionPool.waitingThreads.remove(waitingThread); 37 } 38 if (useTimeout) { 39 endWait = System.currentTimeMillis (); 40 timeToWait -= (endWait - startWait); 41 } 42 } 43 } 44 } 45 } 46 47

釋放鏈接時

調用notifyWaitingThread 方法,結合上面的代碼看:

  
1 public synchronized void notifyWaitingThread(HostConnectionPool hostPool) { 2 3 // find the thread we are going to notify, we want to ensure that each 4 // waiting thread is only interrupted once so we will remove it from 5 // all wait queues before interrupting it 6 WaitingThread waitingThread = null ; 7 // 取出 等待的線程後發送 interrupt 信號量, 8 9 if (hostPool.waitingThreads.size() > 0 ) { 10 11 waitingThread = ( WaitingThread) hostPool.waitingThreads.removeFirst(); 12 waitingThreads.remove(waitingThread); 13 } else if (waitingThreads .size() > 0 ) { 14 15 waitingThread = ( WaitingThread) waitingThreads.removeFirst(); 16 waitingThread.hostConnectionPool.waitingThreads.remove(waitingThread); 17 } 18

// 致使 獲取鏈接的那個方法中 捕獲異常

// 注:interrupt 信號量是必定會引發 interruptException的

// 將interruptedByConnectionPool 設置爲true 好標明 是 HTTP client 手動打斷的。 這是HTTP client對於等待線程喚醒方式的核心思路

  
1 if (waitingThread != null ) { 2 3 waitingThread.interruptedByConnectionPool = true ; 4 5 waitingThread.thread.interrupt(); 6 7 } 8 9 } 10

上面兩端代碼主要思路就是: 有空鏈接就直接用,沒有則沉睡等待喚醒。

其實用interrupt信號量 會引發interruptException異常,經過catch住異常來處理,是比較粗暴的。

優雅的用 wait and notify的方式 就不須要catch異常,一樣能達到喚醒線程效果,並且很優雅。

MultiThreadedHttpConnectionManager  中對弱引用的使用

MultiThreadedHttpConnectionManager  類中 還有一個 ReferenceQueueThread類 是用來配合HttpConnectionWithReference(將鏈接用弱引用包裹後的對象)使用的

Image(6)

使用的方式是這樣:

1. 建立鏈接時,用弱引用包裹住Connection對象放到REFERENCE_TO_CONNECTION_SOURCE  中,目的是防止在鏈接丟失的時候Map中的這個HttpConnectionWithReference 對象變成弱引用,

     在GC回收時會被回收掉,防止內存泄露。

2. 首先明確的是,JVM會在HttpConnectionWithReference 被回收的時候,將他加入到REFERENCE_QUEUE 中。這是JAVA對於弱引用的規則。

3. 同時,在將HttpConnectionWithReference  放入Map時,啓動一個子線程 ReferenceQueueThread  去監聽 這個REFERENCE_QUEUE ,只要這個REFERENCE_QUEUE  有值(被GC回收的時候)

     立馬被取出來,將線程池可用鏈接的大小 -1 。

MultiThreadedHttpConnectionManager  使用弱引用 確保了

1. connection對象丟失時 內存的及時回收。

2. 搭配隊列和子線程確保,鏈接丟失後線程池中可用鏈接數的次數能夠修改。

說到這裏,HTTP Client的MultiThreadedHttpConnectionManager  類的絕大部分分方法已經解釋完畢了。其中主要是省略掉了,發送和讀取HTTP 報文的代碼,沒有太多技巧,以規則解析出來便可。

總結:

1. 在單純的發送請求的場景下,使用MultiThreadedHttpConnectionManager 來代替SimpleHTTPConnectionManger是可行的,而且MultiThreadedHttpConnectionManager 的鏈接池機制也會提升發送請求的效率,

2. 可是以爲不符合分佈式應用間的藉口調用,緣由很簡單,對每一個host作了鏈接池,在必定狀況下,這個限制是致命的,直接影響了接口的調用效率。嚴重影響調用的併發數。因此,在分佈式應用的調用中不適合使用MultiThreadedHttpConnectionManager 。

MultiThreadedHttpConnectionManager類中幾個值得注意的點:


1. 鏈接的管理,特別是使用WeakReference包裝Connection對象,而後結合一個子線程和Queque去確保對象被回收時,能夠鏈接數的增長。

2. 對於沒有鏈接可用時,使用使當前線程睡眠的,在釋放鏈接時 使用 interrupt信號量 是等待線程恢復的處理方式

相關文章
相關標籤/搜索