OkHttp(三)責任鏈

OkHttp(三)

前兩篇文章,講述了OkHttp的基礎的使用與請求的調度狀況,而今天就讓咱們來看看OkHttp的精髓之一-責任鏈模式。web

責任鏈模式

前面的文章中咱們看到,當實際進行網絡請求時,不管是同步請求仍是異步請求都會使用getResponseWithInterceptorChain() 這個方法,因此咱們先從這個方法開始研究。算法

fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    // 添加各類攔截器 這個後面逐一介紹
    val interceptors = mutableListOf<Interceptor>()
    // 自定義的一個攔截器
    interceptors += client.interceptors
    // 系統內置的攔截器
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)
    
    // 建立責任鏈
    val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
        client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)

    var calledNoMoreExchanges = false
    try {
      // 執行責任鏈
      val response = chain.proceed(originalRequest)
      if (transmitter.isCanceled) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw transmitter.noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null)
      }
    }
  }

咱們能夠看到,該方法中將攔截器逐一添加集合中,並建立了一個責任鏈,用chain.proceed()方法來執行請求。promise

OkHttp採用責任鏈的模式來使每一個功能分開,每一個Interceptor自行完成本身的任務,而且將不屬於本身的任務交給下一個,簡化了各自的責任和邏輯。瀏覽器

接下來看看proceed的方法緩存

override fun proceed(request: Request): Response {
    return proceed(request, transmitter, exchange)
  }

  @Throws(IOException::class)
  fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
    if (index >= interceptors.size) throw AssertionError()

    calls++

    ...
    
    // 獲取下一個攔截器,鏈中的攔截器集合index+1
    // Call the next interceptor in the chain.
    val next = RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    // 執行當前的攔截器-若是在配置okhttpClient,時沒有設置intercept默認是先執行:retryAndFollowUpInterceptor 攔截器`
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    ...

    return response
  }

在該方法中咱們能夠看到遞歸調用了下一個攔截器,當全部攔截器調用完畢後,返回咱們所得的Response。每一個攔截器都重寫了intercept()方法,用以執行請求。服務器

責任鏈的一個執行過程以下圖cookie

20190710140954641.png

接下來讓咱們分析默認責任鏈的一個做用,並做出一些源碼分析。網絡

RetryAndFollowUpInterceptor

其建立過程是在 構建newCall對象時app

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
  }
  ...
  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

簡單看一下使用的過程
首先建立了transmitter對象,他封裝了網絡請求相關的信息:鏈接池,地址信息,網絡請求,事件回調,負責網絡鏈接的鏈接、關閉,釋放等操做。框架

var request = chain.request()
    val realChain = chain as RealInterceptorChain
    val transmitter = realChain.transmitter()

然後則進入了網絡鏈接的循環

//計數器 屢次相應的次數是由限制的,不一樣瀏覽器推薦的次數不一樣,還特別強調了HTTP 1.0協議推薦5次。
    var followUpCount = 0
    var priorResponse: Response? = null
    while (true) {
      //準備鏈接
      transmitter.prepareToConnect(request)

      if (transmitter.isCanceled) {
        throw IOException("Canceled")
      }

      var response: Response
      var success = false
      try {
        // 獲得最終的結果
        response = realChain.proceed(request, transmitter, null)
        success = true
      } catch (e: RouteException) {
        // The attempt to connect via a route failed. The request will not have been sent.
        //鏈接地址的異常,判斷是否能可以恢復,也就是是否要重試
        if (!recover(e.lastConnectException, transmitter, false, request)) {
          throw e.firstConnectException
        }
        continue
      } catch (e: IOException) {
        // An attempt to communicate with a server failed. The request may have been sent.
        // 鏈接服務器的異常 判斷網絡請求是否已經開始
        val requestSendStarted = e !is ConnectionShutdownException
        // 同上
        if (!recover(e, transmitter, requestSendStarted, request)) throw e
        continue
      } finally {
        // The network call threw an exception. Release any resources.
        // 釋放資源
        if (!success) {
          transmitter.exchangeDoneDueToException()
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      //若是不爲空保存到Response中
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                .body(null)
                .build())
            .build()
      }

      val exchange = response.exchange
      val route = exchange?.connection()?.route()
      // 判斷返回結果response,是否須要繼續完善請求,例如證書驗證等等
      val followUp = followUpRequest(response, route)
      
      // 若是不須要繼續完善網絡請求,返回response
      if (followUp == null) {
        if (exchange != null && exchange.isDuplex) {
          transmitter.timeoutEarlyExit()
        }
        return response
      }
      
      // 若是body內容只能發送一次 直接放回
      val followUpBody = followUp.body
      if (followUpBody != null && followUpBody.isOneShot()) {
        return response
      }

      response.body?.closeQuietly()
      if (transmitter.hasExchange()) {
        exchange?.detachWithViolence()
      }
      
      // 若是已經超過最大的網絡請求追加數,釋放鏈接,拋出協議異常
      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw ProtocolException("Too many follow-up requests: $followUpCount")
      }
      // 更新下一次的網絡請求對象
      request = followUp
      // 保存上一次的請求結果
      priorResponse = response
    }

而後就是重試階段recover()的源碼了

/**
   * Report and attempt to recover from a failure to communicate with a server. Returns true if
   * `e` is recoverable, or false if the failure is permanent. Requests with a body can only
   * be recovered if the body is buffered or if the failure occurred before the request has been
   * sent.
   */
  private fun recover(
    e: IOException,
    transmitter: Transmitter,
    requestSendStarted: Boolean,
    userRequest: Request
  ): Boolean {
    // The application layer has forbidden retries.
    // 設置了不須要重試
    if (!client.retryOnConnectionFailure) return false

    // We can't send the request body again.
    // body內容只能發送一次
    if (requestSendStarted && requestIsOneShot(e, userRequest)) return false

    // This exception is fatal.
    // 判斷異常類型,是否要繼續嘗試,
    // 不會重試的類型:協議異常、Socketet異常而且網絡狀況還沒開始,ssl認證異常
    if (!isRecoverable(e, requestSendStarted)) return false

    // No more routes to attempt.
    // 已經沒有其餘可用的路由地址了
    if (!transmitter.canRetry()) return false
    // For failure recovery, use the same route selector with a new connection.
    // 其餘放回true
    return true
  }

咱們稍微屢一下上面源碼的流程:

  • 首先使用了transmitter對象(重要),用以提供相應的網絡鏈接相關的東西
  • 而後開始鏈接,而後又有着幾種狀況

    • 鏈接成功,且無後續操做(如認證等),直接放回
    • 鏈接成功,且有後續操做,則進入下一次循環
    • 鏈接失敗,RouteException和IOException異常,利用recover()判斷是否重試,不要重試則釋放資源,要重試則continue;
    • 鏈接成功,可是重試的次數超過限度,則有問題(能夠本身建立攔截器來修改重試次數)。

BridgeIntecepter

這個攔截器的功能較爲的簡單,請求以前對響應頭作了一些檢查,並添加一些頭,而後在請求以後對響應作一些處理(gzip解壓or設置cookie)。

仍是讓咱們看一下源碼。

override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()
    
    // 若是咱們有RequestBody,會寫一些header信息,如內容長度和內容類型等
    val body = userRequest.body
    if (body != null) {
      ...
    }
    
    // 對一些必要的屬性進行補充
    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    // 默認的編碼格式gzip
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }
    
    // 把以前的cookie存在header裏
    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }
    
    // 獲得Response
    val networkResponse = chain.proceed(requestBuilder.build())
    
    // 保存新的cookie
    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)
    
    // 若是使用的gzip編碼,而且返回的response有body信息,對作相應的處理
    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }

    return responseBuilder.build()
  }

CacheIntecepter

在看這個攔截器的源碼以前,咱們還得關注一件事情,OkHttp的緩存是怎樣緩存的呢?

OkHttp中的Cache類,採用了DiskLruCache,內部使用最近最少使用算法,優先淘汰最近時間內最少次使用的緩存對象,它只有硬存緩存,並無內存緩存,這是他緩存機制的一大缺陷,固然咱們能夠經過自定義緩存機制來解決這一問題。

在OkHttp中還存在一個緩存策略CacheStrategy
CacheStrategy的內部工廠類Factory中有一個getCandidate方法,會根據實際的請求生成對應的CacheStrategy類返回,是個典型的簡單工廠模式。其內部維護一個request和response,經過指定request和response來告訴CacheInterceptor是使用緩存仍是使用網絡請求,亦或二者同時使用。

瞭解完以後,咱們來看源碼:

override fun intercept(chain: Interceptor.Chain): Response {
    1.若是設置緩存而且當前request有緩存,則從緩存Cache中獲取當前請求request的緩存response
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()
    
    // 2.傳入的請求request和獲取的緩存response經過緩存策略對象CacheStragy的工廠類get方法根據一些規則獲取緩存策略CacheStrategy
    //(這裏的規則根據請求的request和緩存的Response的header頭部信息生成的,好比是否有noCache標誌位,是不是immutable不可變,緩存是否過時等等)
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    // 3.生成的CacheStrategy有2個變量,networkRequest和cacheRequest,若是networkRequest爲Null表示不進行網絡請求,若是cacheResponse爲null,則表示沒有有效緩存 
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)
    
    // 4.緩存不可用,關閉
    if (cacheCandidate != null && cacheResponse == null) {
      // The cache candidate wasn't applicable. Close it.
      cacheCandidate.body?.closeQuietly()
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    // 5.若是networkRequest和cacheResponse都爲Null,則表示不請求網絡且緩存爲null,返回504,請求失敗
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
    }

    // If we don't need the network, we're done.
    // 6.若是不請求網絡,但存在緩存,則不請求網絡,直接返回緩存,結束,不執行下一個攔截器
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build()
    }
    
    // 7.不然,請求網絡,並調用下一個攔截器鏈,將請求轉發到下一個攔截器
    var networkResponse: Response? = null
    try {
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }
    
    //8.請求網絡,而且網絡請求返回HTTP_NOT_MODIFIED,說明緩存有效,則合併網絡響應和緩存結果,同時更新緩存
    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            // 清空以前的緩衝
            .cacheResponse(stripBody(cacheResponse))
            // 清空請求到的內容, 由於內容沒有改變
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }
    
    //9.若沒有緩存,則寫入緩存
    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response)
      }
      // 若是請求的方法不須要緩存,移除緩存,例如post,put
      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }

讓咱們簡單梳理一下緩存流程

  • 從當前的Request中獲取緩存,看是否有緩存
  • 非網絡請求時,需結合是否有緩存進行判斷,若是有緩存,直接返回;若是沒有緩存,放回504
  • 是網絡請求時,若是放回304,則作一個小的修補便可;不然根據緩存策略來判斷是否要更新緩存(通常要)。

ConnectIntecepter(核心)

獲取鏈接這個過程較爲複雜,盡力來梳理這個過程。

首先咱們直接來看這個類的源碼,不難發現這個類的源碼較爲簡單,主要核心是transmitter的方法。

override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val request = realChain.request()
    val transmitter = realChain.transmitter()

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    val doExtensiveHealthChecks = request.method != "GET"
    
    // 利用重試的責任鏈生成的transmitter類 來獲取鏈接
    val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)

    return realChain.proceed(request, transmitter, exchange)
  }

而後咱們來看看這個類,transmitter

/** Returns a new exchange to carry a new request and response. */
  internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
    ...//作一些檢查
       
    // 獲取鏈接 分配一個Connection和HttpCodec,爲最終的請求作準備
    val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
    val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
    ...
  }
fun find(
    client: OkHttpClient,
    chain: Interceptor.Chain,
    doExtensiveHealthChecks: Boolean
  ): ExchangeCodec {
    val connectTimeout = chain.connectTimeoutMillis()
    val readTimeout = chain.readTimeoutMillis()
    val writeTimeout = chain.writeTimeoutMillis()
    val pingIntervalMillis = client.pingIntervalMillis
    val connectionRetryEnabled = client.retryOnConnectionFailure

    try {
      // 獲取鏈接
      val resultConnection = findHealthyConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled,
          doExtensiveHealthChecks = doExtensiveHealthChecks
      )
      // 設置編碼,有Http1codec和Http2codec兩種方式 後者能夠複用鏈接
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure()
      throw e
    } catch (e: IOException) {
      trackFailure()
      throw RouteException(e)
    }
// 獲取鏈接
  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      // 查找新鏈接
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // If this is a brand new connection, we can skip the extensive health checks.
      // 若是是新鏈接 則直接使用
      synchronized(connectionPool) {
        if (candidate.successCount == 0) {
          return candidate
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      //判斷鏈接池中鏈接是否可用,若是不可用,則釋放該鏈接並從鏈接池中移除,並繼續尋找可用鏈接
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        candidate.noNewExchanges()
        continue
      }

      return candidate
    }
  }

接着就是正式獲取鏈接這一步了,咱們從註釋中能夠看到,首先從已經存在的Connection來選取鏈接,然後從鏈接池中尋找,最後纔是新建鏈接。

/**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */
  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")
      hasStreamFailure = false // This is a fresh attempt.
·     
      // 對現有鏈接作一個備份
      releasedConnection = transmitter.connection
      toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
        // 獲得要關閉的鏈接的socket
        transmitter.releaseConnectionNoEvents()
      } else {
        null
      }
      
      // 若是可使用 則使用
      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        result = transmitter.connection
        releasedConnection = null
      }
      
      // 若是沒有能夠用的鏈接,從鏈接池中查找
      if (result == null) {
        // Attempt to get a connection from the pool.
        // 以URL爲key查找
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection!!.route() // 使用路由地址,能夠是代理地址
        }
      }
    }
    // 關閉以前的socket
    toClose?.closeQuietly()

    ... // 若是上面找到,直接返回
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result!!
    }
    
    // If we need a route selection, make one. This is a blocking operation.
    var newRouteSelection = false
    // 選擇一個不空的路由
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
      newRouteSelection = true
      routeSelection = routeSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection!!.routes
       
        // 根據IP地址和Route從鏈接池進行第二次查找
        if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()
        }
        // 若是沒有找到,再使用下一個路由集合
        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!
    }
    
    // 到這裏還沒找到鏈接,那就去建立這個鏈接
    // Do TCP + TLS handshakes. This is a blocking operation.
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    connectionPool.routeDatabase.connected(result!!.route())

    var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      // 若是result鏈接是http2.0鏈接,http2.0支持一個鏈接同時發起多個請求,這裏作去重判斷,防止建立多個
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result!!.noNewExchanges = true
        socket = result!!.socket()
        result = transmitter.connection

        // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
        // that case we will retry the route we just successfully connected with.
        nextRouteToTry = selectedRoute
      } else {
        connectionPool.put(result!!)
        transmitter.acquireConnectionNoEvents(result!!)
      }
    }
    socket?.closeQuietly()

    eventListener.connectionAcquired(call, result!!)
    return result!!

在這個源碼中,出現了幾個新的類,路由route類,地址address類,咱們簡單的來看看這兩個類,
Address:封裝了全部的能夠訪問的地址信息,在這個類中還添加了代理和dns的相關信息(在OkHttpClient中設置好)proxySelector能夠爲一個URI設置多個代理,若是地址鏈接失敗還回調connectFailed;proxy設置單獨的全局代理,他的優先級高於proxySelecttor;dns用法和proxySelecttor相似,能夠返回多個地址。

private Address createAddress(HttpUrl url) {
        SSLSocketFactory sslSocketFactory = null;
        HostnameVerifier hostnameVerifier = null;
        CertificatePinner certificatePinner = null;
        if (url.isHttps()) {
            sslSocketFactory = client.sslSocketFactory();
            hostnameVerifier = client.hostnameVerifier();
            certificatePinner = client.certificatePinner();
        }
 
        return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
                sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
                client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
    }

Route路由:對地址Adress的一個封裝類
RouteSelector路由選擇器:在OKhttp中其實其做用也就是返回一個可用的Route對象

咱們來大概梳理一下流程

  • 首先對當前的流進行一個初步判斷,知足則複用
  • 不知足則,對鏈接池進行第一次的查找,這次查找中,route類爲空
connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)

查找獲得直接複用

  • 查找不到則使用路由進行查找,查找設置的代理和DNS是否能找到相關的代理,若是找到則複用
connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, false)
  • 上述路線都查找不到,直接新建一個鏈接,放入鏈接池中,並把解析的host等信息保存到Connection中,方便下次複用。其中還要多作一步判斷,若是是HTTP2同時發起的請求,要進行一個去重的操做。

下圖是一個簡要的鏈接步驟。

在這裏插入圖片描述

CallServerInterceptor

  • 首先得到前面Intecepter獲取的信息
  • 而後利用編碼器寫入header信息
exchange.writeRequestHeaders(request)
  • 判斷是否要發送請求體,有請求體時,但指望返回狀態碼是100時,則不發送。不然利用流封裝後發送。
var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseHeadersStarted = true
        exchange.responseHeadersStart()
        responseBuilder = exchange.readResponseHeaders(true)
      }
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection()!!.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }
// 建立response,把握手信息,和request等信息保存進去
@Override public Response intercept(Chain chain) throws IOException {
    ...
    // 寫入request結束
    httpCodec.finishRequest();
    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      // 讀取相應response的header信息
      responseBuilder = httpCodec.readResponseHeaders(false);
    }
 
    // 建立response,把握手信息,和request等信息保存進去
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
    // 開始判斷請求碼
    int code = response.code();
    if (code == 100) {
      // 若是是100,直接讀取header
      responseBuilder = httpCodec.readResponseHeaders(false);
      response = responseBuilder
              .request(request)
              // 握手
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
 
      code = response.code();
    }
    ...
    // 判斷請求碼
    if (forWebSocket && code == 101) {
      // 客戶端須要轉換協議,這裏須要設置一個空的response
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      // 讀取網絡的body
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }
    // 若是header請求關閉鏈接
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      // 關閉這個連接
      streamAllocation.noNewStreams();
    }
    // 特殊code判斷
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
 
    return response;
  }

若是想要了解具體的讀取和寫入流程,以我如今使用的Http 2.0爲例:
鏈接:Http2Connection;
流:Http2Stream;
編解碼器:Http2Codec;
讀操做:Http2Reader;
寫操做:Http2Writer;
他們之間的關係:
一、Http2Connection調用Http2Reader和Http2Writer來進行讀寫;
二、Http2Stream調用Http2Connection進行讀寫;
三、Http2Codec調用Http2Connection和Http2Stream進行操做;

總結

咱們分三個階段來簡要介紹了OkHttp這個框架,因爲如今水平有限,因此會存在疏漏。之後有些有新的發現,則再對其進行補充。

相關文章
相關標籤/搜索