在平常開發中網絡請求是很常見的功能。OkHttp做爲Android開發中最經常使用的網絡請求框架,在Android開發中咱們常常結合retrofit一塊兒使用,俗話說得好:「知其然知其因此然」,因此這篇文章咱們經過源碼來深刻理解OKHttp3(基於3.12版本)java
//框架引入項目
implementation("com.squareup.okhttp3:okhttp:3.12.0")
//引用官方Demo的例子
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//主線程不能進行耗時操做
new Thread(){
@Override
public void run() {
super.run();
/**
* 同步請求
*/
GetExample getexample = new GetExample();
String syncresponse = null;
try {
syncresponse = getexample.run("https://raw.github.com/square/okhttp/master/README.md");
Log.i("maoqitian","異步請求返回參數"+syncresponse);
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
/**
* 異步請求
*/
PostExample postexample = new PostExample();
String json = postexample.bowlingJson("Jesse", "Jake");
try {
postexample.post("http://www.roundsapp.com/post", json);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 異步請求
*/
class PostExample {
final MediaType JSON = MediaType.get("application/json; charset=utf-8");
//獲取 OkHttpClient 對象
OkHttpClient client = new OkHttpClient();
void post(String url, String json) throws IOException {
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.i("maoqitian","異步請求返回參數"+e.toString());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
String asynresponse= response.body().string();
Log.i("maoqitian","異步請求返回參數"+asynresponse);
}
});
}
String bowlingJson(String player1, String player2) {
return "{'winCondition':'HIGH_SCORE',"
+ "'name':'Bowling',"
+ "'round':4,"
+ "'lastSaved':1367702411696,"
+ "'dateStarted':1367702378785,"
+ "'players':["
+ "{'name':'" + player1 + "','history':[10,8,6,7,8],'color':-13388315,'total':39},"
+ "{'name':'" + player2 + "','history':[6,10,5,10,10],'color':-48060,'total':41}"
+ "]}";
}
}
/**
* 同步請求
*/
class GetExample {
OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}
}
複製代碼
首先看一個流程圖,對於接下來的源碼分析有個大致印象android
經過上面的例子能夠看到,不論是同步請求仍是異步請求,首先調用的OkHttpClient的newCall(request)方法,先來看看這個方法git
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
複製代碼
經過newCall方法的源碼能夠看到該方法返回值是Call,Call是一個接口,他的實現類是RealCall,因此咱們執行的同步execute()方法或者異步enqueue()方法都是RealCall的方法。newCall方法接收了的網絡請求參數,接下來咱們看看execute()和enqueue()方法github
/**
* 同步請求
*/
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
/**
* 異步請求
*/
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
複製代碼
這裏先看異步的enqueue方法,很直觀能夠看到真正執行網絡請求的是最後一句代碼,而它是怎麼作的呢,咱們還得先弄明白dispatcher,Dispatcher的本質是異步請求的調度器,它內部持有一個線程池,結合線程池調配併發請求。官方文檔描述也說了這一點。web
/**最大併發請求數*/
private int maxRequests = 64;
/**每一個主機最大請求數*/
private int maxRequestsPerHost = 5;
/** Ready async calls in the order they'll be run. 準備要執行的異步請求隊列*/ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet.
正在執行的異步請求隊列*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. 正在執行的同步請求隊列*/ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); /** Dispatcher 構造方法 */ public Dispatcher(ExecutorService executorService) { this.executorService = executorService; } public Dispatcher() { } public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } 複製代碼
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}
/**
* Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
* them on the executor service. Must not be called with synchronization because executing calls
* can call into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
複製代碼
/**
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
複製代碼
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
複製代碼
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
/**Dispatcher的finished方法*/
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call);
}
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
複製代碼
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
複製代碼
由getResponseWithInterceptorChain()方法咱們看到添加了不少Interceptor(攔截器),首先要了解每一個Interceptor的做用,也能大體瞭解OKHttp完成網絡請求的過程。json
在閱讀接下來源碼以前,咱們先要了解責任鏈模式。通俗化的講在責任鏈模式中有不少對象,這些對象能夠理解爲上面列出的攔截器,而每一個對象之間都經過一條鏈子鏈接,網絡請求在這條鏈子上傳遞,直到某一個對象處理了這個網絡請求,也就是完成了網絡請求。使用這個模式的好處就是無論你用多少攔截器處理什麼操做,最終都不會影響咱們的發出請求的目的,就是完成網絡請求,攔截過程你能夠任意添加分配責任。設計模式
接着繼續看Interceptor.Chain,他是Interceptor的內部接口,前面添加的每個攔截器都實現了Interceptor接口,而RealInterceptorChain是Interceptor.Chain接口的實現類。先看RealInterceptorChain的proceed方法源碼緩存
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
......
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
.....
return response;
}
複製代碼
經過源碼能夠注意到interceptor.intercept(next),RetryAndFollowUpInterceptor做爲默認攔截器的第一個攔截器,也就是執行了它的intercept方法bash
前面說過RetryAndFollowUpInterceptor攔截器執行OKHttp網絡重試,先看它的intercept方法服務器
/**RetryAndFollowUpInterceptor的intercept方法 **/
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
//將請求經過鏈子chain傳遞到下一個攔截器
Response response;
boolean releaseConnection = true;
try {
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// 線路異常,鏈接失敗,檢查是否能夠從新鏈接
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
// IO異常,鏈接失敗,檢查是否能夠從新鏈接
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources. 釋放資源 if (releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); } } // Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); } Request followUp; try { //效驗狀態碼、身份驗證頭信息、跟蹤重定向或處理客戶端請求超時 followUp = followUpRequest(response, streamAllocation.route()); } catch (IOException e) { streamAllocation.release(); throw e; } if (followUp == null) { streamAllocation.release(); // 不須要重定向,正常返回結果 return response; } closeQuietly(response.body()); //超太重試次數 if (++followUpCount > MAX_FOLLOW_UPS) { streamAllocation.release(); throw new ProtocolException("Too many follow-up requests: " + followUpCount); } if (followUp.body() instanceof UnrepeatableRequestBody) { streamAllocation.release(); throw new HttpRetryException("Cannot retry streamed HTTP body", response.code()); } if (!sameConnection(response, followUp.url())) { streamAllocation.release(); streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(followUp.url()), call, eventListener, callStackTrace); this.streamAllocation = streamAllocation; } else if (streamAllocation.codec() != null) { throw new IllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?"); } request = followUp; priorResponse = response; } } 複製代碼
1.首先建立StreamAllocation對象(稍後分析),在一個死循環中將經過鏈子chain傳遞到下一個攔截器,若是捕獲異常,則判斷異常是否恢復鏈接,不能鏈接則拋出異常,跳出循環並是否建立的鏈接池資源
2.第一步沒有異常,還要返回值效驗狀態碼、頭部信息、是否須要重定向、鏈接超時等信息,捕獲異常則拋出並退出循環
3.若是若是重定向,循環超出RetryAndFollowUpInterceptor攔截器的最大重試次數,也拋出異常,退出循環
/**
* Bridges from application code to network code. First it builds a network request from a user
* request. Then it proceeds to call the network. Finally it builds a user response from the network
* response.
* BridgeInterceptor的intercept方法
*/
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
......
Response networkResponse = chain.proceed(requestBuilder.build());
......
}
複製代碼
/**
* 攔截器CacheInterceptor的intercept方法
*/
@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//獲取策略,假設當前可使用網絡
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it. } // If we're forbidden from using the network and the cache is insufficient, fail. 若是網絡被禁止使用而且沒有緩存,則請求失敗
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.若是有緩存,則返回響應緩存,請求完成
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
//沒有緩存,則進行網絡請求,執行下一個攔截器
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get. if (cacheResponse != null) { //狀態碼 304 if (networkResponse.code() == HTTP_NOT_MODIFIED) { Response response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers(), networkResponse.headers())) .sentRequestAtMillis(networkResponse.sentRequestAtMillis()) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis()) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close(); // Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache.trackConditionalCacheHit(); cache.update(cacheResponse, response); return response; } else { closeQuietly(cacheResponse.body()); } } Response response = networkResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); if (cache != null) { if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) { // Offer this request to the cache. //保存緩存 CacheRequest cacheRequest = cache.put(response); return cacheWritingResponse(cacheRequest, response); } if (HttpMethod.invalidatesCache(networkRequest.method())) { try { cache.remove(networkRequest); } catch (IOException ignored) { // The cache cannot be written. } } } return response; } 複製代碼
先看看intercept方法的大體邏輯
緩存的場景也符合設計模式中的策略模式,須要CacheStrategy提供策略在不一樣場景下讀緩存仍是請求網絡。
瞭解了緩存邏輯,繼續深刻了解OKHttp的緩存是如何作的。首先咱們應該回到最初的緩存攔截器設置代碼
/**RealCall 設置緩存攔截器*/
interceptors.add(new CacheInterceptor(client.internalCache()));
/**OkHttpClient 設置緩存*/
Cache cache;
@Override public void setCache(OkHttpClient.Builder builder, InternalCache internalCache) {
builder.setInternalCache(internalCache);
}
void setInternalCache(@Nullable InternalCache internalCache) {
this.internalCache = internalCache;
this.cache = null;
}
InternalCache internalCache() {
return cache != null ? cache.internalCache : internalCache;
}
/**Cache類中 內部持有 InternalCache */
final DiskLruCache cache;
final InternalCache internalCache = new InternalCache() {
@Override public Response get(Request request) throws IOException {
return Cache.this.get(request);
}
@Override public CacheRequest put(Response response) throws IOException {
return Cache.this.put(response);
}
@Override public void remove(Request request) throws IOException {
Cache.this.remove(request);
}
@Override public void update(Response cached, Response network) {
Cache.this.update(cached, network);
}
@Override public void trackConditionalCacheHit() {
Cache.this.trackConditionalCacheHit();
}
@Override public void trackResponse(CacheStrategy cacheStrategy) {
Cache.this.trackResponse(cacheStrategy);
}
};
複製代碼
上面咱們分別截取了 RealCall類、OkHttpClient類和Cache類的源碼,能夠了解到攔截器使用的緩存類是DiskLruCache,設置緩存緩存只能經過OkHttpClient的builder來設置,緩存操做實現是在Cache類中,可是Cache沒有實現InternalCache接口,而是持有InternalCache接口的內部類對象來實現緩存的操做方法,這樣就使得緩存的操做實現只在Cache內部,外部用戶是沒法實現緩存操做的,方便框架內部使用,接口擴展也不影響外部。
根據前面的分析,緩存攔截器中也會調用chain.proceed方法,因此這時候執行到了第四個默認攔截器ConnectInterceptor,接着看它的intercept方法
/**
* 攔截器ConnectInterceptor的intercept方法
*/
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//打開鏈接
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
//交由下一個攔截器處理
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
複製代碼
public Builder() {
.......
connectionPool = new ConnectionPool();
.......
}
複製代碼
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
........
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
.......
}
} catch (IOException e) {
throw new RouteException(e);
}
}
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
........
return candidate;
}
}
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
............
//
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
//鏈接複用
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
..........
if (!foundPooledConnection) {
........
result = new RealConnection(connectionPool, selectedRoute);
//記錄每一個鏈接的引用,每一個調用必須與同一鏈接上的調用配對。
acquire(result, false);
}
}
.........
synchronized (connectionPool) {
.......
// Pool the connection. 將鏈接放入鏈接池
Internal.instance.put(connectionPool, result);
......
}
}
.......
return result;
}
複製代碼
根據上面的源碼,咱們能夠知道findHealthyConnection在循環找健康的鏈接,直到找到鏈接,說明findConnection方法是尋找鏈接的核心方法,該方法中存在能夠複用的鏈接則複用,不然建立新的鏈接,而且記錄鏈接引用,咱們能夠明白StreamAllocation主要是爲攔截器提供一個鏈接, 若是鏈接池中有複用的鏈接則複用鏈接, 若是沒有則建立新的鏈接。
明白StreamAllocation是如何建立和複用鏈接池,咱們還要明白鏈接池(ConnectionPool)的是如何實現的。
理解ConnectionPool以前,咱們須要明白TCP鏈接的知識,Tcp創建鏈接三次握手和斷開鏈接四次握手過程是須要消耗時間的,在http/1.0每一次請求只能打開一次鏈接,而在http/1.1是支持持續鏈接(persistent connection),使得一次鏈接打開以後會保持一段時間,若是仍是同一個請求而且使同一個服務器則在這段時間內繼續請求鏈接是能夠複用的。而ConnectionPool也實現了這個機制,在它內部持有一個線程池和一個緩存鏈接的雙向列表,鏈接中最多隻能存在5個空閒鏈接,空閒鏈接最多隻能存活5分鐘,空閒鏈接到期以後定時清理。
public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
//線程池
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
// 後臺按期清理鏈接的線程
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
//緩存鏈接的雙向隊列
private final Deque<RealConnection> connections = new ArrayDeque<>();
............
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
............
}
複製代碼
/**
RealConnection類newCodec方法
*/
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
複製代碼
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
RealConnection類connectSocket方法
*/
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
//打開 socket 鏈接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
//使用OKio來對數據讀寫
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
複製代碼
/**CallServerInterceptor的intercept方法*/
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
//按照HTTP協議,依次寫入請求體
httpCodec.writeRequestHeaders(request);
.................
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
//
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
...............
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
//響應數據OKio寫入
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
}
return response;
}
/**Http1Codec方法**/
//OKio 讀寫對象
final BufferedSource source;
final BufferedSink sink;
@Override public void writeRequestHeaders(Request request) throws IOException {
//構造好請求頭
String requestLine = RequestLine.get(
request, streamAllocation.connection().route().proxy().type());
writeRequest(request.headers(), requestLine);
}
/** Returns bytes of a request header for sending on an HTTP transport.
將請求信息寫入sink
*/
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
複製代碼
/**Http1Codec方法**/
/**
讀取響應頭信息
*/
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
throw new IllegalStateException("state: " + state);
}
try {
StatusLine statusLine = StatusLine.parse(readHeaderLine());
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());
if (expectContinue && statusLine.code == HTTP_CONTINUE) {
return null;
} else if (statusLine.code == HTTP_CONTINUE) {
state = STATE_READ_RESPONSE_HEADERS;
return responseBuilder;
}
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
} catch (EOFException e) {
// Provide more context if the server ends the stream before sending a response.
IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
/**
寫入響應輸入到ResponseBody
*/
@Override public ResponseBody openResponseBody(Response response) throws IOException {
streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
String contentType = response.header("Content-Type");
if (!HttpHeaders.hasBody(response)) {
Source source = newFixedLengthSource(0);
return new RealResponseBody(contentType, 0, Okio.buffer(source));
}
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
Source source = newChunkedSource(response.request().url());
return new RealResponseBody(contentType, -1L, Okio.buffer(source));
}
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
Source source = newFixedLengthSource(contentLength);
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}
return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
}
複製代碼
最後,經過OKHttp這個框架源碼閱讀,也是對本身的一個提高,不只瞭解了框架原理,設計模式在適宜場景的運用,同時也是對本身耐心的一次考驗,源碼的閱讀是枯燥的,可是隻要靜下心來,也能發現閱讀源碼的樂趣。因爲本人水平有限,文章中若是有錯誤,請你們給我提出來,你們一塊兒學習進步,若是以爲個人文章給予你幫助,也請給我一個喜歡和關注。
參考連接:
參考書籍: