從使用方法出發,首先是怎麼使用,其次是咱們使用的功能在內部是如何實現的.建議你們下載 OkHttp 源碼以後,跟着本文,過一遍源碼。html
官方博客栗子:http://square.github.io/okhttp/#examplesjava
OkHttpClient client = new OkHttpClient(); String run(String url) throws IOException { Request request = new Request.Builder() .url(url) .build(); Response response = client.newCall(request).execute(); return response.body().string(); }
上面的代碼中涉及到幾個經常使用的類:Request、Response和Call。下面分別介紹:git
每個HTTP請求包含一個URL、一個方法(GET或POST或其餘)、一些HTTP頭。請求還可能包含一個特定內容類型的數據類的主體部分。github
響應是對請求的回覆,包含狀態碼、HTTP頭和主體部分。web
OkHttp使用Call抽象出一個知足請求的模型,儘管中間可能會有多個請求或響應。執行Call有兩種方式,同步或異步面試
OkHttpClient client = new OkHttpClient();`
經過okhttp源碼分析,直接建立的 OkHttpClient對象而且默認構造builder對象進行初始化sql
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { public OkHttpClient() { this(new Builder()); } OkHttpClient(Builder builder) { this.dispatcher = builder.dispatcher; this.proxy = builder.proxy; this.protocols = builder.protocols; this.connectionSpecs = builder.connectionSpecs; this.interceptors = Util.immutableList(builder.interceptors); this.networkInterceptors = Util.immutableList(builder.networkInterceptors); this.eventListenerFactory = builder.eventListenerFactory; this.proxySelector = builder.proxySelector; this.cookieJar = builder.cookieJar; this.cache = builder.cache; this.internalCache = builder.internalCache; this.socketFactory = builder.socketFactory; boolean isTLS = false; ...... this.hostnameVerifier = builder.hostnameVerifier; this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner( certificateChainCleaner); this.proxyAuthenticator = builder.proxyAuthenticator; this.authenticator = builder.authenticator; this.connectionPool = builder.connectionPool; this.dns = builder.dns; this.followSslRedirects = builder.followSslRedirects; this.followRedirects = builder.followRedirects; this.retryOnConnectionFailure = builder.retryOnConnectionFailure; this.connectTimeout = builder.connectTimeout; this.readTimeout = builder.readTimeout; this.writeTimeout = builder.writeTimeout; this.pingInterval = builder.pingInterval; } }
Request request = new Request.Builder().url("url").build(); okHttpClient.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { } });
Request request = new Request.Builder().url("url").build();
初始化構建者模式和請求對象,而且用URL替換Web套接字URL。緩存
public final class Request { public Builder() { this.method = "GET"; this.headers = new Headers.Builder(); } public Builder url(String url) { ...... // Silently replace web socket URLs with HTTP URLs. if (url.regionMatches(true, 0, "ws:", 0, 3)) { url = "http:" + url.substring(3); } else if (url.regionMatches(true, 0, "wss:", 0, 4)) { url = "https:" + url.substring(4); } HttpUrl parsed = HttpUrl.parse(url); ...... return url(parsed); } public Request build() { ...... return new Request(this); } }
okHttpClient.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { } });
源碼分析:性能優化
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { @Override public Call newCall(Request request) { return new RealCall(this, request, false /* for web socket */); } }
RealCall實現了Call.Factory接口建立了一個RealCall的實例,而RealCall是Call接口的實現。服務器
final class RealCall implements Call { @Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); } }
由以上源碼得知:
1) 檢查這個 call 是否已經被執行了,每一個 call 只能被執行一次,若是想要一個徹底同樣的 call,能夠利用 call#clone 方法進行克隆。
2)利用 client.dispatcher().enqueue(this) 來進行實際執行,dispatcher 是剛纔看到的 OkHttpClient.Builder 的成員之一
3)AsyncCall是RealCall的一個內部類而且繼承NamedRunnable,那麼首先看NamedRunnable類是什麼樣的,以下:
public abstract class NamedRunnable implements Runnable { ...... @Override public final void run() { ...... try { execute(); } ...... } protected abstract void execute(); }
能夠看到NamedRunnable實現了Runnbale接口而且是個抽象類,其抽象方法是execute(),該方法是在run方法中被調用的,這也就意味着NamedRunnable是一個任務,而且其子類應該實現execute方法。下面再看AsyncCall的實現:
final class AsyncCall extends NamedRunnable { private final Callback responseCallback; AsyncCall(Callback responseCallback) { super("OkHttp %s", redactedUrl()); this.responseCallback = responseCallback; } ...... final class RealCall implements Call { @Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { ...... responseCallback.onFailure(RealCall.this, e); } finally { client.dispatcher().finished(this); } }
AsyncCall實現了execute方法,首先是調用getResponseWithInterceptorChain()方法獲取響應,而後獲取成功後,就調用回調的onReponse方法,若是失敗,就調用回調的onFailure方法。最後,調用Dispatcher的finished方法。
關鍵代碼:
responseCallback.onFailure(RealCall.this, new IOException(「Canceled」));
和
responseCallback.onResponse(RealCall.this, response);
走完這兩句代碼會進行回調到剛剛咱們初始化Okhttp的地方,以下:
okHttpClient.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { } });
public final class Dispatcher { /** 最大併發請求數爲64 */ private int maxRequests = 64; /** 每一個主機最大請求數爲5 */ private int maxRequestsPerHost = 5; /** 線程池 */ private ExecutorService executorService; /** 準備執行的請求 */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** 正在執行的異步請求,包含已經取消但未執行完的請求 */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** 正在執行的同步請求,包含已經取消單未執行完的請求 */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
在OkHttp,使用以下構造了單例線程池
public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
構造一個線程池ExecutorService:
executorService = new ThreadPoolExecutor( //corePoolSize 最小併發線程數,若是是0的話,空閒一段時間後全部線程將所有被銷燬 0, //maximumPoolSize: 最大線程數,當任務進來時能夠擴充的線程最大值,當大於了這個值就會根據丟棄處理機制來處理 Integer.MAX_VALUE, //keepAliveTime: 當線程數大於corePoolSize時,多餘的空閒線程的最大存活時間 60, //單位秒 TimeUnit.SECONDS, //工做隊列,先進先出 new SynchronousQueue<Runnable>(), //單個線程的工廠 Util.threadFactory("OkHttp Dispatcher", false));
能夠看出,在Okhttp中,構建了一個核心爲[0, Integer.MAX_VALUE]的線程池,它不保留任何最小線程數,隨時建立更多的線程數,當線程空閒時只能活60秒,它使用了一個不存儲元素的阻塞工做隊列,一個叫作」OkHttp Dispatcher」的線程工廠。
也就是說,在實際運行中,當收到10個併發請求時,線程池會建立十個線程,當工做完成後,線程池會在60s後相繼關閉全部線程。
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }
從上述源碼分析,若是當前還能執行一個併發請求,則加入 runningAsyncCalls ,當即執行,不然加入 readyAsyncCalls 隊列。
Dispatcher線程池總結
1)調度線程池Disptcher實現了高併發,低阻塞的實現
2)採用Deque做爲緩存,先進先出的順序執行
3)任務在try/finally中調用了finished函數,控制任務隊列的執行順序,而不是採用鎖,減小了編碼複雜性提升性能
這裏是分析OkHttp源碼,並不詳細講線程池原理,如對線程池不瞭解請參考以下連接
try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } finally { client.dispatcher().finished(this); }
當任務執行完成後,不管是否有異常,finally代碼段總會被執行,也就是會調用Dispatcher的finished函數
void finished(AsyncCall call) { finished(runningAsyncCalls, call, true); }
從上面的代碼能夠看出,第一個參數傳入的是正在運行的異步隊列,第三個參數爲true,下面再看有是三個參數的finished方法:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } }
打開源碼,發現它將正在運行的任務Call從隊列runningAsyncCalls中移除後,獲取運行數量判斷是否進入了Idle狀態,接着執行promoteCalls()函數,下面是promoteCalls()方法:
private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }
主要就是遍歷等待隊列,而且須要知足同一主機的請求小於maxRequestsPerHost時,就移到運行隊列中並交給線程池運行。就主動的把緩存隊列向前走了一步,而沒有使用互斥鎖等複雜編碼
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }
1)在配置 OkHttpClient 時設置的 interceptors;
2)負責失敗重試以及重定向的 RetryAndFollowUpInterceptor;
3)負責把用戶構造的請求轉換爲發送到服務器的請求、把服務器返回的響應轉換爲用戶友好的響應的 BridgeInterceptor;
4)負責讀取緩存直接返回、更新緩存的 CacheInterceptor;
5)負責和服務器創建鏈接的 ConnectInterceptor;
6)配置 OkHttpClient 時設置的 networkInterceptors;
7)負責向服務器發送請求數據、從服務器讀取響應數據的 CallServerInterceptor。
OkHttp的這種攔截器鏈採用的是責任鏈模式,這樣的好處是將請求的發送和處理分開,而且能夠動態添加中間的處理方實現對請求的處理、短路等操做。
從上述源碼得知,無論okhttp有多少攔截器最後都會走,以下方法:
Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest);
從方法名字基本能夠猜到是幹嗎的,調用 chain.proceed(originalRequest); 將request傳遞進來,從攔截器鏈裏拿到返回結果。那麼攔截器Interceptor是幹嗎的,Chain是幹嗎的呢?繼續往下看RealInterceptorChain
RealInterceptorChain類
下面是RealInterceptorChain的定義,該類實現了Chain接口,在getResponseWithInterceptorChain調用時好幾個參數都傳的null。
public final class RealInterceptorChain implements Interceptor.Chain { public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection, int index, Request request) { this.interceptors = interceptors; this.connection = connection; this.streamAllocation = streamAllocation; this.httpCodec = httpCodec; this.index = index; this.request = request; } ...... @Override public Response proceed(Request request) throws IOException { return proceed(request, streamAllocation, httpCodec, connection); } public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); calls++; ...... // Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain( interceptors, streamAllocation, httpCodec, connection, index + 1, request); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); ...... return response; } protected abstract void execute(); }
主要看proceed方法,proceed方法中判斷index(此時爲0)是否大於或者等於client.interceptors(List )的大小。因爲httpStream爲null,因此首先建立next攔截器鏈,主須要把索引置爲index+1便可;而後獲取第一個攔截器,調用其intercept方法。
Interceptor 代碼以下:
public interface Interceptor { Response intercept(Chain chain) throws IOException; interface Chain { Request request(); Response proceed(Request request) throws IOException; Connection connection(); } }
BridgeInterceptor
BridgeInterceptor從用戶的請求構建網絡請求,而後提交給網絡,最後從網絡響應中提取出用戶響應。從最上面的圖能夠看出,BridgeInterceptor實現了適配的功能。下面是其intercept方法:
public final class BridgeInterceptor implements Interceptor { ...... @Override public Response intercept(Chain chain) throws IOException { Request userRequest = chain.request(); Request.Builder requestBuilder = userRequest.newBuilder(); RequestBody body = userRequest.body(); //若是存在請求主體部分,那麼須要添加Content-Type、Content-Length首部 if (body != null) { MediaType contentType = body.contentType(); if (contentType != null) { requestBuilder.header("Content-Type", contentType.toString()); } long contentLength = body.contentLength(); if (contentLength != -1) { requestBuilder.header("Content-Length", Long.toString(contentLength)); requestBuilder.removeHeader("Transfer-Encoding"); } else { requestBuilder.header("Transfer-Encoding", "chunked"); requestBuilder.removeHeader("Content-Length"); } } if (userRequest.header("Host") == null) { requestBuilder.header("Host", hostHeader(userRequest.url(), false)); } if (userRequest.header("Connection") == null) { requestBuilder.header("Connection", "Keep-Alive"); } // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing // the transfer stream. boolean transparentGzip = false; if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true; requestBuilder.header("Accept-Encoding", "gzip"); } List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url()); if (!cookies.isEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)); } if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", Version.userAgent()); } Response networkResponse = chain.proceed(requestBuilder.build()); HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers()); Response.Builder responseBuilder = networkResponse.newBuilder() .request(userRequest); if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding")) && HttpHeaders.hasBody(networkResponse)) { GzipSource responseBody = new GzipSource(networkResponse.body().source()); Headers strippedHeaders = networkResponse.headers().newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build(); responseBuilder.headers(strippedHeaders); responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody))); } return responseBuilder.build(); } /** Returns a 'Cookie' HTTP request header with all cookies, like {@code a=b; c=d}. */ private String cookieHeader(List<Cookie> cookies) { StringBuilder cookieHeader = new StringBuilder(); for (int i = 0, size = cookies.size(); i < size; i++) { if (i > 0) { cookieHeader.append("; "); } Cookie cookie = cookies.get(i); cookieHeader.append(cookie.name()).append('=').append(cookie.value()); } return cookieHeader.toString(); } }
從上面的代碼能夠看出,首先獲取原請求,而後在請求中添加頭,好比Host、Connection、Accept-Encoding參數等,而後根據看是否須要填充Cookie,在對原始請求作出處理後,使用chain的procced方法獲得響應,接下來對響應作處理獲得用戶響應,最後返回響應。接下來再看下一個攔截器ConnectInterceptor的處理。
public final class ConnectInterceptor implements Interceptor { ...... @Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); } }
實際上創建鏈接就是建立了一個 HttpCodec 對象,它利用 Okio 對 Socket 的讀寫操做進行封裝,Okio 之後有機會再進行分析,如今讓咱們對它們保持一個簡單地認識:它對 java.io 和 java.nio 進行了封裝,讓咱們更便捷高效的進行 IO 操做。
CallServerInterceptor
CallServerInterceptor是攔截器鏈中最後一個攔截器,負責將網絡請求提交給服務器。它的intercept方法實現以下:
@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; HttpCodec httpCodec = realChain.httpStream(); StreamAllocation streamAllocation = realChain.streamAllocation(); RealConnection connection = (RealConnection) realChain.connection(); Request request = realChain.request(); long sentRequestMillis = System.currentTimeMillis(); httpCodec.writeRequestHeaders(request); Response.Builder responseBuilder = null; if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return what // we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { httpCodec.flushRequest(); responseBuilder = httpCodec.readResponseHeaders(true); } if (responseBuilder == null) { // Write the request body if the "Expect: 100-continue" expectation was met. Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength()); BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } else if (!connection.isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from // being reused. Otherwise we're still obligated to transmit the request body to leave the // connection in a consistent state. streamAllocation.noNewStreams(); } } httpCodec.finishRequest(); if (responseBuilder == null) { responseBuilder = httpCodec.readResponseHeaders(false); } Response response = responseBuilder .request(request) .handshake(streamAllocation.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); int code = response.code(); if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(httpCodec.openResponseBody(response)) .build(); } if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { streamAllocation.noNewStreams(); } if ((code == 204 || code == 205) && response.body().contentLength() > 0) { throw new ProtocolException( "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength()); } return response; }
從上面的代碼中能夠看出,首先獲取HttpStream對象,而後調用writeRequestHeaders方法寫入請求的頭部,而後判斷是否須要寫入請求的body部分,最後調用finishRequest()方法將全部數據刷新給底層的Socket,接下來嘗試調用readResponseHeaders()方法讀取響應的頭部,而後再調用openResponseBody()方法獲得響應的body部分,最後返回響應。
OkHttp的底層是經過Java的Socket發送HTTP請求與接受響應的(這也好理解,HTTP就是基於TCP協議的),可是OkHttp實現了鏈接池的概念,即對於同一主機的多個請求,其實能夠公用一個Socket鏈接,而不是每次發送完HTTP請求就關閉底層的Socket,這樣就實現了鏈接池的概念。而OkHttp對Socket的讀寫操做使用的OkIo庫進行了一層封裝。