Okhttp源碼解析(四)——Okio解析

參考:https://www.jianshu.com/p/f5941bcf3a2djava

 

 

1.什麼是OKIO

  將JDK封裝的IO操做再進行一層封裝數組

  好處:安全

  (1)使用了segement片斷管理的方式管理數據,以片斷做爲IO操做的單位,使IO操做吞吐率增長架構

   (2)使用鏈表將Segment片斷聯繫起來管理,對於移動指針就能夠對數據進行管理,擴容方便異步

  (3)使用SegmentPool對廢棄的片斷回收、複用和內存共享,從而減小內存的申請和減小了GC的頻率,性能獲得優化。socket

  (4)封裝了輸出輸入的超時處理,着在JDK原生io操做中沒有ide

  (5)具有了不少從字節到特定編碼格式或者數據類型的轉化方法,很方便函數

2.OKIO的簡單使用  

  OKIO寫流程圖:性能

  

public static void main(String[] args) {
        File file = new File("hello.txt");
        try {
            readString(new FileInputStream(file));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void readString(InputStream in) throws IOException {
      BufferedSource source = Okio.buffer(Okio.source(in));  //建立RealBufferedSource輸入流
      String s = source.readUtf8();  //以UTF-8讀,就是經過執行回調(Okio.source(in)的read方法)往Buffer中寫source輸入流的數據,再返回Buffer存儲size的所有字節
      System.out.println(s);     
      source.close();
    }
  public static void writeString(OutputStream out){
    BufferedSink sink = Okio.buffer(Okio.sink(out));
            //建立RealBufferedSink輸出流
    sink.writeLone("test");//裏面往Buffer裏面寫「test」,再經過執行回調(Okio.sink(out)的write方法)將Buffer裏面的數據寫到輸出流sink裏
    sink.close();
  }    

  

 

3.okio架構優化

Souce、Sink接口:

    兩接口分別定義了read()和wriete(),表明着輸入流和輸出流,

BufferedSource、BufferedSink接口:

  分別繼承了Source和Sink,維護者Buffer,分別定義了一堆針對各類類型的讀和寫操做方法。

RealBufferedSource、RealBufferedSink:

    至關於一個操做的中介代理,內部都維護着一個Buffer,分別定義了進行具體操做。

 

Segment:

    存儲最大長度爲8K字節數組,該數組爲不可變字節序列,pos表明開始能夠讀的字節序號,limit表明能夠寫的字節序號,boolean shared表明該片斷是否共享,boolean owner表明本身是否能夠操做本片斷(與shared互斥),經過per、next造成鏈表(Buffer構建出雙向鏈表),方法固然有push()、pop()

    壓縮機制:

      因爲存儲的結果就是Head-new1-new2-new3-這樣子,新建new的前提僅僅是上一片斷(例如new3)的limit到Segment_size_max小於要存儲的字節數,若是很長時間會有一個segment只存了小部分數據,可是有不少segment,形成內存浪費。所以須要壓縮方法compact(),具體作法:先將上一片斷的數據從pos往前移,使pos從序號0開始,而後將本片斷的數據寫到上一片斷上,從limit開始,而後將本片斷拋出。

public void compact() {
    if (prev == this) throw new IllegalStateException();//若是上一節點是本身表明沒有上一節點
    if (!prev.owner) return; // Cannot compact: prev isn't writable.//若是上一節點不能夠操做,返回
    int byteCount = limit - pos;//記錄當前片斷存儲的字節長度L1
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);//計算上一節點沒有存儲數據的長度L2(pos以前的長度+limit以後的長度,若是perv是共享的,那麼就不能加入pos以前的長度)
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.//若是L1>L2,那麼就不夠位置存,返回
    writeTo(prev, byteCount);//將本部分數據寫入上一節點
    pop();//彈出本節點
    SegmentPool.recycle(this);
  }

 

/** Moves {@code byteCount} bytes from this segment to {@code sink}. */
  public void writeTo(Segment sink, int byteCount) {//往sink部分寫入this(Segment)長度爲byteCount的數據
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);//先將sink本來存儲的數據往前移,pos設爲1
      sink.limit -= sink.pos;
      sink.pos = 0;
    }

    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);//將this(Segment)數據寫入sink中,從limit開始
    sink.limit += byteCount;
    pos += byteCount;
  }

   

    共享機制(減小數據複製帶來的開銷)

    原理:將一個Segment分爲[pos...pos+byteCount]和[pos+byteCount...limit]兩部分,該方法只用在Buffer.wirte()上,注意的是byteCount參數的意義是兩個Segment相同的部分,若是相同的部分超多SHARE_MINIMUM=1024字節,就共享一個Segment,看起來咱們是new了一個Segment,可是裏面的data[]引用仍是原來的Segment,所以可以減小數據複製帶來的內存開銷,從而提升性能

public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
      prefix = new Segment(this);
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }

  

  

SegmentPool(減小cpu消耗(減小GC)+減小內存消耗):

    該池經過一個segment變量將所有segment聯繫起來,存儲最多8個segment的片斷池,若是該池還未滿,一旦segment廢棄了就放到該池中,減小了GC。若是io操做須要新建segment的話就不用new申請內存空間了,直接從片斷池中獲取。

 

 

Buffer:

    具體讀寫操做的執行類,內部維護着一個Segment head,經過該片斷就能夠獲取整個鏈表的全部數據,long size表明已經存儲的總字節數。實現了三個接口,讀、寫和Clone。

    實現及好處:循環的雙向鏈表實現,鏈表節點爲Segment片斷,而Segment維護着固定長度的字節數組。這樣採起分片的連接方式,片中又經過數組存儲,兼具讀的連續性和寫的可插入性,對比於單一的用鏈表或者數組,是比較折中的方式。還有個好處就是根據需求來改動分片的大小來權衡讀寫的業務操做。


    clone():

/** Returns a deep copy of this buffer. */
  @Override public Buffer clone() {
    Buffer result = new Buffer();
    if (size == 0) return result;

    result.head = new Segment(head);
    result.head.next = result.head.prev = result.head;//1
    for (Segment s = head.next; s != head; s = s.next) {
      result.head.prev.push(new Segment(s));
    }
    result.size = size;
    return result;
  }

 

Segment(Segment shareFrom) {
    this(shareFrom.data, shareFrom.pos, shareFrom.limit);
    shareFrom.shared = true;
  }

  Segment(byte[] data, int pos, int limit) {
    this.data = data;
    this.pos = pos;
    this.limit = limit;
    this.owner = false;
    this.shared = true;
  }

  該Clone拷貝方法爲淺拷貝,可是咱們注意到雖然拷貝新建了一個Buffer,可是該Buffer的head變量的構造由new Segment(head)構建,細看該構造器中最重要的字節數組data引用指向了新的data,沒有從新分配新的內存空間,性能獲得優化。Segment的shared共享標示參數設爲true,結合Segment類不少設計到shared的方法,有不少好處。

  同時注意到語句1,result.head.next = result.head.prev = result.head,這樣就造成了循環的鏈表中循環的特性了,以前分析Segment類不管增長數據add Segment或者讀取數據 popSegment都沒有造成循環閉環的特性,原來閉環(循環)的特性在Buffer類中造成。

 

  除此以外,Buffer進行寫操做每次都必須先獲取segment,語句2也是造成閉環的緣由

Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;//2
    }

    Segment tail = head.prev;
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {//須要的長度若是從limit上再追加大於了Segment的最大大小了
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.//使得head.prev每次都指向新push的segment
    }
    return tail;
  }

  

  全是讀、寫操做方法:

    具體分爲int(4)、long(8)、short(2)的等等數據類型的讀和寫,須要注意的是各類數據類型須要多少個字節存儲,讀和寫的參數分爲直接是字節數組或者是source、sink、輸入流、輸出流等等,大同小異。

  utf-8編碼輸出:

    若是爲String數據類型做爲輸入或者輸出時,編碼格式能夠選擇,utf-8

    幫助方法:

    以md五、sha一、sha256數字摘要輸出Buffer裏面存儲的數據。

 

ByteString

  其實就是一個不可變的字節數組,以特定的編碼格式進行解碼,支持的解碼方式有utf-8,base64和hex。

 

final字段科普

  該字段其實就是不可變的意思,在Segment存儲data仍是ByteString裏面data都設了該字段。

  定義:

    類不被拓展(繼承或者實現)

    全部域都是final的,全部域都是private

    沒有任何提供改變該類狀態的方法

  好處:固然是線程安全的,不用考慮同步的問題(final的語義:當構造函數結束時,final定義的值被保證當其餘線程訪問該對象時,該值是可見的)

  壞處:須要建立大量類的對象(。。。)

 

 

Okio中的超時機制

  超時機制:意義在於防止IO操做阻塞在某個異常上,okio的超時機制的基礎就是採起同步超時TimeOut。

  同步:TimeOut類

  使用在以任何inputStream或者outPutStream做爲輸入流輸出流參數構建時,使用同步超時機制。

  其實經過對OKIO的簡單使用,咱們知道讀寫操做的具體操做都在構建輸入輸出流的的回調方法。

 

  如下sink構建輸出流代碼爲例(source(final InPutStream in)構建輸入流在超時處理上也同樣執行了語句1,timeOut.throwInReached(),不拓展了)

  OKIO:

public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}

 

private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();//1
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      @Override public void flush() throws IOException {
        out.flush();
      }

      @Override public void close() throws IOException {
        out.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };
  }

  TimeOut:  

public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException("thread interrupted");
    }

    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached");
    }
  }

  

 

  異步:AsyncTimeOut,

    使用在socket做爲輸入輸出流時,使用異步超時處理,之因此在socket上使用異步超時處理,由於socket自身性質決定,socket經常由於自身阻塞致使往下不執行。

  okio:與其餘inputStream不一樣的是,他不直接返回source,而是做爲AsyncTime.source()的參數再進行一次封裝,返回source

public static Source source(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    return timeout.source(source);
  }

  AsyncTimeOut:咱們發現這裏纔是執行的準確位置,語句1就是涉及到異步超時處理機制。

    在開始讀或者寫以前執行enter(),該方法就是開啓一個WatchDog線程,待分析。。。

public final Source source(final Source source) {
    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        boolean throwOnTimeout = false;
        enter();//1
        try {
          long result = source.read(sink, byteCount);
          throwOnTimeout = true;
          return result;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }

      @Override public void close() throws IOException {
        boolean throwOnTimeout = false;
        try {
          source.close();
          throwOnTimeout = true;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }

      @Override public Timeout timeout() {
        return AsyncTimeout.this;
      }

      @Override public String toString() {
        return "AsyncTimeout.source(" + source + ")";
      }
    };
  }
相關文章
相關標籤/搜索