OkHttp源碼分析

大概按照設計模式分析流程web

先來一張圖設計模式

image.png

咱們先從OkHttp的具體使用開始 :緩存

val okHttpClient = OkHttpClient()
        //val okHttpClient = OkHttpClient().newBuilder().build()
        val request = Request.Builder().build()

        val call = okHttpClient.newCall(request)
        //異步請求
        call.enqueue(object :Callback {
            override fun onFailure(call: Call, e: IOException) {

            }

            override fun onResponse(call: Call, response: Response) {

            }

        })
        
        //同步請求
        val response = call.execute()
複製代碼

我們這裏先分析異步請求:服務器

首先,分析OkhttpClient的建立markdown

//從這裏能夠看出來 他是經過建造者模式來建立對象
    public OkHttpClient() {
        this(new Builder());
    }
    
    //------------------Builder-------------------
    public Builder() {
      //建立分配器,對Call進行處理
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      //對各類超時的處理
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }
    
    
複製代碼

而後咱們再來看Request的建立cookie

//建造者模式 
        Request.Builder().build()
        
        //只有一個構造函數,而且protected ,只能經過build
        Request(Builder builder) 
複製代碼

咱們看看內部的build都幹了什麼網絡

public Builder() {
      //設置默認請求爲GET
      this.method = "GET";
      //初始化請求頭集合
      this.headers = new Headers.Builder();
    }
    
    //調用Request的有參函數, 建立請求
    public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
    }
    
複製代碼

上面的代碼應該沒什麼難度 ,咱們繼續進行下一步 。異步

okHttpClient.newCall(request)socket

//建立請求的Call
    @Override public Call newCall(Request request) {
        //工廠模式建立
        return RealCall.newRealCall(this, request, false /* for web socket */);
    }
    
複製代碼

看到@Override應該想到這個方法不是繼承就是實現async

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory
    
    //實現了這個接口
    interface Factory {
        Call newCall(Request request);
    }
    
複製代碼

經過這個能夠發現 ,這裏利用了工廠模式的思想 ,不關心具體的實現細節,提供一個方法負責拿到Call對象就能夠了 ,具體的細節交給子類實現 。

再回到咱們的主線流程

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        // Safely publish the Call instance to the EventListener.
        //構建了RealCall
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        //事件發射器 ,控制connection stream requests responses
        call.transmitter = new Transmitter(client, call);
        return call;
      }
複製代碼

異步請求

拿到Call以後,接下來開始執行一步請求

call.enqueue(object :Callback {
        override fun onFailure(call: Call, e: IOException) {

        }

        override fun onResponse(call: Call, response: Response) {

        }

    })
複製代碼

咱們直接看RealCallenqueue

@Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        //發射器執行 eventListener.callStart()
        transmitter.callStart();
        //這裏是關鍵的代碼
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
   }
複製代碼

咱們先看看Dispatcher

public Dispatcher dispatcher() {
        return dispatcher;
  }
  
  //貌似沒什麼 ,讓咱們看看enqueue方法
  void enqueue(AsyncCall call) {
    synchronized (this) {
      //加入到異步的準備集合中    這裏須要說一下 Okhttp的有 異步隊列  同步隊列  異步準備隊列,是經過隊列的形式存儲請求信息的 
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      //默認是false
      if (!call.get().forWebSocket) {
        //從正在運行的隊列中查找有沒有這個call 有就返回 ,不然在等待隊列中查找 
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        //若是存在 將這個call 賦值到 callsPerHost 變量
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    
    //這裏會用到callsPerHost
    promoteAndExecute();
  }
  
  
  /**
   * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
   * them on the executor service. Must not be called with synchronization because executing calls
   * can call into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
        //從準備隊列讀值
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
           
        //maxRequests 最大請求數(64)
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        //maxRequestsPerHost 最大請求同一個主機的數量(5)
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        //加入到執行集合中
        executableCalls.add(asyncCall);
        //加入到運行隊列中
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      //這裏開始真正的執行 
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }
  
  
  ------------------------------AsyncCall---------------------------------

/**
     * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        //這裏開始執行 內部handler來執行call 這個 runnable
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        //異常返回
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }
複製代碼

讓咱們來看看AsyncCall

final class AsyncCall extends NamedRunnable {
        ***
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
        
        ***
    }
複製代碼

咱們能夠看到AsyncCall 繼承的是 NamedRunnable

/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }
   
  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
複製代碼

能夠看到一個Runnable

run方法中設置了線程的名稱 ,執行了 execute();

@Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        //重要的方法來了getResponseWithInterceptorChain()
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        //返回網絡響應信息
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
        //從完成任務 釋放call
        client.dispatcher().finished(this);
      }
    }
複製代碼

終於找到了Response的身影 , 說明getResponseWithInterceptorChain這個方法中進行的網絡請求 ,下面進行了一些異常判斷的接口回調 ,釋放call資源 ,就先不分析 ,咱們這裏看看、getResponseWithInterceptorChain 這個方法 。

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //添加用戶的自定義攔截器
    interceptors.addAll(client.interceptors());
    //添加劇定向的攔截器
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    //封裝request和response過濾器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //緩存相關的過濾器,負責讀取緩存直接返回、更新緩存
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //負責和服務器創建鏈接
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      //配置 OkHttpClient 時設置的 networkInterceptors
      interceptors.addAll(client.networkInterceptors());
    }
    //負責向服務器發送請求數據、從服務器讀取響應數據(實際網絡請求)
    interceptors.add(new CallServerInterceptor(forWebSocket));

    //originalRequest 就是真實的請求 ,這裏注意一下這個 0
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    

    boolean calledNoMoreExchanges = false;
    try {
      //這裏開始請求
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }
複製代碼

能夠看到上面調用了 Interceptor.Chainproceed方法獲取 Response,咱們看看Interceptor.Chain 的實現類 RealInterceptorChainproceed方法

@Override public Response proceed(Request request) throws IOException {
    //是個多態
    return proceed(request, transmitter, exchange);
  }

  public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();
    
    //這裏其實爲了確保每個攔截器只執行一次
    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.exchange != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    //注意這裏的 index + 1
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    //拿出第index個攔截器 還記得我們的初始是0
    Interceptor interceptor = interceptors.get(index);
    //執行intercept
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }
複製代碼

上面能夠看出來,執行intercept獲取了Response,讓咱們往裏走,咱們先用一個系統攔截器進行分析 。

public final class RetryAndFollowUpInterceptor implements Interceptor {
  ***

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Transmitter transmitter = realChain.transmitter();

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      //準備鏈接請求
      transmitter.prepareToConnect(request);

      if (transmitter.isCanceled()) {
        throw new IOException("Canceled");
      }

      Response response;
      boolean success = false;
      try {
        // !!!  這裏又一次調用了realChain.proceed() 開始下一個攔截器
        response = realChain.proceed(request, transmitter, null);
        success = true;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), transmitter, false, request)) {
          throw e.getFirstConnectException();
        }
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, transmitter, requestSendStarted, request)) throw e;
        continue;
      } finally {
        // The network call threw an exception. Release any resources.
        if (!success) {
          transmitter.exchangeDoneDueToException();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Exchange exchange = Internal.instance.exchange(response);
      Route route = exchange != null ? exchange.connection().route() : null;
      Request followUp = followUpRequest(response, route);

      if (followUp == null) {
        if (exchange != null && exchange.isDuplex()) {
          transmitter.timeoutEarlyExit();
        }
        return response;
      }

      RequestBody followUpBody = followUp.body();
      if (followUpBody != null && followUpBody.isOneShot()) {
        return response;
      }

      closeQuietly(response.body());
      if (transmitter.hasExchange()) {
        exchange.detachWithViolence();
      }

      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      request = followUp;
      priorResponse = response;
    }
  }
  
  ***
}
複製代碼

看到上面的代碼就開始OkHttp最有意思的環節了 ,也是它最經典的模式 ,責任鏈模式, 有一點遞歸的思想 ,一層一層 ,最後獲得這個Response 。

同步請求

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.timeoutEnter();
    transmitter.callStart();
    try {
      //經過調度器將請求放到隊列中
      client.dispatcher().executed(this);
      //真正開始請求
      return getResponseWithInterceptorChain();
    } finally {
      client.dispatcher().finished(this);
    }
  }
複製代碼

看完了異步 ,其實同步很簡單 ,一個流程 ,就到這裏吧 。

相關文章
相關標籤/搜索