Android網絡編程-OKHttp源碼角度分析Http

前面介紹了網絡的基礎知識,這篇主要從OKHttp源碼角度來分析Http。
OKHttp是一個優秀的網絡請求框架,有如下特色:html

  • 支持HTTP2/SPDY
  • Socket自動選擇最好路線,並支持自動重連
  • 擁有自動維護的Socket鏈接池,減小握手次數
  • 擁有隊列線程池,輕鬆寫併發
  • 擁有Interceptors輕鬆處理請求與響應(好比透明GZIP壓縮)
  • 實現基於Headers的緩存策略

基本使用

同步請求

同步的Get請求java

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
      .url(url)
      .build();

Response response = client.newCall(request).execute();
return response.body().string();
複製代碼

異步請求

異步的Get請求git

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
      .url(url)
      .build();

client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                Log.e("DEBUG", "##### onFailure: ", e);

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                Log.d("DEBUG", "##### response: " + response.body().string());
            }
        });
複製代碼

源碼分析

咱們從OKHttp的初始化開始分析。github

OkHttpClient

新建一個OkHttpClient對象web

OkHttpClient client = new OkHttpClient();
複製代碼

構造函數聲明:緩存

public OkHttpClient() {
    this(new Builder());
}
複製代碼

Builder模式構造:安全

public Builder() {
   dispatcher = new Dispatcher();
   protocols = DEFAULT_PROTOCOLS;
   connectionSpecs = DEFAULT_CONNECTION_SPECS;
   eventListenerFactory = EventListener.factory(EventListener.NONE);
   proxySelector = ProxySelector.getDefault();
   if (proxySelector == null) {
      proxySelector = new NullProxySelector();
   }
   cookieJar = CookieJar.NO_COOKIES;
   socketFactory = SocketFactory.getDefault();
   hostnameVerifier = OkHostnameVerifier.INSTANCE;
   certificatePinner = CertificatePinner.DEFAULT;
   proxyAuthenticator = Authenticator.NONE;
   authenticator = Authenticator.NONE;
   connectionPool = new ConnectionPool();
   dns = Dns.SYSTEM;
   followSslRedirects = true;
   followRedirects = true;
   retryOnConnectionFailure = true;
   callTimeout = 0;
   connectTimeout = 10_000;
   readTimeout = 10_000;
   writeTimeout = 10_000;
   pingInterval = 0;
}
複製代碼

聲明瞭不少屬性,具體含義,等後面用到在具體介紹。bash

請求流程

請求流程可分爲同步和異步,大致的請求流程以下圖所示: 服務器

OKHttp流程

同步請求流程

client.newCall(request).execute();
複製代碼

newCall返回的是RealCall,上面代碼實際上執行的是RealCall的execute方法。websocket

@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.timeoutEnter();
  transmitter.callStart();
  try {
    client.dispatcher().executed(this);
    return getResponseWithInterceptorChain();
  } finally {
    client.dispatcher().finished(this);
  }
 }
複製代碼
  • executed判斷Call對象是否已經執行,每一個Call對象只能執行一次
  • client.dispatcher()返回Dispatcher對象,任務核心調度類,是OKHttp中最重要類之一, executed方法把該線程添加到同步線程隊列
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}
複製代碼
  • getResponseWithInterceptorChain()獲取HTTP請求結果,並會進行一系列攔截操做
  • client.dispatcher().finished(this)執行完畢操做
void finished(RealCall call) {
   finished(runningSyncCalls, call);
 }
複製代碼

執行完畢後,會把線程從同步線程隊列中移除:

private <T> void finished(Deque<T> calls, T call) {
  Runnable idleCallback;
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    idleCallback = this.idleCallback;
  }

  //異步方法中調用
  boolean isRunning = promoteAndExecute();

  if (!isRunning && idleCallback != null) {
    idleCallback.run();
  }
}
複製代碼

異步請求流程

RealCall的enqueue方法:

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.callStart();
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
複製代碼
  • executed含義和同步請求同樣,表示請求只能執行一次
  • client.dispatcher().enqueue(new AsyncCall(responseCallback));,會生成一個AsyncCall對象,並把它加入到readyAsyncCalls線程隊列中,等待執行

AsyncCallRealCall的內部類,而且是NamedRunnable線程類,具體執行方法:

@Override protected void execute() {
  boolean signalledCallback = false;
  transmitter.timeoutEnter();
  try {
    Response response = getResponseWithInterceptorChain();
    signalledCallback = true;
    responseCallback.onResponse(RealCall.this, response);
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}
複製代碼
  • getResponseWithInterceptorChain()獲取HTTP請求結果,並會進行一系列攔截操做
  • client.dispatcher().finished(this);這個方法很重要,和同步方法中調用相似,可是異步的流程則徹底不一樣

finish方法:

void finished(AsyncCall call) {
  call.callsPerHost().decrementAndGet();
  finished(runningAsyncCalls, call);
}
複製代碼
private <T> void finished(Deque<T> calls, T call) {
  Runnable idleCallback;
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    idleCallback = this.idleCallback;
  }

  //異步方法中調用
  boolean isRunning = promoteAndExecute();

  if (!isRunning && idleCallback != null) {
    idleCallback.run();
  }
}
複製代碼

異步流程中,promoteAndExecute方法:

private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));
  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall asyncCall = i.next();
      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
      i.remove();
      asyncCall.callsPerHost().incrementAndGet();
      executableCalls.add(asyncCall);
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }
  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());
  }
  return isRunning;
}
複製代碼

會遍歷異步等待線程隊列,並對正在執行的異步線程隊列進行最大請求size,以及每一個host最大請求size進行檢查。
把異步等待線程放到正在執行線程隊列中,並在等待線程隊列中刪除該線程,這樣就把等待線程變成正在執行線程。

Dispatcher

任務調度核心類,這個類,其實在同步和異步請求流程中已經介紹過,其最重要功能是負責請求的分發。
Dispatcher在OKHttpClient的Builder中被初始化:

public Builder() {
   dispatcher = new Dispatcher();
   .....
}
複製代碼
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
複製代碼
  • maxRequests:最大請求併發請求數64
  • maxRequestsPerHost:每一個主機的最大請求數5
  • executorService:線程池
  • readyAsyncCalls:異步等待線程隊列
  • runningAsyncCalls:正在運行的異步線程隊列
  • runningSyncCalls:正在運行的同步線程隊列

線程池executorService的聲明:

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}
複製代碼
  • 核心線程數爲0,表示線程在空閒時不會被保留,等待一段時間後中止
  • 最大線程數Integer.MAX_VALUE,基本上就是能夠建立線程無上限
  • keepAliveTime爲60s,表示若是線程空閒時,最多隻能存活60s

綜合上訴,在OKHttp中,設置了不設上限的線程,不保留最小線程,線程空閒時,最大存活時間爲60s,保證I/O任務中高阻塞低佔用的過程,不會長時間卡在阻塞上。並經過maxRequestsmaxRequestsPerHost來控制併發最大請求數。

攔截器

在同步和異步請求中,具體的執行過程當中都會調用到getResponseWithInterceptorChain方法,該方法添加了一系列的攔截器,它在OKHttp整理流程中處於很是重要的地位,

流程

方法實現:

Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());
  interceptors.add(new RetryAndFollowUpInterceptor(client));
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));
  Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
      originalRequest, this, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());
  boolean calledNoMoreExchanges = false;
  try {
    Response response = chain.proceed(originalRequest);
    if (transmitter.isCanceled()) {
      closeQuietly(response);
      throw new IOException("Canceled");
    }
    return response;
  } catch (IOException e) {
    calledNoMoreExchanges = true;
    throw transmitter.noMoreExchanges(e);
  } finally {
    if (!calledNoMoreExchanges) {
      transmitter.noMoreExchanges(null);
    }
  }
}
複製代碼

默認添加的攔截器:

  • RetryAndFollowUpInterceptor:負責失敗重試以及重定向
  • BridgeInterceptor:負責把用戶構造的請求轉換爲發送到服務器的請求、把服務器返回的響應轉換爲用戶友好的響應
  • CacheInterceptor:負責讀取緩存直接返回、更新緩存
  • ConnectInterceptor:負責和服務器創建鏈接
  • CallServerInterceptor:負責向服務器發送請求數據、從服務器讀取響應數據

這是典型的責任鏈模式,經過Interceptor,把Request轉換爲Response,每一個Interceptor都有各自的責任和邏輯。

interceptors.addAll(client.interceptors());
......
if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
複製代碼

開發者能夠本身定義Interceptor,在最開始或者發送請求前,對Request和Response進行處理。

HTTP實現

OKHttp中實現HTTP主要是在ConnectInterceptorCallServerInterceptor
ConnectInterceptor創建服務器之間的鏈接,CallServerInterceptor發送請求和讀取響應。
OKHttp請求一個URL的流程:

  1. 根據請求的URL,createAddress方法會建立一個Address,用於鏈接服務器
  2. 檢查address和routes,是否能夠從ConnectionPool獲取一個鏈接
  3. 若是沒有獲取到鏈接,會進行下一個路由選擇(routeSelector),而且從新嘗試從ConnectionPool獲取一個鏈接。重試仍是獲取不到,就會從新建立一個鏈接(RealConnection)
  4. 獲取鏈接後,它會與服務器創建一個直接的Socket鏈接、使用TLS安全通道(基於HTTP代理的HTTPS),或直接TLS鏈接
  5. 發送HTTP請求,並獲取響應

ConnectInterceptor

在請求發送前的邏輯,都是ConnectInterceptor中實現,ConnectInterceptor的intercept,這個是3.14.2版本源碼,和之前多版本稍微有些區別。

@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  Transmitter transmitter = realChain.transmitter();
  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
  return realChain.proceed(request, transmitter, exchange);
}
複製代碼

Exchange能夠傳輸HTTP請求和響應,並管理鏈接和事件。 newExchange方法調用:

/** Returns a new exchange to carry a new request and response. */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  synchronized (connectionPool) {
    if (noMoreExchanges) {
      throw new IllegalStateException("released");
    }
    if (exchange != null) {
      throw new IllegalStateException("cannot make a new request because the previous response "
          + "is still open: please call response.close()");
    }
  }
  ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
  Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
  ......
  }
}
複製代碼

find方法會最終執行ExchangeFinderfindConnection方法,在發送HTTP請求以前的邏輯,都是這個方法中實現。

/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      ......

      if (result == null) {
        //2.根據 Address 從鏈接池獲取鏈接
        // Attempt to get a connection from the pool.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry;
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    ......
    // 3. 從新選擇路由
    // If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection.getAll();
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }
      
        // 3. 從新選擇路由,建立新的 `RealConnection`
        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

    ......
    // 4. 進行 Socket 鏈接
    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;
      } else {
        //把鏈接放入鏈接池中
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);
      }
    }
    ......
    return result;
  }
複製代碼

HTTP 的鏈接主要是result.connect方法:

public void connect(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, Call call, EventListener eventListener){
   if (protocol != null) throw new IllegalStateException("already connected");
   ......
   while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break;
          }
        } else {
          connectSocket(connectTimeout, readTimeout, call, eventListener);
        }
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
        eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
        break;
      } catch (IOException e) {
        ......
      }
    }
    ......
}
複製代碼

在 for 循環中檢查這個鏈接是不是隧道協議鏈接。
connectSocket鏈接socket,establishProtocol根據HTTP協議版本進行鏈接處理。
重點分析下connectSocket方法:

private void connectSocket(int connectTimeout, int readTimeout, Call call, EventListener eventListener) throws IOException {
   ......
   try {
      //鏈接 socket
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

   try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
}
複製代碼

使用 Okio,封裝了Socket的讀寫操做, 創建鏈接後,就能夠發送請求和獲取響應。

CallServerInterceptor

CallServerInterceptor的intercept()方法裏負責發送請求和獲取響應。
具體操做都是經過Exchange來執行,Exchange經過各個功能模塊再進行分發處理。
經過 Socket 發送 HTTP消息,會按照如下聲明週期:

  • writeRequestHeaders發送 request Headers
  • 若是有 request body,就經過 Sink 發送request body,而後關閉 Sink
  • readResponseHeaders獲取 response Headers
  • 經過Source讀取 response body,而後關閉 Source

writeRequestHeaders

Exchange 調用writeRequestHeaders方法

public void writeRequestHeaders(Request request) throws IOException {
    try {
      eventListener.requestHeadersStart(call);
      codec.writeRequestHeaders(request);
      eventListener.requestHeadersEnd(call, request);
    } catch (IOException e) {
      eventListener.requestFailed(call, e);
      trackFailure(e);
      throw e;
    }
  }
複製代碼

實際執行的方法codec實現類Http1ExchangeCodec(前面根據HTTP協議版本選擇)的writeRequest方法

/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
  if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
  sink.writeUtf8(requestLine).writeUtf8("\r\n");
  for (int i = 0, size = headers.size(); i < size; i++) {
    sink.writeUtf8(headers.name(i))
        .writeUtf8(": ")
        .writeUtf8(headers.value(i))
        .writeUtf8("\r\n");
  }
  sink.writeUtf8("\r\n");
  state = STATE_OPEN_REQUEST_BODY;
}
複製代碼

readResponseHeaders

讀取響應頭部,Http1ExchangeCodec的readResponseHeaders方法:

@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
  if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
    throw new IllegalStateException("state: " + state);
  }
  try {
    StatusLine statusLine = StatusLine.parse(readHeaderLine());
    Response.Builder responseBuilder = new Response.Builder()
        .protocol(statusLine.protocol)
        .code(statusLine.code)
        .message(statusLine.message)
        .headers(readHeaders());
    if (expectContinue && statusLine.code == HTTP_CONTINUE) {
      return null;
    } else if (statusLine.code == HTTP_CONTINUE) {
      state = STATE_READ_RESPONSE_HEADERS;
      return responseBuilder;
    }
    state = STATE_OPEN_RESPONSE_BODY;
    return responseBuilder;
  } catch (EOFException e) {
    // Provide more context if the server ends the stream before sending a response.
    String address = "unknown";
    if (realConnection != null) {
      address = realConnection.route().address().url().redact();
    }
    throw new IOException("unexpected end of stream on "
        + address, e);
  }
}
複製代碼

StatusLine解析HTTP版本信息,readHeaders()讀取response header 信息。

/** Reads headers or trailers. */
private Headers readHeaders() throws IOException {
  Headers.Builder headers = new Headers.Builder();
  // parse the result headers until the first blank line
  for (String line; (line = readHeaderLine()).length() != 0; ) {
    Internal.instance.addLenient(headers, line);
  }
  return headers.build();
}
複製代碼

response body

解析 response body 內容:

if (forWebSocket && code == 101) {
  // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
  response = response.newBuilder()
      .body(Util.EMPTY_RESPONSE)
      .build();
} else {
  response = response.newBuilder()
      .body(exchange.openResponseBody(response))
      .build();
}
複製代碼

若是不是websocket,調用Exchange的openResponseBody方法:

public ResponseBody openResponseBody(Response response) throws IOException {
  try {
    eventListener.responseBodyStart(call);
    String contentType = response.header("Content-Type");
    long contentLength = codec.reportedContentLength(response);
    Source rawSource = codec.openResponseBodySource(response);
    ResponseBodySource source = new ResponseBodySource(rawSource, contentLength);
    return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
  } catch (IOException e) {
    eventListener.responseFailed(call, e);
    trackFailure(e);
    throw e;
  }
}
複製代碼

獲取返回的 body,經過 Source 轉換爲須要的數據類型,ResponseBody提供的 string(),轉換爲 String 類型

public final String string() throws IOException {
  try (BufferedSource source = source()) {
    Charset charset = Util.bomAwareCharset(source, charset());
    return source.readString(charset);
  }
}
複製代碼

經過上述的分析,OKHttp是經過Okio操做Socket實現了Http協議,憑藉高效的性能,Android系統從4.4版本開始,HTTP的實現已經替換爲OKHttp。

參考

相關文章
相關標籤/搜索