自上一篇自定義控件的完結,至今已經有一個月的時間,一直沒有什麼想寫的,所以回到一開始寫的初衷,看一些主流的開源框架的源碼,深刻的瞭解一下其原理,而不是隻知其然,而不知其因此然。本篇是該系列第一篇——OkHttp3(源碼以3.10版爲準)。java
// 經過建造者模式構建OkHttpClient
OkHttpClient OK_HTTP_CLIENT = new OkHttpClient.Builder()
.addInterceptor(loggingInterceptor)
.connectTimeout(60, TimeUnit.SECONDS)
// 設置緩存 :參數1:緩存路徑(/storage/emulated/0/Android/data/xxx包名/cache) 參數2:最大緩存值(100MB)
//.cache(new Cache(new File(getExternalCacheDir()), 100 * 1024 * 1024))
.readTimeout(60, TimeUnit.SECONDS)
.writeTimeout(60, TimeUnit.SECONDS)
.build();
// 建立請求的Request 對象
Request request = builder
.url(mUrl)
.build();
// 在Okhttp中建立Call 對象,將request和Client進行綁定
Call call = OK_HTTP_CLIENT.newCall(request);
// 同步執行
Response response = call.execute();
// 異步執行
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
LoggerUtil.d("onFailure : "+e.getMessage());
}
@Override
public void onResponse(Call call, Response response) {
responseProcess(response);
}
});
// 注意:
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
複製代碼
總結(OkHttp請求):git
注:github
1.在使用Builder()構建OkHttpClient時會初始化一個很重要的類Dispatcher(分發器類)
,其做用:會接受咱們的同步或者異步的Request隊列,根據不一樣的條件進行任務的分發。
2.OK_HTTP_CLIENT.newCall(request),實際上返回的是RealCall,所以同步/異步請求都是由RealCall發出的
複製代碼
OkHttp3同步/異步請求大致框架流程: web
從上一節中咱們能瞭解到同步請求執行的是execute()方法,而且都是由RealCall發出的請求算法
// RealCall類:
@Override
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// 捕獲調用堆棧跟蹤(本文不是重點)
captureCallStackTrace();
eventListener.callStart(this);
try {
// 調用分發器入隊
client.dispatcher().executed(this);
// OkHttp精髓之一 經過攔截器鏈得到響應(具體後續單獨講解)
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
// 調用分發器出隊
client.dispatcher().finished(this);
}
}
複製代碼
由源碼分能夠看出對於同步請求來講,dispatcher只是簡單的入隊和出隊操做,其他都是經過攔截器鏈來處理獲取響應信息。緩存
異步調用是由RealCall類的enqueue方法發出bash
// RealCall類:
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
// 建立異步回調AsyncCall,而且將AsyncCall入隊操做
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
// Dispatcher類:
private int maxRequests = 64; // 最大請求個數數
private int maxRequestsPerHost = 5; // 每一個主機的最大請求數,此請求爲正在進行網絡請求
// 執行異步任務的線程池
private @Nullable ExecutorService executorService;
/**
* 準備異步調用的隊列
*/
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/**
* 正在運行的異步調用隊列。包括還沒有完成的已取消通話。
*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/**
* 正在運行的同步調用。包括還沒有完成的已取消通話。
*/
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
synchronized void enqueue(AsyncCall call) {
// 正在運行的異步隊列個數 < 64 , 與共享主機的正在運行的呼叫數 < 5
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
// 添加到正在運行的異步隊列
runningAsyncCalls.add(call);
// 啓動線程池執行異步任務
executorService().execute(call);
} else {
// 添加到準備異步調用的隊列
readyAsyncCalls.add(call);
}
}
複製代碼
從上面源碼中能夠看出來,異步請求有兩個不一樣的隊列,一個是正在運行的請求隊列一個是準備異步調用的隊列。二者根據正在呼叫的個數以及正在運行的異步隊列的個數分別入隊。而正在運行的異步隊列在入隊的同時經過線程池執行了其異步任務。服務器
首先咱們先來看一下其線程池的初始化:cookie
// 相似於單例模式的獲取方式
public synchronized ExecutorService executorService() {
if (executorService == null) {
/*
* corePoolSize:線程池核心線程數 0
* maximumPoolSize:線程池最大數 int 類整數的最大值是 2 的 31 次方
* keepAliveTime:空閒線程存活時間 60s
* unit:時間單位 秒
* workQueue:線程池所使用的緩衝隊列
* threadFactory:線程池建立線程使用的工廠
* handler:線程池對拒絕任務的處理策略
* */
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher",false));
}
return executorService;
}
複製代碼
該線程池核心線程數爲0,線程池最大線程爲整數最大值。網絡
問:若咱們的網絡請求很是多時,多達Integer.MAX_VALUE,這個線程池性能消耗是否特別大? 答:實際上是不會的,由於OkHttp中的runningAsyncCalls隊列最大爲64,所以也限制了OkHttp的請求不會超過64,也就是就算咱們設置了Integer.MAX_VALUE,對咱們的性能也不會有影響。
其次,咱們executorService線程池裏執行的爲AsyncCall,咱們來看一看AsyncCall:
// 繼承自Runnable
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
// 實際上就是將run()方法的執行交給了execute()方法,進行了一層包裝
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
// 其繼承自NamedRunnable,所以此Runnable真正執行的代碼在 execute()方法中
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override
protected void execute() {
boolean signalledCallback = false;
try {
// 經過攔截器鏈得到響應,具體後續詳細講解
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
// 失敗回調
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
// 成功回調
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
// 失敗回調
responseCallback.onFailure(RealCall.this, e);
}
} finally {
// 異步任務執行結束
client.dispatcher().finished(this);
}
}
}
複製代碼
經過源碼能夠看出,最後調用了AsyncCall的execute()來發起請求,並在execute()方法中執行了咱們上面看到的,一樣在同步請求中執行的getResponseWithInterceptorChain()方法經過攔截器鏈來獲取響應。
咱們再來看一下同步/異步請求結束後的finished:
// 異步請求finished
void finished(AsyncCall call) {
// 注意:參數3 true
finished(runningAsyncCalls, call, true);
}
// 同步請求finished
void finished(RealCall call) {
// 注意:參數3 false
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
// 從正在運行的同步/異步隊列中移除任務,若是隊列中沒有則拋出異常
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
// 同步跳過這一步,一步則執行這一步
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
// 異步執行
private void promoteCalls() {
// 已經運行最大容量,則返回
if (runningAsyncCalls.size() >= maxRequests) return;
// 沒有準備執行的異步任務則返回
if (readyAsyncCalls.isEmpty()) return;
// 遍歷準備執行的異步請求隊列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next(); // 取出下一個異步任務
// 若是與共享主機的正在運行的呼叫數 < 5
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove(); // 移除
// 添加進正在運行的異步隊列
runningAsyncCalls.add(call);
// 立馬在線程池中執行此異步請求
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
複製代碼
咱們能夠看出runningAsyncCalls和readyAsyncCalls隊列,是經過方法promoteCalls()來將等待執行的任務(readyAsyncCalls中的元素)添加進runningAsyncCalls隊列並執行。
至此,同步異步請求答題流程已經走完,接下來看一下OkHTTP設計之妙——攔截器。
// RealCall類:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
// 咱們本身添加的攔截器(ApplicationInterceptor(應用攔截器))
interceptors.addAll(client.interceptors());
// 請求重定向攔截器:失敗重連等
interceptors.add(retryAndFollowUpInterceptor);
// 橋接攔截器
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// 緩存攔截器
interceptors.add(new CacheInterceptor(client.internalCache()));
// 連接攔截器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// NetworkInterceptor(網絡攔截器)
interceptors.addAll(client.networkInterceptors());
}
// 真正調用網絡請求的攔截器
interceptors.add(new CallServerInterceptor(forWebSocket));
// 攔截器鏈,注意:第5個參數 index == 0
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
複製代碼
從getResponseWithInterceptorChain()方法的源碼中能夠看出,攔截器分爲應用攔截器、網絡攔截器,這兩類均爲咱們本身構建OkhttpClient時添加的。不過咱們本文的重點並非這兩類攔截器,而是OkHttp自己的5個攔截器,而這5個攔截器也是整個OkHtp的精華之一。
咱們能夠看出,源碼中將全部攔截器都add進List集合中,並看成參數傳入RealInterceptorChain,即攔截器鏈中,而後調用proceed方法,那咱們來看一下這些攔截器是如何串聯起來的:
// RealInterceptorChain類:
@Override
public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation
, HttpCodec httpCodec,RealConnection connection) throws IOException {
...
// 調用鏈中的下一個攔截器。注意:第5個參數 index = index + 1
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation,httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,writeTimeout);
// 從getResponseWithInterceptorChain()中咱們知道index初始化爲0
// 獲取當前位置攔截器
Interceptor interceptor = interceptors.get(index);
// 執行當前位置攔截器,並把下一個位置的攔截器鏈傳入
Response response = interceptor.intercept(next);
...
return response;
}
// RetryAndFollowUpInterceptor類:
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
// 初始化分配流對象:OkHtpp請求的各類組件的封裝類
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
while (true) {
...
Response response;
boolean releaseConnection = true;
try {
// 執行攔截器鏈的 proceed 方法
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
}
......
}
}
複製代碼
從這段代碼兩個類的兩部分中能夠看出各個攔截器是由攔截器鏈串聯起來的,上述代碼中以RetryAndFollowUpInterceptor攔截器爲例,由攔截器鏈的方法proceed開始,按照順序調用各個攔截器,而且每一個攔截器中都會繼續調用下一個攔截器鏈對象的proceed,從而將全部攔截器串聯起來,最終通過全部攔截器後獲取到響應信息。 請求流程圖以下:
借鑑一張感受比較完整的的:
@Override
public Response intercept(Chain chain) throws IOException {
// 獲取咱們構建的請求
Request request = chain.request();
// 1. 初始化一個socket鏈接分配流對象
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
// 計數器
int followUpCount = 0;
Response priorResponse = null;
// 開啓死循環,用於執行第一個攔截器或者請求的失敗重連
while (true) {
// 若是請求已經被取消了,釋放鏈接池的資源
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true;
try {
// 2. 執行下一個攔截器,即BridgeInterceptor
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
// 先不釋放連接,由於可能要複用
releaseConnection = false;
} catch (RouteException e) { // 鏈接地址失敗的異常
/**
* 3. 若是有異常,判斷是否要恢復
* 不在繼續鏈接的狀況:
* 1. 應用層配置不在鏈接,默認爲true
* 2. 請求Request出錯不能繼續使用
* 3. 是否能夠恢復的
* 3.一、協議錯誤(ProtocolException)
* 3.二、中斷異常(InterruptedIOException)
* 3.三、SSL握手錯誤(SSLHandshakeException && CertificateException)
* 3.四、certificate pinning錯誤(SSLPeerUnverifiedException)
* 4. 沒用更多線路可供選擇
*/
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// 判斷網絡請求是否已經開始
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
// 判斷是否可以恢復,也就是是否要重試
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// 釋放鏈接
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// priorResponse若是存在。則構建
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
/**
* 4. 根據返回結果response,來檢查是否須要進行重定向操做
* 或者是否須要繼續完善請求,例如證書驗證等等
* 是否須要進行請求重定向,是根據http請求的響應碼來決定的,
* 所以,在followUpRequest方法中,將會根據響應userResponse,獲取到響應碼,
* 並從鏈接池StreamAllocation中獲取鏈接,而後根據當前鏈接,獲得路由配置參數Route。
*
* followUpCount是用來記錄咱們發起網絡請求的次數的
* 爲何咱們發起一個網絡請求,可能okhttp會發起屢次呢?
* 例如:https的證書驗證,咱們須要通過:發起 -> 驗證 ->響應,
* 三個步驟須要發起至少兩次的請求,或者咱們的網絡請求被重定向,
* 在咱們第一次請求獲得了新的地址後,再向新的地址發起網絡請求。
* */
Request followUp = followUpRequest(response);
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
// 返回結果
return response;
}
// 5. 不須要重定向,關閉響應流
closeQuietly(response.body());
// 6. 重定向或者失敗重連,是否超過最大限制 MAX_FOLLOW_UPS == 20
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
// 若是body內容只能發送一次,釋放鏈接
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
// 7. 檢查重定向(失敗重連)請求,和當前的請求,是否爲同一個鏈接
if (!sameConnection(response, followUp.url())) {
// 釋放以前你的url地址鏈接
streamAllocation.release();
// 建立新的網絡請求封裝對象StreamAllocation
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(followUp.url()), callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
// 更新下一次的網絡請求對象
request = followUp;
// 保存上一次的請求結果
priorResponse = response;
}
}
複製代碼
具體講解大部分都在代碼中說明,簡單來講明一下此攔截器的做用:
1. 初始化一個鏈接流對象
2. 調用下一個攔截器
3. 根據異常結果或者響應結果判斷是否須要從新請求
複製代碼
@Override
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
// 構建能夠用於發送網絡請求的Request
// ------------------主要構建完整的請求頭 start------------------------
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
// 開啓TCP鏈接後不會立馬關閉鏈接,而是存活一段時間
requestBuilder.header("Connection", "Keep-Alive");
}
// 若是咱們沒有指定編碼的格式,默認使用gzip
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
// 把以前的cookie存在header裏
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
// ------------------主要構建完整的請求頭 end------------------------
// 調用下一個攔截器
Response networkResponse = chain.proceed(requestBuilder.build());
// 響應頭, 若是沒有自定義配置cookieJar == null,則什麼都不作,有則保存新的cookie
// 將服務器返回來的Response轉化爲開發者使用的Response(相似於解壓的過程)
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
// 構建Response
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
/**
* 是否轉換爲解壓Response
* 條件:
* 1.判斷服務器是否支持gzip壓縮格式
* 2.判斷服務器響應是否使用gzip壓縮
* 3.是否有響應體
*/
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
// 轉換成解壓數據源
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
複製代碼
具體講解大部分都在代碼中說明,簡單來講明一下此攔截器的做用:
1. 負責將用戶構建的一個Request請求轉換爲可以進行網絡訪問的請求
2. 將這個符合網絡請求的Resquest進行網絡請求(即調用下一個攔截器)
3. 將網絡請求回來的響應Response轉化爲用戶可用的Response(解壓)
複製代碼
此攔截器是用來緩存請求Request和響應Response數據的攔截器,此攔截器起做用須要用戶調用 new OkHttpClient.Builder().cache(new Cache(new File(getExternalCacheDir()), 100 * 1024 * 1024)) 來設置緩存路徑和緩存的大小。
在看攔截器的源碼以前咱們先來了解幾個概念:
DiskLruCache(此算法和OkHttp(大概是重寫了部分)有些許不一樣,原理一致)
DiskLruCache是JakeWharton大神的傑做,它採用的是LRU算法,經過LRU算法對緩存進行管理,以最近最少使用做爲管理的依據,刪除最近最少使用的數據,保留最近最經常使用的數據。
DiskLruCache主要知識點:
1. 簡單使用
2. journal(日誌)文件的生成
3. journal的介紹
4. 寫入緩存
5. 讀取緩存
6. 刪除緩存
7.其它API
複製代碼
一. 簡單使用
// demo例子:
File directory = getExternalCacheDir();
int appVersion = 1;
int valueCount = 1;
long maxSize = 10 * 1024;
/*
* 參數說明:
* File directory:緩存目錄。
* int appVersion:應用版本號。
* int valueCount:一個key對應的緩存文件的數目
* ,若是咱們傳入的參數大於1,那麼緩存文件後綴就是 .0 , .1等。
* long maxSize:緩存容量上限。
*/
DiskLruCache diskLruCache = DiskLruCache.open(directory, appVersion, valueCount, maxSize);
// 構建寫入緩存 Editor
DiskLruCache.Editor editor = diskLruCache.edit(String.valueOf(System.currentTimeMillis()));
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(editor.newOutputStream(0));
Bitmap bitmap = BitmapFactory.decodeResource(getResources(), R.drawable.scenery);
bitmap.compress(Bitmap.CompressFormat.JPEG, 100, bufferedOutputStream);
editor.commit();
diskLruCache.flush();
diskLruCache.close();
複製代碼
這個就是DiskLruCache的大體使用流程,簡單看一下其文件建立:
二. 文件建立過程
public final class DiskLruCache implements Closeable {
public static DiskLruCache open(File directory, int appVersion, int valueCount, long maxSize) throws IOException {
if (maxSize <= 0) {
throw new IllegalArgumentException("maxSize <= 0");
}
if (valueCount <= 0) {
throw new IllegalArgumentException("valueCount <= 0");
}
File backupFile = new File(directory, JOURNAL_FILE_BACKUP);
//若是備份文件存在
if (backupFile.exists()) {
File journalFile = new File(directory, JOURNAL_FILE);
// 若是journal文件存在,則把備份文件journal.bkp是刪了
if (journalFile.exists()) {
backupFile.delete();
} else {
//若是journal文件不存在,則將備份文件命名爲journal
renameTo(backupFile, journalFile, false);
}
}
DiskLruCache cache = new DiskLruCache(directory, appVersion, valueCount, maxSize);
//判斷journal文件是否存在
if (cache.journalFile.exists()) {
//若是日誌文件以及存在
try {
//
/**
* 讀取journal文件,根據記錄中不一樣的操做類型進行相應的處理。
* 經過讀取journal文件的每一行,而後封裝成entry對象,放到LinkedHashMap集合中。
* 而且根據每一行不一樣的開頭,設置entry的值。也就是說經過讀取這個文件,
* 咱們把全部的在本地緩存的文件的key都保存到了集合中,這樣咱們用的時候就能夠經過集合來操做了。
*/
cache.readJournal();
// 該方法主要用來計算當前的緩存總容量,並刪除非法緩存記錄以及該記錄對應的文件。
cache.processJournal();
cache.journalWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(cache.journalFile, true), Util.US_ASCII));
return cache;
} catch (IOException journalIsCorrupt) {
System.out.println("DiskLruCache " + directory + " is corrupt: " + journalIsCorrupt.getMessage() + ", removing");
cache.delete();
}
}
//建立新的緩存目錄
directory.mkdirs();
cache = new DiskLruCache(directory, appVersion, valueCount, maxSize);
//調用新的方法創建新的journal文件
cache.rebuildJournal();
return cache;
}
}
複製代碼
open方法,圍繞着journal文件的建立和讀寫來展開的,那麼journal文件是什麼呢?
三. journal的介紹
咱們若是去打開緩存目錄,就會發現除了緩存文件,還會發現一個journal文件,journal文件用來記錄緩存的操做記錄的,以下所示:
libcore.io.DiskLruCache
1
100
2
CLEAN 3400330d1dfc7f3f7f4b8d4d803dfcf6 832 21054
DIRTY 335c4c6028171cfddfbaae1a9c313c52
CLEAN 335c4c6028171cfddfbaae1a9c313c52 3934 2342
REMOVE 335c4c6028171cfddfbaae1a9c313c52
DIRTY 1ab96a171faeeee38496d8b330771a7a
CLEAN 1ab96a171faeeee38496d8b330771a7a 1600 234
READ 335c4c6028171cfddfbaae1a9c313c52
READ 3400330d1dfc7f3f7f4b8d4d803dfcf6
複製代碼
咱們來分析下這個文件的內容:
而源碼中有4衝命令的記錄:
/*
* DIRTY 表示一個entry正在被寫入。
* 寫入分兩種狀況,若是成功會緊接着寫入一行CLEAN的記錄;
* 若是失敗,會增長一行REMOVE記錄。注意單獨只有DIRTY狀態的記錄是非法的。
*/
private static final String DIRTY = "DIRTY";
private static final String REMOVE = "REMOVE";
// READ就是說明有一次讀取的記錄。
private static final String READ = "READ";
// CLEAN的後面還記錄了文件的長度,注意可能會一個key對應多個文件,那麼就會有多個數字。
// 當手動調用remove(key)方法的時候也會寫入一條REMOVE記錄。
private static final String CLEAN = "CLEAN";
複製代碼
四. 寫入緩存
須要調用DiskLruCache的edit()方法來獲取實例,接口以下所示:
public Editor edit(String key) throws IOException (用法詳見一. 簡單使用)
複製代碼
能夠看到,edit()方法接收一個參數key,這個key將會成爲緩存文件的文件名,由於圖片URL中可能包含一些特殊字符,這些字符有可能在命名文件時是不合法的。所以這裏的參數key通常都會進行MD5編碼,編碼後的字符串確定是惟一的,而且只會包含0-F這樣的字符,徹底符合文件的命名規則。
五. 讀取緩存
讀取的方法要比寫入簡單一些,主要是藉助DiskLruCache的get()方法實現的,接口以下所示:
// 返回一個緩存文件快照,包含緩存文件大小,輸入流等信息。
public synchronized Snapshot get(String key) throws IOException
複製代碼
該方法最終返回一個緩存文件快照,包含緩存文件大小,輸入流等信息。利用這個快照咱們就能夠讀取緩存文件了。只須要調用它的getInputStream()方法就能夠獲得緩存文件的輸入流了。一樣地,getInputStream()方法也須要傳一個index參數,這裏傳入0就好。
六. 刪除緩存
移除緩存主要是藉助DiskLruCache的remove()方法實現的,接口以下所示:
public synchronized boolean remove(String key) throws IOException
複製代碼
用法雖然簡單,可是你要知道,這個方法咱們並不該該常常去調用它。由於你徹底不須要擔憂緩存的數據過多從而佔用SD卡太多空間的問題,DiskLruCache會根據咱們在調用open()方法時設定的緩存最大值來自動刪除多餘的緩存。只有你肯定某個key對應的緩存內容已通過期,須要從網絡獲取最新數據的時候才應該調用remove()方法來移除緩存。
七. 其它API
size() :返回當前緩存路徑下全部緩存數據的總字節數,以byte爲單位
flush() :將內存中的操做記錄同步到日誌文件(也就是journal文件)當中
注:並非每次寫入緩存都要調用一次flush()方法的,頻繁地調用並不會帶來任何好處,
只會額外增長同步journal文件的時間。比較標準的作法就是在Activity的onPause()方法中去調用一次flush()方法就能夠了。
close() :將DiskLruCache關閉掉,是和open()方法對應的一個方法。
注:關閉掉了以後就不能再調用DiskLruCache中任何操做緩存數據的方法,一般只應該在Activity的onDestroy()方法中去調用close()方法。
delete() :將全部的緩存數據所有刪除,好比說手動清理緩存功能
複製代碼
InternalCache
// Cache類:
Cache(File directory, long maxSize, FileSystem fileSystem) {
this.internalCache = new InternalCache() {
// 1.獲取緩存的響應數據
public Response get(Request request) throws IOException {
return Cache.this.get(request);
}
public CacheRequest put(Response response) throws IOException {
// 2.保存請求回來的響應數據
return Cache.this.put(response);
}
public void remove(Request request) throws IOException {
// 3.經過請求移除保存的響應數據
Cache.this.remove(request);
}
public void update(Response cached, Response network) {
// 4.更新緩存的響應數據
Cache.this.update(cached, network);
}
public void trackConditionalCacheHit() {
Cache.this.trackConditionalCacheHit();
}
public void trackResponse(CacheStrategy cacheStrategy) {
Cache.this.trackResponse(cacheStrategy);
}
};
// 硬盤緩存 DiskLruCache
this.cache = DiskLruCache.create(fileSystem, directory, 201105, 2, maxSize);
}
複製代碼
咱們主要了解InternalCache的get和put方法,咱們先看一下其put保存請求回來的響應Response數據,從上面代碼咱們能看到put方法實際上調用的是Cache類的put :
一. put方法分析:
// Cache類:
@Nullable
CacheRequest put(Response response) {
// 獲取請求方法
String requestMethod = response.request().method();
if (HttpMethod.invalidatesCache(response.request().method())) {
try {
this.remove(response.request());
} catch (IOException var6) {
}
return null;
// 若是不是GET請求時返回的response,則不進行緩存
} else if (!requestMethod.equals("GET")) {
return null;
} else if (HttpHeaders.hasVaryAll(response)) {
return null;
} else {
// 把response封裝在Cache.Entry中,調用DiskLruCache的edit()返回editor
Cache.Entry entry = new Cache.Entry(response);
Editor editor = null;
try {
// cache 從Cache類的構造方法中能夠看出cache實際上就是 DiskLruCache
// 把url進行 md5(),並轉換成十六進制格式
// 將轉換後的key做爲DiskLruCache內部LinkHashMap的鍵值
editor = this.cache.edit(key(response.request().url()));
if (editor == null) {
return null;
} else {
// 用editor提供的Okio的sink對文件進行寫入
entry.writeTo(editor);
// 利用CacheRequestImpl寫入body
return new Cache.CacheRequestImpl(editor);
}
} catch (IOException var7) {
this.abortQuietly(editor);
return null;
}
}
}
複製代碼
根據上面的代碼發現,OkHttp只針對GET請求時返回的response進行緩存。官方解釋:非GET請求下返回的response也能夠進行緩存,可是這樣作的複雜性高,且效益低。 在獲取DiskLruCache.Editor對象editor後,調用writeTo()把url、請求方法、響應首部字段等寫入緩存,而後返回一個CacheRequestImpl實例,在CacheInterceptor的intercept()方法內部調用cacheWritingResponse()寫入body,最後調用CacheRequestImpl的close()完成提交(實際內部調用了Editor # commit() )。
接下來咱們看一下edit和writeTo內部實現:
// DiskLruCache 類:
public @Nullable Editor edit(String key) throws IOException {
return edit(key, ANY_SEQUENCE_NUMBER);
}
synchronized Editor edit(String key, long expectedSequenceNumber) throws IOException {
//內部主要是利用FileSystem處理文件,若是這裏出現了異常,
//在最後會構建新的日誌文件,若是文件已存在,則替換
initialize();
//檢測緩存是否已關閉
checkNotClosed();
//檢測是否爲有效key
validateKey(key);
//lruEntries是LinkHashMap的實例,先查找lruEntries是否存在
Entry entry = lruEntries.get(key);
if (expectedSequenceNumber != ANY_SEQUENCE_NUMBER && (entry == null
|| entry.sequenceNumber != expectedSequenceNumber)) {
return null; // Snapshot is stale.
}
//若是有Editor在操做entry,返回null
if (entry != null && entry.currentEditor != null) {
return null;
}
//若是須要,進行clean操做
if (mostRecentTrimFailed || mostRecentRebuildFailed) {
executor.execute(cleanupRunnable);
return null;
}
// 把當前key在對應文件中標記DIRTY狀態,表示正在修改,
//清空日誌緩衝區,防止泄露
journalWriter.writeUtf8(DIRTY).writeByte(' ').writeUtf8(key).writeByte('\n');
journalWriter.flush();
if (hasJournalErrors) {
return null; // 若是日誌文件不能編輯
}
//爲請求的url建立一個新的DiskLruCache.Entry實例
//並放入lruEntries中
if (entry == null) {
entry = new Entry(key);
lruEntries.put(key, entry);
}
Editor editor = new Editor(entry);
entry.currentEditor = editor;
return editor;
}
// Cache.Entry類:
public void writeTo(Editor editor) throws IOException {
BufferedSink sink = Okio.buffer(editor.newSink(0));
// 如下都是利用sink進行寫入操做
sink.writeUtf8(this.url).writeByte(10);
sink.writeUtf8(this.requestMethod).writeByte(10);
sink.writeDecimalLong((long) this.varyHeaders.size()).writeByte(10);
int i = 0;
int size;
for (size = this.varyHeaders.size(); i < size; ++i) {
sink.writeUtf8(this.varyHeaders.name(i)).writeUtf8(": ").writeUtf8(this.varyHeaders.value(i)).writeByte(10);
}
sink.writeUtf8((new StatusLine(this.protocol, this.code, this.message)).toString()).writeByte(10);
sink.writeDecimalLong((long) (this.responseHeaders.size() + 2)).writeByte(10);
i = 0;
for (size = this.responseHeaders.size(); i < size; ++i) {
sink.writeUtf8(this.responseHeaders.name(i)).writeUtf8(": ").writeUtf8(this.responseHeaders.value(i)).writeByte(10);
}
sink.writeUtf8(SENT_MILLIS).writeUtf8(": ").writeDecimalLong(this.sentRequestMillis).writeByte(10);
sink.writeUtf8(RECEIVED_MILLIS).writeUtf8(": ").writeDecimalLong(this.receivedResponseMillis).writeByte(10);
if (this.isHttps()) {
sink.writeByte(10);
sink.writeUtf8(this.handshake.cipherSuite().javaName()).writeByte(10);
this.writeCertList(sink, this.handshake.peerCertificates());
this.writeCertList(sink, this.handshake.localCertificates());
sink.writeUtf8(this.handshake.tlsVersion().javaName()).writeByte(10);
}
sink.close();
}
複製代碼
接下來咱們再看一看Cache.Entry構造方法:
Entry(Response response) {
this.url = response.request().url().toString();
this.varyHeaders = HttpHeaders.varyHeaders(response);
this.requestMethod = response.request().method();
this.protocol = response.protocol();
this.code = response.code();
this.message = response.message();
this.responseHeaders = response.headers();
this.handshake = response.handshake();
this.sentRequestMillis = response.sentRequestAtMillis();
this.receivedResponseMillis = response.receivedResponseAtMillis();
}
複製代碼
咱們發現Cache.Entry構造方法中並無Response的body(),那麼咱們的body是在哪緩存的呢,其實上面就有說明,其實Cache類的put方法有一個返回值 CacheRequest ,而CacheRequest正是後面用來緩存Response的body的關鍵,後續再詳細介紹。
二. get方法分析:
// Cache類:
@Nullable
Response get(Request request) {
//把url轉換成key
String key = key(request.url());
DiskLruCache.Snapshot snapshot;
Entry entry;
try {
//經過DiskLruCache的get()根據具體的key獲取DiskLruCache.Snapshot實例
snapshot = cache.get(key);
if (snapshot == null) {
return null;
}
} catch (IOException e) {
// Give up because the cache cannot be read.
return null;
}
try {
//經過snapshot.getSource()獲取一個Okio的Source
entry = new Entry(snapshot.getSource(ENTRY_METADATA));
} catch (IOException e) {
Util.closeQuietly(snapshot);
return null;
}
//根據snapshot獲取緩存中的response
Response response = entry.response(snapshot);
if (!entry.matches(request, response)) {
Util.closeQuietly(response.body());
return null;
}
return response;
}
// DiskLruCache類:
public synchronized Snapshot get(String key) throws IOException {
initialize();
checkNotClosed();
validateKey(key);
//從lruEntries查找entry,
Entry entry = lruEntries.get(key);
if (entry == null || !entry.readable) return null;
//獲得Entry的快照值snapshot
Snapshot snapshot = entry.snapshot();
if (snapshot == null) return null;
redundantOpCount++;
journalWriter.writeUtf8(READ).writeByte(' ').writeUtf8(key).writeByte('\n');
//若是redundantOpCount超過2000,且超過lruEntries的大小時,進行清理操做
if (journalRebuildRequired()) {
executor.execute(cleanupRunnable);
}
return snapshot;
}
//DiskLruCache.Entry類:
Snapshot snapshot() {
if (!Thread.holdsLock(DiskLruCache.this)) throw new AssertionError();
Source[] sources = new Source[valueCount];
// Defensive copy since these can be zeroed out.
long[] lengths = this.lengths.clone();
try {
//遍歷已緩存的文件,生成相應的sources
for (int i = 0; i < valueCount; i++) {
sources[i] = fileSystem.source(cleanFiles[i]);
}
//建立Snapshot並返回
return new Snapshot(key, sequenceNumber, sources, lengths);
} catch (FileNotFoundException e) {
// A file must have been deleted manually!
for (int i = 0; i < valueCount; i++) {
if (sources[i] != null) {
Util.closeQuietly(sources[i]);
} else {
break;
}
}
// Since the entry is no longer valid, remove it so the metadata is accurate (i.e.
// the cache
// size.)
try {
removeEntry(this);
} catch (IOException ignored) {
}
return null;
}
}
複製代碼
相比於put過程,get過程相對簡單點。DiskLruCache.Snapshot是DiskLruCache.Entry的一個快照值,內部封裝了DiskLruCache.Entry對應文件的Source,簡單的說:根據條件從DiskLruCache.Entry找到相應的緩存文件,並生成Source,封裝在Snapshot內部,而後經過snapshot.getSource()獲取Source,對緩存文件進行讀取操做。
總結::通過分析InternalCache咱們知道,Cache只是一個上層的執行者,內部真正的緩存是由DiskLruCache實現的。在DiskLruCache裏面經過FileSystem,基於Okio的Sink/Source對文件進行流操做。
接下來咱們回到CacheInterceptor的攔截器方法intercept中繼續分析:
// 咱們從RealCall的getResponseWithInterceptorChain()方法中,
// 在add(new CacheInterceptor(client.internalCache()));時可知
// intercept方法中的cache爲Cache類中的InternalCache
@Override
public Response intercept(Chain chain) throws IOException {
// 若是配置了緩存:優先從緩存中讀取Response
Response cacheCandidate = cache != null
? cache.get(chain.request()) // 咱們熟悉的get方法,獲取緩存
: null;
long now = System.currentTimeMillis();
// 緩存策略,該策略經過某種規則來判斷緩存是否有效
// 1. 根據Request和以前緩存的Response獲得CacheStrategy
// 2. 根據CacheStrategy決定是請求網絡仍是直接返回緩存
// 3. 若是2中決定請求網絡,則在這一步將返回的網絡響應和本地緩存對比,對本地緩存進行增刪改操做
CacheStrategy strategy =
new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body());
}
// 若是根據緩存策略strategy禁止使用網絡,而且緩存無效,直接返回空的Response
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// 若是根據緩存策略strategy禁止使用網絡,且有緩存則直接使用緩存
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try { // 執行下一個攔截器,發起網路請求
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// 若是咱們也有緩存響應,那麼咱們正在進行條件獲取。
if (cacheResponse != null) {
// 而且服務器返回304狀態碼(說明緩存還沒過時或服務器資源沒修改)
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
// 構建緩存數據
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
// 若是網絡資源已經修改:使用網絡響應返回的最新數據
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
// 將最新的數據緩存起來
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response,
networkRequest)) {
// 咱們熟悉的put 寫入緩存操做
CacheRequest cacheRequest = cache.put(response);
// 寫入Response的body
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
複製代碼
簡單的總結一下上面的代碼都作了些什麼:
咱們上面還遺留了一個Response的body的緩存沒有分析,那麼咱們看一看cacheWritingResponse方法的實現:
// CacheInterceptor類:
private Response cacheWritingResponse(final CacheRequest cacheRequest, Response response)throws IOException {
// 有些應用會返回空體;爲了兼容性,咱們將其視爲空緩存請求。
if (cacheRequest == null) return response;
Sink cacheBodyUnbuffered = cacheRequest.body();
if (cacheBodyUnbuffered == null) return response;
// 獲取response.body()的BufferedSource
final BufferedSource source = response.body().source();
// 構建用來存儲response.body()的BufferedSink
final BufferedSink cacheBody = Okio.buffer(cacheBodyUnbuffered);
// 注意:用於真正寫入Response的body
Source cacheWritingSource = new Source() {
boolean cacheRequestClosed;
@Override
public long read(Buffer sink, long byteCount) throws IOException {
long bytesRead;
try {
// 從byteCount個字段到sink中並刪除
bytesRead = source.read(sink, byteCount);
} catch (IOException e) {
if (!cacheRequestClosed) {
cacheRequestClosed = true;
cacheRequest.abort(); // Failed to write a complete cache response.
}
throw e;
}
if (bytesRead == -1) {
if (!cacheRequestClosed) {
cacheRequestClosed = true;
cacheBody.close(); // 緩存response的body完成
}
return -1; // 寫完返回-1
}
// 將讀到sink中的source(response的body數據)拷貝到cacheBody中
sink.copyTo(cacheBody.buffer(), sink.size() - bytesRead, bytesRead);
cacheBody.emitCompleteSegments();
return bytesRead;
}
@Override
public Timeout timeout() {
return source.timeout();
}
@Override
public void close() throws IOException {
if (!cacheRequestClosed
&& !discard(this, HttpCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
cacheRequestClosed = true;
cacheRequest.abort();
}
source.close();
}
};
String contentType = response.header("Content-Type");
long contentLength = response.body().contentLength();
return response.newBuilder()
.body(new RealResponseBody(contentType, contentLength, Okio.buffer(cacheWritingSource))) // 注意最後一個參數,後面會說明
.build();
}
複製代碼
從這段代碼中可能會有些疑惑,source是如何在cacheWritingSource這個內部類的read方法中緩存完成的,那麼咱們就要看cacheWritingSource被傳遞到哪裏,而且被誰所調用的read方法啦。咱們從最後的return能夠看出來cacheWritingSource被封裝到Response返回啦,咱們上面講過整個的攔截器鏈最終會將Response返回到異步請求的回調onResponse方法中或者做爲同步請求的返回值。那麼咱們最終對Response的調用也就只有Response的body()的string()方法啦。那麼咱們來研究一下這個方法都幹了什麼?
// ResponseBody ==> 經過調用Response.body()獲取
public final String string() throws IOException {
BufferedSource source = source();
try {
Charset charset = Util.bomAwareCharset(source, charset());
// 從source中讀取結果字符串
return source.readString(charset);
} finally {
Util.closeQuietly(source);
}
}
複製代碼
ResponseBody其實只是一個抽象類,而其實現類爲RealResponseBody,從RealResponseBody中發現source是從其構造方法中初始化的:
// RealResponseBody類:
private final BufferedSource source;
public RealResponseBody(
@Nullable String contentTypeString, long contentLength, BufferedSource source) {
this.contentTypeString = contentTypeString;
this.contentLength = contentLength;
this.source = source;
}
複製代碼
那麼咱們的RealResponseBody是何時初始化的呢?咱們如今在討論的是緩存的Response,所以緩存Response確定是從咱們緩存攔截器CacheInterceptor返回來的,因此咱們上面cacheWritingResponse方法中的返回值Response在構建過程當中,其實就是初始化RealResponseBody的地方。所以咱們此時的source就是咱們在cacheWritingResponse方法的返回值傳入的Okio.buffer(cacheWritingSource)。而Okio.buffer(cacheWritingSource)方法返回的是RealBufferedSource類(並傳入cacheWritingSource),所以Response.body().string()方法裏 source.readString(charset)調用的實際上就是RealBufferedSource類的readString方法。
// RealBufferedSource類
public final Buffer buffer = new Buffer();
public final Source source;
// 終於看到咱們想看到的source,此source就是傳進來的cacheWritingSource(用來寫入Response的body緩存的匿名內部類)
RealBufferedSource(Source source) {
if (source == null) throw new NullPointerException("source == null");
this.source = source;
}
@Override
public String readString(Charset charset) throws IOException {
if (charset == null) throw new IllegalArgumentException("charset == null");
// 寫入所有,
buffer.writeAll(source);
return buffer.readString(charset);
}
// Buffer類,在RealBufferedSource類的成員函數中初始化
@Override
public long writeAll(Source source) throws IOException {
if (source == null) throw new IllegalArgumentException("source == null");
long totalBytesRead = 0;
// 關注此處的for循環,若是read != -1 則一直輪詢,
// 所以一直執行cacheWritingSource的read寫入Response的body數據,直到寫完返回-1
for (long readCount; (readCount = source.read(this, Segment.SIZE)) != -1; ) {
totalBytesRead += readCount;
}
return totalBytesRead;
}
複製代碼
總結一下:
RealConnection是Connection的實現類,Realconnection封裝了底層Socket鏈接,同時使用 OKio(square公司的另外一個獨立的開源項目) 來進行魚服務器交互數據的讀寫。首先看下它的成員屬性:
private final ConnectionPool connectionPool;
private final Route route;
//下面這些字段,經過connect()方法開始初始化,而且絕對不會再次賦值
private Socket rawSocket; //底層Tcp Socket
private Socket socket; //應用層socket
//握手(處理三次握手)
private Handshake handshake;
//協議
private Protocol protocol;
// http2的連接
private Http2Connection http2Connection;
//經過source和sink,與服務器交互的輸入輸出流
private BufferedSource source;
private BufferedSink sink;
//下面這個字段是 屬於表示連接狀態的字段,而且有connectPool統一管理
//若是noNewStreams被設爲true,則noNewStreams一直爲true,不會被改變,而且表示這個連接不會再建立新的stream流
public boolean noNewStreams;
//成功的次數
public int successCount;
//此連接能夠承載最大併發流的限制,若是不超過限制,能夠隨意增長
public int allocationLimit = 1;
// allocations是關聯StreamAllocation,它用來統計在一個鏈接上創建了哪些流,
// 經過StreamAllocation的acquire方法和release方法能夠將一個allcation對方添加到鏈表或者移除鏈表
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
複製代碼
從其成員屬性中能夠看出,RealConnection中持有Socket鏈接,而且會保留有sink和source用來與服務器交互的輸入輸出流。所以若是擁有了一個RealConnection就表明了咱們已經跟服務器有了一條通訊鏈路(Socket鏈路)。而且三次握手也是實如今這個類中,其具體實現是在其connect方法中,此方法咱們放到ConnectInterceptor攔截器中進行分析。
當使用OkHttp請求URL時,RealConnection的做用以下:
HTTP的版本背景:
HTTP的版本從最初的1.0版本,到後續的1.1版本,再到後續的google推出的SPDY,後來再推出2.0版本,http協議愈來愈完善。(ps:okhttp也是根據2.0和1.1/1.0做爲區分,實現了兩種鏈接機制)這裏要說下http2.0和http1.0,1.1的主要區別,2.0解決了老版本(1.1和1.0)最重要兩個問題:鏈接沒法複用和head of line blocking (HOL)問題.2.0使用多路複用的技術,多個stream能夠共用一個socket鏈接,每一個tcp鏈接都是經過一個socket來完成的,socket對應一個host和port,若是有多個stream(也就是多個request)都是鏈接在一個host和port上,那麼它們就能夠共同使用同一個socket,這樣作的好處就是能夠減小TCP的一個三次握手的時間。在OKHttp裏面,記錄一次鏈接的是RealConnection,這個負責鏈接,在這個類裏面用socket來鏈接,用HandShake來處理握手。
3個概念:請求、鏈接、流
咱們要明白HTTP通訊執行網絡"請求"須要在"鏈接"上創建一個新的"流",咱們將StreamAllocation稱之流的橋樑,它負責爲一次"請求"尋找"鏈接"並創建"流",從而完成遠程通訊。因此說StreamAllocation與"請求"、"鏈接"、"流"都有關。
StreamAllocation的註釋也詳細講述了,Connection是創建在Socket之上的物流通訊信道,而Stream則是表明邏輯的流,至於Call是對一次請求過程的封裝。以前也說過一個Call可能會涉及多個流(好比重定向或者auth認證等狀況)。若是StreamAllocation要想解決上述問題,須要兩個步驟,一是尋找鏈接,二是獲取流。因此StreamAllocation裏面應該包含一個Stream(OKHttp裏面的流是HttpCodec);還應該包含鏈接Connection。若是想找到合適的鏈接,還須要一個鏈接池ConnectionPool屬性。因此應該有一個獲取流的方法在StreamAllocation裏面是newStream();找到合適的流的方法findConnection();還應該有完成請求任務的以後finish()的方法來關閉流對象,還有終止和取消等方法,以及釋放資源的方法。
咱們先來看一下其成員屬性:
/**
* 地址指定一個webserver(如github.com)和鏈接到該服務器所需的全部靜態配置:端口號、HTTPS設置和首選網絡協議(如HTTP/2或SPDY)。
* 共享相同地址的url也能夠共享相同的底層TCP套接字鏈接。共享鏈接具備顯著的性能優點:
* 更低的延遲、更高的吞吐量(因爲TCP啓動緩慢)和節約的電量。OkHttp使用ConnectionPool自動重用HTTP/1.x的鏈接和HTTP/2和SPDY的多路鏈接。
*/
public final Address address; // 地址
/**
* 路由提供了實際鏈接到web服務器所需的動態信息。這是要嘗試的特定IP地址(由DNS查詢發現)、
* 要使用的確切代理服務器(若是使用的是ProxySelector)以及要協商的TLS版本(用於HTTPS鏈接)。
*
* 一個地址可能有不少路由線路。例如,託管在多個數據中心中的web服務器可能在其DNS響應中產生多個IP地址。
* */
private Route route; // 路由
private final ConnectionPool connectionPool; // 鏈接池
private final Object callStackTrace; // 日誌
private final RouteSelector routeSelector; // 路由選擇器
private int refusedStreamCount; // 拒絕的次數
private RealConnection connection; // 鏈接
private boolean released; // 是否已經被釋放
private boolean canceled // 是否被取消了
private HttpCodec codec; // 鏈接所須要的流
複製代碼
從其成員屬性中其實就能夠看出StreamAllocation實際上就是,OkHtpp請求的各類組件的封裝類。StreamAllocation相關的: 1.找到合適的鏈接 2.獲取流的方法newStream() 3.找到合適的流的方法findConnection()咱們都放在ConnectInterceptor攔截器中分析。
從StreamAllocation中咱們已經提過HttpCodec其實就是「請求、鏈接、流」中的流,而HttpCodec只是接口,其兩個實現類分別爲Http1Codec和Http2Codec,分別對應Http1.1協議以及Http2.0協議。咱們本文主要看一看Http1Codec:
// 配置此流的客戶端。對於HTTPS代理隧道,能夠爲null。
final OkHttpClient client;
// 擁有此流的流分配。對於HTTPS代理隧道,能夠爲null。
final StreamAllocation streamAllocation;
// 與服務器交互的輸入輸出流
final BufferedSource source;
final BufferedSink sink;
// 當前流的狀態,STATE_IDLE:空閒鏈接已準備好寫入請求標頭
int state = STATE_IDLE;
// 標題限制,HEADER_LIMIT:256 * 1024
private long headerLimit = HEADER_LIMIT;
public Http1Codec(OkHttpClient client
, StreamAllocation streamAllocation, BufferedSource source, BufferedSink sink) {
this.client = client;
this.streamAllocation = streamAllocation;
this.source = source;
this.sink = sink;
}
複製代碼
從Http1Codec的成員和構造方法能夠看出,在初始化Http1Codec時就已經將與服務器交互的sink和source傳入,用於最後一個攔截器CallServerInterceptor真正的發送請求和獲取響應。
在整個OkHttp的流程中,咱們在哪裏看到過ConnectionPool的身影呢?
總的來講,ConnectionPool負責全部的鏈接,包括鏈接的複用,以及無用鏈接的清理。OkHttp會將客戶端和服務端全部的鏈接都抽象爲Connection(實際實現類爲RealConnection),而ConnectionPool就是爲了管理全部Connection而設計的,其實際做用:在其時間容許的範圍內複用Connection,並對其清理回收。外部經過調用get方法來獲取一個可使用Connection對象,經過put方法添加一個新的鏈接。
get方法
// ConnectionPool類:
// 一個線性 collection,支持在兩端插入和移除元素。
// 名稱 Deque 是「double ended queue(雙端隊列)」的縮寫
private final Deque<RealConnection> connections = new ArrayDeque<>();
@Nullable
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
// 遍歷connections
for (RealConnection connection : connections) {
// 查看該connection是否符合條件
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
// RealConnection類:
// 此鏈接承載的當前流
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
public boolean isEligible(Address address, @Nullable Route route) {
// 當前Connection擁有的StreamAllocation是否超過的限制
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// 地址的非主機(host)字段是否重疊(同樣)
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// 主機(host)是否徹底匹配
if (address.url().host().equals(this.route().address().url().host())) {
return true;
}
// 此時咱們沒有主機名匹配。可是,若是知足咱們的鏈接合併要求,咱們仍然能夠提供請求。
// 1. 此鏈接必須是HTTP / 2。
if (http2Connection == null) return false;
// 2. 路由必須共享IP地址。這要求咱們爲兩個主機提供DNS地址,這隻發生在路由規劃以後。咱們沒法合併使用代理的鏈接,由於代理不告訴咱們源服務器的IP地址。
if (route == null) return false;
if (route.proxy().type() != Proxy.Type.DIRECT) return false;
if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
if (!this.route.socketAddress().equals(route.socketAddress())) return false;
// 3. 此鏈接的服務器證書必須涵蓋新主機。
if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
if (!supportsUrl(address.url())) return false;
// 4. 證書固定必須與主機匹配。
try {
address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}
return true;
}
// StreamAllocation類:
public void acquire(RealConnection connection, boolean reportedAcquired) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
// 保留鏈接
this.connection = connection;
this.reportedAcquired = reportedAcquired;
// 將此分配流add進allocations中,用於RealConnection.isEligible方法判斷當前Connection擁有的StreamAllocation是否超過的限制
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
複製代碼
簡單總結一下:
isEligible方法(判斷遍歷的鏈接是否符合條件,便是否可複用):
1.若是這個 Connection 已經分配的數量(即 擁有的StreamAllocation)超過了分配限制或者被標記 則不符合。
2.接着調用 equalsNonHost,主要是判斷 Address 中非主機(host)字段是否重疊(同樣),若是有不一樣的則不符合。
3.而後就是判斷 host 是否相同,若是相同(而且1和2也符合)那麼對於當前的Address來講,這個Connection 即是可重用的。
4.若是一、二、3都不符合,則若依舊知足某些條件,此鏈接仍可複用,具體知足的條件查看上面代碼註解
複製代碼
acquire方法(StreamAllocation類):
1.保存遍歷connections獲取的可重用的鏈接
2.將此StreamAllocation類的弱引用StreamAllocationReference添加add進此重用鏈接,判斷當前Connection擁有的StreamAllocation是否超過的限制
3.此方法保留的鏈接將被用於findConnection方法(上面ConnectInterceptor部分有說明)
複製代碼
put方法
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
// 是否開啓異步的清理任務
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
// add進connections
connections.add(connection);
}
複製代碼
put方法很簡單,直接將Connection對象添加到connections雙端隊列。不過這裏有一個地方須要注意,就是若是cleanupRunning爲false,就會想線程池裏面添加一個cleanupRunnable,這裏的目的進行清理操做。此清理操做立刻就分析。
cleanup:清理無用的鏈接
private final Runnable cleanupRunnable = new Runnable() {
@Override
public void run() {
// 這個cleanupRunnable是一個死循環的任務,只要cleanup方法不返回-1,就會一直執行。
while (true) {
// 調用cleanup查找並清理無用鏈接(返回以納米爲單位的持續時間)
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
// 當cleanup方法沒有返回-1,當前的Runnable就會進入睡眠狀態。
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
// 等待上一次cleanup計算出的最長空閒的鏈接距離驅逐到期的時間
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
/**
* 對此池執行維護,若是超出保持活動限制或空閒鏈接限制,則驅逐已空閒的鏈接最長。
* 返回以納米爲單位的持續時間,直到下一次調用此方法爲止。 若是不須要進一步清理,則返回 -1。
*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// 找到要驅逐的鏈接,或下次驅逐到期的時間。
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// 若是正在使用該鏈接,請跳過繼續搜索。
// 用於清理可能泄露的StreamAllocation並返回正在使用此鏈接的 StreamAllocation的數量
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
// 空閒鏈接記住
idleConnectionCount++;
long idleDurationNs = now - connection.idleAtNanos;
// 判斷是不是最長空閒時間的鏈接
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
// 若當前Connection已經超過了最大的空閒時間
// 或者空閒鏈接數大於最大空閒鏈接數量,應該被回收
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// 將其從列表中刪除,而後在下面(同步塊的外部)將其關閉。
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// 返回保活時長 - 最長空閒時間的鏈接當前存活的時間(即該鏈接還有多久須要被清理)
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// 全部鏈接都在使用中。說明全部鏈接都須要至少是保活時長才會被清理
return keepAliveDurationNs;
} else {
// 無鏈接,空閒或正在使用中。
cleanupRunning = false;
return -1;
}
}
// 3. 關閉鏈接的socket
// 代碼執行到此處說明此Connection已經超過了最大的空閒時間,應該被回收
closeQuietly(longestIdleConnection.socket());
// 繼續清理
return 0;
}
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
// 遍歷當前RealConnection中保存的StreamAllocation的弱引用
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
// 若StreamAllocation的弱引用不爲空,則跳過繼續
if (reference.get() != null) {
i++;
continue;
}
// 若StreamAllocation的弱引用爲空
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
// 則須要移除該位置的引用
references.remove(i);
connection.noNewStreams = true;
// 若references爲空,即該鏈接已經沒有了StreamAllocation使用,則該鏈接能夠被清理
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
複製代碼
邏輯總結:
在執行完CacheInterceptor以後會執行下一個攔截器——ConnectInterceptor,那麼咱們來看一下其intercept方法中的源碼:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
// 從攔截器鏈裏獲得StreamAllocation對象
// 此StreamAllocation對象其實是攔截器鏈的第二個參數,是在第一個攔截器中初始化的
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
/**
* 用來編碼Request,解碼Response
* 它有對應的兩個子類, Http1Codec和Http2Codec, 分別對應Http1.1協議以及Http2.0協議,本文主要學習前者。
* 在Http1Codec中主要包括兩個重要的屬性,即source和sink,它們分別封裝了socket的輸入和輸出,
* CallServerInterceptor正是利用HttpCodec提供的I/O操做完成網絡通訊。
* */
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
// 獲取RealConnetion,實際網絡Io傳輸對象(實際上此步很簡單,只是返回上一步代碼中獲取到的connection)
RealConnection connection = streamAllocation.connection();
// 執行下一個攔截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
複製代碼
這個攔截器東西就這麼多?哈哈,那是想多了,這個攔截器中的東西可都藏的深,有料的很呀。咱們分別來看一下HttpCodec和RealConnection的獲取過程吧。
// StreamAllocation類:
public HttpCodec newStream(OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
//1. 獲取設置的鏈接超時時間,讀寫超時的時間,以及是否進行重連。
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
// 2. 獲取健康可用的鏈接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled,
doExtensiveHealthChecks);
//3. 經過ResultConnection初始化,對請求以及結果 編解碼的類(分http 1.1 和http 2.0)。
// 這裏主要是初始化,在後面一個攔截器纔用到這相關的東西。
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
// 返回HttpCodec
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
複製代碼
從上面代碼中來看,這個方法好像就作了兩件事:
那麼咱們接着看findHealthyConnection方法:
// StreamAllocation類:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis,boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
// 獲取RealConnection對象
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// 若是這是一個全新的鏈接,咱們能夠跳過普遍的健康檢查。
synchronized (connectionPool) {
if (candidate.successCount == 0) {
// 直接返回
return candidate;
}
}
/**
* 對連接池中不健康的連接作銷燬處理
* 不健康的RealConnection條件爲以下幾種狀況:
* RealConnection對象 socket沒有關閉
* socket的輸入流沒有關閉
* socket的輸出流沒有關閉
* http2時鏈接沒有關閉
* */
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
// 銷燬資源(該方法中會調用deallocate(解除分配)方法
// 獲取須要釋放的Socket鏈接,並執行closeQuietly方法關閉該Socket)
noNewStreams();
continue;
}
return candidate;
}
}
複製代碼
代碼中能夠看出獲取RealConnection對象的操做又交給了findConnection方法:
// StreamAllocation類:
private RealConnection findConnection(int connectTimeout, int readTimeout,
int writeTimeout,int pingIntervalMillis
, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
// 1. 同步線程池,來獲取裏面的鏈接
synchronized (connectionPool) {
// 2. 作些判斷,是否已經釋放,是否編解碼類爲空,是否用戶已經取消
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// (嘗試複用)嘗試使用已分配的鏈接。咱們須要在這裏當心,由於咱們已經分配的鏈接可能已被限制建立新流。
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good. result = this.connection; releasedConnection = null; } if (!reportedAcquired) { // If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
if (result == null) {
/**
* 4. 嘗試在鏈接池中獲取一個鏈接,get方法中會直接調用,注意最後一個參數爲空
*
* Internal 是一個抽象類,而該類的實現則在OkHttpClient的static{}靜態代碼塊中(爲一匿名內部類)
* 而其get方法實際上會調onnectionPool鏈接池中的get方法使用一個for循環,在鏈接池裏面,尋找合格的鏈接
* 而合格的鏈接會經過,StreamAllocation中的acquire方法,更新connection的值。
* */
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// 若是咱們找到已經分配或池化的鏈接,咱們就完成了。
return result;
}
// 若是咱們須要選擇路線,請選擇一個。這是一個阻止操做。
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
// 對於線路Route的選擇,能夠深究一下這個RouteSeletor
routeSelection = routeSelector.next();
}
//5. 繼續線程池同步下去獲取鏈接
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// 6. 如今咱們有了一組IP地址(線路Route),再次嘗試從池中獲取鏈接。
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
// 沒有找到
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// 建立鏈接並當即將其分配給此分配。這時可能異步cancel()會中斷咱們即將進行的握手。
route = selectedRoute;
refusedStreamCount = 0;
// 7. 若是前面這麼尋找,都沒在鏈接池中找到可用的鏈接,那麼就新建一個
result = new RealConnection(connectionPool, selectedRoute);
// 更新connection,即RealConnection
acquire(result, false);
}
}
// 若是咱們第二次發現了聚集鏈接,咱們就完成了。
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
/**
* 8. 作TCP + TLS握手。這是一個阻止操做。
調用RealConnection的connect方法打開一個Socket連接
* 這裏就是就是鏈接的操做了,終於找到鏈接的正主了,這裏會調用RealConnection的鏈接方法,進行鏈接操做。
* 若是是普通的http請求,會使用Socket進行鏈接
* 若是是https,會進行相應的握手,創建通道的操做。
* */
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// 9. 最後就是同步加到 鏈接池裏面了
Internal.instance.put(connectionPool, result);
// 最後加了一個多路複用的判斷,這個是http2纔有的
// 若是另外的多路複用鏈接在同時建立,則釋放此鏈接,用另外的連接
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
複製代碼
這段代碼有點多,具體講解在代碼註釋當中,簡單總結一下:
接下來咱們繼續瞭解一下RealConnection的connect鏈接操做:
// RealConnection類:
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
// protocol(鏈接協議)是用來檢查此鏈接是否已經創建
if (protocol != null) throw new IllegalStateException("already connected");
RouteException routeException = null;
// ConnectionSpec指定了Socket鏈接的一些配置
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
// 鏈接規格選擇器(用於選擇鏈接,好比:隧道鏈接和Socket鏈接)
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network " +
"security policy"));
}
}
while (true) {
try {
// 是否執行隧道鏈接,requiresTunnel()方法實現其實很簡單:判斷address的sslSocketFactory是否爲空而且proxy代理類型是否爲Http
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our
// resources.
break;
}
} else {
// 執行Socket鏈接
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
// 創建協議
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections " +
"attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
/**
* 是否全部工做都是經過代理隧道構建HTTPS鏈接。這裏的問題是代理服務器能夠發出認證質詢,而後關閉鏈接。
*/
private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout
, Call call,EventListener eventListener) throws IOException {
//一、建立隧道請求對象
Request tunnelRequest = createTunnelRequest();
HttpUrl url = tunnelRequest.url();
//for循環: MAX_TUNNEL_ATTEMPTS == 21
for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
//二、打開socket連接
connectSocket(connectTimeout, readTimeout, call, eventListener);
//三、請求開啓隧道並返回tunnelRequest(開啓隧道會用到Socket鏈接中的sink和source)
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
//四、成功開啓了隧道,跳出while循環
if (tunnelRequest == null) break;
// 隧道未開啓成功,關閉相關資源,繼續while循環
closeQuietly(rawSocket);
rawSocket = null;
sink = null;
source = null;
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
}
}
/**
* 完成在原始套接字上構建完整HTTP或HTTPS鏈接所需的全部工做。
*/
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
//一、初始化Socket
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);// 使用SOCKS的代理服務器
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
//二、打開socket連接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
try {
// 注意:Sink能夠簡單的看作OutputStream,Source能夠簡單的看作InputStream
// 而這裏的sink和source,被用於打開隧道鏈接和最後一個攔截器用於真正的網絡請求發送和獲取響應
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
HttpUrl url) throws IOException {
// 拼接CONNECT命令
String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
while (true) {//又一個while循環
//對應http/1.1 編碼HTTP請求並解碼HTTP響應
Http1Codec tunnelConnection = new Http1Codec(null, null, source, sink);
//發送CONNECT,請求打開隧道連接,
tunnelConnection.writeRequest(tunnelRequest.headers(), requestLine);
//完成連接
tunnelConnection.finishRequest();
//構建response,操控的是inputStream流
Response response = tunnelConnection.readResponseHeaders(false)
.request(tunnelRequest)
.build();
switch (response.code()) {
case HTTP_OK:
return null;
case HTTP_PROXY_AUTH://表示服務器要求對客戶端提供訪問證書,進行代理認證
//進行代理認證
tunnelRequest = route.address().proxyAuthenticator().authenticate(route,
response);
//代理認證不經過
if (tunnelRequest == null)
throw new IOException("Failed to authenticate with proxy");
//代理認證經過,可是響應要求close,則關閉TCP鏈接此時客戶端沒法再此鏈接上發送數據
if ("close".equalsIgnoreCase(response.header("Connection"))) {
return tunnelRequest;
}
break;
}
}
}
private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
//若是不是ssl
if (route.address().sslSocketFactory() == null) {
protocol = Protocol.HTTP_1_1;
socket = rawSocket;
return;
}
//若是是sll
connectTls(connectionSpecSelector);
//若是是HTTP2
if (protocol == Protocol.HTTP_2) {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.listener(this)
.build();
http2Connection.start();
}
}
複製代碼
什麼是隧道呢? 隧道技術(Tunneling)是HTTP的用法之一,使用隧道傳遞的數據(或負載)能夠是不一樣協議的數據幀或包,或者簡單的來講隧道就是利用一種網絡協議來傳輸另外一種網絡協議的數據。好比A主機和B主機的網絡而類型徹底相同都是IPv6的網,而連接A和B的是IPv4類型的網絡,A和B爲了通訊,可使用隧道技術,數據包通過Ipv4數據的多協議路由器時,將IPv6的數據包放入IPv4數據包;而後將包裹着IPv6數據包的IPv4數據包發送給B,當數據包到達B的路由器,原來的IPv6數據包被剝離出來發給B。
SSL隧道:SSL隧道的初衷是爲了經過防火牆來傳輸加密的SSL數據,此時隧道的做用就是將非HTTP的流量(SSL流量)傳過防火牆到達指定的服務器。
怎麼打開隧道? HTTP提供了一個CONNECT方法 ,它是HTTP/1.1協議中預留給可以將鏈接改成管道方式的代理服務器,該方法就是用來建議一條web隧道。客戶端發送一個CONNECT請求給隧道網關請求打開一條TCP連接,當隧道打通以後,客戶端經過HTTP隧道發送的全部數據會轉發給TCP連接,服務器響應的全部數據會經過隧道發給客戶端。 (注:以來內容來源參考《計算機網絡第五版》和《HTTP權威指南》第八章的有關內容,想深刻了解的話能夠查閱之。) 關於CONNECT在HTTP 的首部的內容格式,能夠簡單以下表示: CONNECT hostname:port HTTP/1.1
這部分就不深刻分析啦,感興趣的小夥伴自行查詢吧。
在Okhttp攔截器鏈上CallServerInterceptor攔截器是最後一個攔截器,該攔截器前面的攔截器ConnectInterceptor主要負責打開TCP連接。而CallServerInterceptor的主要功能就是—向服務器發送請求,並最終返回Response對象供客戶端使用。
小知識點:100-continue 是用於客戶端在發送 post 數據給服務器時,徵詢服務器是否處理 post 的數據,若是不處理,客戶端則不上傳 post 數據,正常狀況下服務器收到請求後,返回 100 或錯誤碼。
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
// 獲取http請求流(於上一個攔截器建立)
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
// 向服務器發送請求
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
// 檢測是否有請求body
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// 若是請求中有「Expect:100-continue」標頭,請在發送請求正文以前等待「HTTP / 1.1 100 繼續」響應。
// 若是咱們沒有獲得它,請返回咱們所作的事情(例如4xx響應)而不傳輸請求體。
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
// 構建responseBuilder對象
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// 若是知足「Expect:100-continue」指望,請向服務器發送請求body
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// 寫入請求體到bufferedRequestBody中
request.body().writeTo(bufferedRequestBody);
// 將全部緩衝的字節推送到其最終目標,並釋放此接收器保存的資源。
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// 若是未知足「Expect:100-continue」指望,則阻止重用HTTP / 1 鏈接。不然,咱們仍然有義務將請求正文傳輸給使鏈接保持一致狀態。
streamAllocation.noNewStreams();
}
}
// 實際是調用了 sink.flush(), 來刷數據
httpCodec.finishRequest();
// 讀取響應頭信息,狀態碼等
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
// 構建Response, 寫入本次Request,握手狀況,請求時間,獲得的結果時間
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// 服務器發送了100-continue,即便咱們沒有請求。也再次嘗試閱讀實際的回覆
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
// 經過狀態碼判斷以及是否webSocket判斷,是否返回一個空的body
if (forWebSocket && code == 101) {
// Connection is upgrading, but
// we need to ensure interceptors see a non-null
// response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
// 返回讀取響應正文的流,並構建客戶端可用的RealResponseBody
.body(httpCodec.openResponseBody(response))
.build();
}
// 若是設置了鏈接 close ,斷開鏈接
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
// HTTP 204(no content) 表明響應報文中包含若干首部和一個狀態行,可是沒有實體的主體內容。
// HTTP 205(reset content) 表示響應執行成功,重置頁面(Form表單),方便用戶下次輸入
// 這裏作了一樣的處理,就是拋出協議異常。
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
複製代碼
從CallServerInterceptor攔截器的代碼中看到OkHttp是經過HttpCodec來發送請求與獲取響應的,那麼咱們分別來看一看這兩步操做:
// Http1Codec類:
@Override
public void writeRequestHeaders(Request request) throws IOException {
// 返回請求狀態行,如「GET / HTTP / 1.1」。
String requestLine = RequestLine.get(
request, streamAllocation.connection().route().proxy().type());
// 寫入請求
writeRequest(request.headers(), requestLine);
}
/**
* 經過OkIO的Sink對象(該對象能夠看作Socket的OutputStream對象)來向服務器發送請求的。
*/
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
複製代碼
咱們知道HTTP支持post,delete,get,put等方法,而post,put等方法是須要請求體的(在Okhttp中用RequestBody來表示)。因此接着writeRequestHeaders以後Okhttp對請求體也作了響應的處理,從上面分析處咱們也知道請求體是經過RequestBody的writeTo方法發送出去的(其實是調用bufferedRequestBody對象的write方法,RequestBody的實例多是FormBody或者是自定義的ReqeustBody):
// 使用post簡單示例:
// 構建RequestBody(FormBody是RequestBody實現類)
FormBody.Builder formBody = new FormBody.Builder();
if(mParams != null && !mParams.isEmpty()) {
for (Map.Entry<String,String> entry: mParams.entrySet()) {
formBody.add(entry.getKey(),entry.getValue());
}
}
// 構建RequestBody並將傳入的參數保存在FormBody的encodedNames和encodedValues兩個成員集合內
RequestBody form = formBody.build();
// 添加請求頭
Request.Builder builder = new Request.Builder();
if(mHeader != null && !mHeader.isEmpty()) {
for (Map.Entry<String,String> entry: mHeader.entrySet()) {
builder.addHeader(entry.getKey(),entry.getValue());
}
}
// 建立請求的Request 對象
final Request request = builder
.post(form)
.url(mUrl)
.build();
Call call = getOkHttpClient().newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
sendFailure();
LoggerUtil.d("onFailure : "+e.getMessage());
}
@Override
public void onResponse(Call call, Response response) {
responseProcess(response);
}
});
// FormBody類 —— 寫入請求體:
@Override
public void writeTo(BufferedSink sink) throws IOException {
writeOrCountBytes(sink, false);
}
private long writeOrCountBytes(@Nullable BufferedSink sink, boolean countBytes) {
long byteCount = 0L;
Buffer buffer;
if (countBytes) {
buffer = new Buffer();
} else {
buffer = sink.buffer();
}
// 將請求體寫入sink的緩存
for (int i = 0, size = encodedNames.size(); i < size; i++) {
if (i > 0) buffer.writeByte('&');
buffer.writeUtf8(encodedNames.get(i));
buffer.writeByte('=');
buffer.writeUtf8(encodedValues.get(i));
}
if (countBytes) {
byteCount = buffer.size();
buffer.clear();
}
return byteCount;
}
複製代碼
能夠看出請求體是經過writeTo方法寫入sink緩存內,最後會經過bufferedRequestBody.close();方法將請求體發送到服務器並釋放資源(攔截器邏輯中有說明)。
// Http1Codec類:
@Override
public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
throw new IllegalStateException("state: " + state);
}
try {
// HTTP響應狀態行,如「HTTP / 1.1 200 OK」
StatusLine statusLine = StatusLine.parse(readHeaderLine());
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol) // http協議版本
.code(statusLine.code) // http響應狀態碼
.message(statusLine.message) // http的message :like "OK" or "Not Modified"
.headers(readHeaders()); // 讀取響應報頭
if (expectContinue && statusLine.code == HTTP_CONTINUE) {
return null;
} else if (statusLine.code == HTTP_CONTINUE) {
state = STATE_READ_RESPONSE_HEADERS;
return responseBuilder;
}
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
} catch (EOFException e) {
// 服務器在發送響應以前結束流。
IOException exception =
new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
private String readHeaderLine() throws IOException {
// 經過source讀取
String line = source.readUtf8LineStrict(headerLimit);
headerLimit -= line.length();
return line;
}
public Headers readHeaders() throws IOException {
Headers.Builder headers = new Headers.Builder();
// 讀取響應報頭數據,響應報頭和響應正文數據之間是有空行分隔開的,當讀取到的數據爲空行時表示響應報頭讀取完畢
for (String line; (line = readHeaderLine()).length() != 0; ) {
Internal.instance.addLenient(headers, line);
}
return headers.build();
}
複製代碼
能夠看出上面代碼只是獲取了響應頭部分的數據,咱們再來看一下讀取響應正文的代碼:
// CallServerInterceptor#intercept:
response = response.newBuilder()
// 上面分析時說明過此處爲構建客戶端可用的響應體RealResponseBody
.body(httpCodec.openResponseBody(response))
.build();
// Http1Codec類:
@Override
public ResponseBody openResponseBody(Response response) throws IOException {
streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
String contentType = response.header("Content-Type");
// 判斷是否有響應體(可從響應頭信息中判斷)
if (!HttpHeaders.hasBody(response)) {
Source source = newFixedLengthSource(0);
return new RealResponseBody(contentType, 0, Okio.buffer(source));
}
// 有響應體,根據不一樣狀況,構造對應的Socket的InputStream的Source對象(用於後面獲取響應體)
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
Source source = newChunkedSource(response.request().url());
return new RealResponseBody(contentType, -1L, Okio.buffer(source));
}
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
Source source = newFixedLengthSource(contentLength);
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}
return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
}
複製代碼
邏輯很簡單,openResponseBody將Socket的輸入流InputStream對象交給OkIo的Source對象(在本篇博文中只需簡單的將Sink做爲Socket的輸入流,Source做爲Socket的輸入流看待便可),而後封裝成RealResponseBody(該類是ResponseBody的子類)做爲Response的body。那麼咱們怎麼經過這個body來獲取服務器發送過來的字符串呢?咱們上面在分析緩存攔截器時提到過,咱們獲取網絡數據最後一步其實就是經過調用ResponseBody.string()方法:
// ResponseBody類:
public final String string() throws IOException {
BufferedSource source = source();
try {
Charset charset = Util.bomAwareCharset(source, charset());
//InputStream 讀取數據
return source.readString(charset);
} finally {
Util.closeQuietly(source);
}
}
複製代碼
在此處調用source.readString不只來讀取服務器的數據還須要緩存經過緩存攔截器緩存響應體(具體詳看上方分析的緩存攔截器CacheInterceptor)。須要注意的是該方法最後調用closeQuietly來關閉了當前請求的InputStream輸入流,因此string()方法只能調用一次,再次調用的話會報錯,畢竟輸入流已經關閉了。
至此,經歷一週的時間,終於分析完整個流程,不過實際上還有一部分沒有去深刻了解,好比:路由、路由選擇器、鏈接規格選擇器等等,留待後續研究吧。
OkHttp3中的代理與路由,可參考:www.jianshu.com/p/5c98999bc…
參考連接:
...
(注:如有什麼地方闡述有誤,敬請指正。歡迎指點交流)