Okio源碼分析

okioSquare開源框架之一,它對java.iojava.nio作了補充,使訪問,存儲和數據處理變得更加容易。它最先是Okhttp組件之一。 html

在這裏插入圖片描述

一、ByteString與Buffer

Okio主要圍繞ByteStringBuffer這兩個類展開,其主要功能都封裝在這兩個類中:java

  • ByteString:是一個相似String的不可變類,它能夠很容易的在byteString之間進行轉換。該類提供了編/解碼爲hex,md5,base64及UTF-8等方法。
  • Buffer:是一個可變的字節序列。 與ArrayList同樣,無需提早調整緩衝區大小。 Buffer內部維護了一個雙向鏈表,從鏈表尾部寫入數據,頭部讀取數據。

ByteStringBuffer作了一些節省CPU和內存的操做。 若是將一個字符串編碼爲ByteStringByteString就會緩存對該字符串的引用(以空間換時間),這樣若是之後對其進行編/解碼等操做,則無需在byteString之間進行轉換。node

//字符串對應的字節數據,避免再一次轉換
  final byte[] data;
  //字符串
  transient String utf8; // Lazily computed.
複製代碼

Buffer內部維護了一個以Segment爲節點的雙向鏈表。 當數據從一個Buffer移動到另外一個Buffer時,僅須要進行一次數據拷貝,且它會從新分配Segment的全部權,而不是從新建立Segment對象。git

二、Source與Sink

Okio包含本身的流類型,稱爲SourceSink,其工做方式雖然相似InputStreamOutputStream,但它與Java I/O相比具備如下優點(參考自Android學習筆記——Okio):github

  • Okio實現了I/O讀寫的超時機制(Timeout),防止讀寫出錯從而致使一直阻塞。
  • N合一,OKio精簡了輸入輸出流的類個數
  • 低的CPU和內存消耗,引入SegmentSegmentPool複用機制
  • 使用方便。ByteString處理不變byteBuffer處理可變byte
  • 提供了一系列的工具。OKio支持md五、sha、base64等數據處理

SourceSink能夠與InputStreamOutputStream互相操做。咱們能夠將任何Source視爲InputStream,也能夠將任何InputStream視爲Source。一樣適用於SinkInputStream數組

三、Okio數據讀寫流程

 前面簡單介紹了Okio,下面就來看看如何使用。緩存

//okio實現圖片複製
    public void copyImage(File sinkFile, File sourceFile) throws IOException {
        //try裏面的代碼是Okio的標準寫法,不能改變
        try (Sink sink = Okio.sink(sinkFile);
             BufferedSink bufferedSink = Okio.buffer(sink);
             //從文件讀取數據
             Source source = Okio.source(sourceFile);
             BufferedSource bufferedSource = Okio.buffer(source)) {
            //圖片複製
            bufferedSink.write(bufferedSource.readByteArray());
            //設置超時時間爲1秒中,
            sink.timeout().deadline(1, TimeUnit.SECONDS);
            //寫入數據,將字符串以UTF-8格式寫入,Okio專門針對utf-8作了處理
            bufferedSink.writeUtf8(entry.getKey())
                     .writeUtf8("=")
                     .writeUtf8(entry.getValue())
                     .writeUtf8("\n");
            //讀取數據
            String str=bufferedSource.readUtf8();
            //讀取數據並返回一個ByteString
            ByteStringstr=bufferedSource.readByteString();

        }
    }
複製代碼

 正如前面所說的那樣,Okio使用起來很是方便。因爲Java字符串採用的是UTF-16編碼,而通常開發中使用的都是UTF-8編碼,因此Okio對字符串編碼作了特殊處理。框架

3.一、Okio讀數據原理分析

Source的意思是水源,它對應着輸入流,在Okio中經過Okio.source方法來得到一個Source對象。異步

//在Okio這個類中關於source重載的方法仍是蠻多的,這裏以文件爲例
  public static Source source(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return source(new FileInputStream(file));
  }
  public static Source source(InputStream in) {
    return source(in, new Timeout());
  }
  private static Source source(final InputStream in, final Timeout timeout) {
    ...
    //這裏纔是真正讀去數據的地方
    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        ...
        try {
          //每次寫數據時都先檢查是否超時,默認未設置超時
          timeout.throwIfReached();
          //獲取鏈表的尾節點
          Segment tail = sink.writableSegment(1);
          //因爲每一個Segment的SIZE爲8KB,因此每一次拷貝不能超過這個值
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          //經過InputStream讀取數據
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          //數據讀取完畢
          if (bytesRead == -1) return -1;
          //可寫取位置日後移
          tail.limit += bytesRead;
          //讀取的總字節數
          sink.size += bytesRead;
          //返回當前讀取的字節數
          return bytesRead;
        } catch (AssertionError e) {
          ...
        }
      }
      ...
    };
  }
複製代碼

 能夠發現,這個的Source是一個匿名對象。獲得Source對象後,經過Okio.buffer方法將該對象傳遞給BufferedSourceBufferedSource是一個接口,它的具體實現類是RealBufferedSourcesocket

 在上面例子中是調用RealBufferedSourcereadByteArray方法來讀取數據,下面就來看這個方法的實現。

//RealBufferedSource對應的Buffer 
  public final Buffer buffer = new Buffer();
  @Override public byte[] readByteArray() throws IOException {
    //將數據寫入buffer
    buffer.writeAll(source);
    //將全部數據已字節數組形式返回
    return buffer.readByteArray();
  }
複製代碼

 在readByteArray方法中會首先將數據寫入到Buffer中,並生成一個雙向鏈表。

@Override public long writeAll(Source source) throws IOException {
    if (source == null) throw new IllegalArgumentException("source == null");
    long totalBytesRead = 0;
    //這裏的source就是前面在Okio中建立的匿名Source對象
    for (long readCount; (readCount = source.read(this, Segment.SIZE)) != -1; ) {
      totalBytesRead += readCount;
    }
    return totalBytesRead;
  }
複製代碼

 將數據寫入Buffer後,調用BufferreadByteArray方法生成一個字節數組並返回。

@Override
    public byte[] readByteArray() {
        try {
            //在讀取數據時,就會獲得size的大小
            return readByteArray(size);
        } catch (EOFException e) {
            throw new AssertionError(e);
        }
    }
    @Override
    public byte[] readByteArray(long byteCount) throws EOFException {
        checkOffsetAndCount(size, 0, byteCount);
        ...
        //建立一個大小爲size的byte數組
        byte[] result = new byte[(int) byteCount];
        //將讀取的數據寫入這個數組中
        readFully(result);
        return result;
    }
    @Override
    public void readFully(byte[] sink) throws EOFException {
        int offset = 0;
        while (offset < sink.length) {
            //不斷的將數據寫入sink數組中
            int read = read(sink, offset, sink.length - offset);
            if (read == -1) throw new EOFException();
            offset += read;
        }
    }
    @Override
    public int read(byte[] sink, int offset, int byteCount) {
        checkOffsetAndCount(sink.length, offset, byteCount);

        Segment s = head;
        if (s == null) return -1;
        int toCopy = Math.min(byteCount, s.limit - s.pos);
        //進行數據拷貝
        System.arraycopy(s.data, s.pos, sink, offset, toCopy);

        s.pos += toCopy;
        size -= toCopy;
        //釋放Segment並將其放入緩衝池
        if (s.pos == s.limit) {
            head = s.pop();
            SegmentPool.recycle(s);
        }

        return toCopy;
    }
複製代碼

 這樣就將數據寫入到一個新的數組中,並將鏈表中的全部Segment從新初始化並放入池中。

3.二、Okio寫數據原理分析

Sink的意思是水槽,它對應着輸出流。經過Okio.sink來獲取一個Sink對象。

public static Sink sink(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return sink(new FileOutputStream(file));
  }
  public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }

  private static Sink sink(final OutputStream out, final Timeout timeout) {
    ...
    //建立一個匿名Sink對象
    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        //寫入數據
        while (byteCount > 0) {
          //每次寫數據時都先檢查是否超時,默認未設置超時
          timeout.throwIfReached();
          //獲取頭結點
          Segment head = source.head;
          //能copy的最小字節
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          //經過OutputStream來寫入數據
          out.write(head.data, head.pos, toCopy);
          //可讀取的位置向後移動
          head.pos += toCopy;
          //減小可寫入的字節數
          byteCount -= toCopy;
          //減小buffer中字節數
          source.size -= toCopy;
          //達到最大可寫的位置
          if (head.pos == head.limit) {
            //釋放節點
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      ...
    };
  }
複製代碼

 得到Sink對象後,將該對象傳遞給BufferedSinkBufferedSink是一個接口,它的具體實現是RealBufferedSink

public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
  }
複製代碼

 在3.1節中講了經過InputStream讀取數據並返回一個字節數組。這裏就將這個數組經過RealBufferedSinkwrite方法寫入到新的文件中。

@Override public BufferedSink write(byte[] source) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source);
    return emitCompleteSegments();
  }
複製代碼

 寫入數據跟讀取數據流程基本上同樣,須要先將數據寫入到Buffer中。

@Override
    public Buffer write(byte[] source) {
        if (source == null) throw new IllegalArgumentException("source == null");
        return write(source, 0, source.length);
    }
    @Override
    public Buffer write(byte[] source, int offset, int byteCount) {
        ...

        int limit = offset + byteCount;
        while (offset < limit) {
            Segment tail = writableSegment(1);

            int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
            //進行數據拷貝
            System.arraycopy(source, offset, tail.data, tail.limit, toCopy);

            offset += toCopy;
            tail.limit += toCopy;
        }

        size += byteCount;
        return this;
    }
複製代碼

 前面說過Buffer維護的是一個鏈表,因此這裏也是將數據寫入一個鏈表中,因爲在數據讀取完畢後會將Segment對象從新初始化並放入到池中,因此這裏就不用建立新的Segment對象,直接從池中獲取便可。在寫入Buffer成功後,再調用emitCompleteSegments方法,該方法就是將數據從Buffer寫入到新文件。

@Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }
複製代碼

 這裏的Sink就是在Okio中建立的匿名對象,在Sink對象中經過OutputStream將數據寫入到新文件。

 整體流程以下。

在這裏插入圖片描述

四、Segment及SegmentPool

SegmentOkio中很是重要的一環,它能夠說是Buffer中數據的載體。容量是8kb,頭結點爲head。

final class Segment {
  //Segment的容量,最大爲8kb
  static final int SIZE = 8192;

  //若是Segment中字節數 > SHARE_MINIMUM時(大Segment),就能夠共享,不能添加到SegmentPool
  static final int SHARE_MINIMUM = 1024;
  //存儲的數據
  final byte[] data;

  //下一次讀取的開始位置
  int pos;

 //寫入的開始位置
  int limit;

  //當前Segment是否能夠共享
  boolean shared;

  //data是否僅當前Segment獨有,不share
  boolean owner;

  //後繼節點
  Segment next;

  //前驅節點
  Segment prev;

  ...

  //移除當前Segment
  public final @Nullable Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

  //在當前節點後添加一個新的節點
  public final Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }

  //將當前Segment分裂成2個Segment結點。前面結點pos~limit數據範圍是[pos..pos+byteCount),後面結點pos~limit數據範圍是[pos+byteCount..limit)
  public final Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    //若是字節數大於SHARE_MINIMUM則拆分紅共享節點
    if (byteCount >= SHARE_MINIMUM) {
      prefix = sharedCopy();
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }

  //當前Segment結點和prev前驅結點合併成一個Segment,統一合併到prev,而後當前Segment結點從雙向鏈表移除並添加到SegmentPool複用。固然合併的前提是:2個Segment的字節總和不超過8K。合併後可能會移動pos、limit
  public final void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);
    pop();
    SegmentPool.recycle(this);
  }

  //從當前節點移動byteCount個字節到sink中
  public final void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }

    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
  }
}
複製代碼

SegmentPool是一個Segment池,內部維護了一個Segment單向鏈表,容量爲64kb(8個Segment),回收不用的Segment對象。

final class SegmentPool {
    //SegmentPool的最大容量
    static final long MAX_SIZE = 64 * 1024; // 64 KiB.

    //後繼節點
    static Segment next;

    //當前池內的總字節數
    static long byteCount;

    private SegmentPool() {
    }
    //從池中獲取一個Segment對象
    static Segment take() {
        synchronized (SegmentPool.class) {
            if (next != null) {
                Segment result = next;
                next = result.next;
                result.next = null;
                byteCount -= Segment.SIZE;
                return result;
            }
        }
        return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
    }
    //將Segment狀態初始化並放入池中
    static void recycle(Segment segment) {
        if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
        if (segment.shared) return; // This segment cannot be recycled.
        synchronized (SegmentPool.class) {
            if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
            byteCount += Segment.SIZE;
            segment.next = next;
            segment.pos = segment.limit = 0;
            next = segment;
        }
    }
}
複製代碼

 當從InputStream中讀數據時,讀取的數據會寫進以Segment爲節點的雙向鏈表中。若是Segment容量不夠(容量大於8kb),就會從SegmentPooltake一個Segment對象並添加到雙向鏈表尾部。

 當經過OutputStrem寫數據時,會從雙向鏈表的head節點開始讀取,當Segment中的數據讀取完畢後,就會將該Segment從雙向鏈表中移除,並回收到SegmentPool中,等待下次複用。

五、超時機制

  Okio的亮點之一就是增長了超時機制,防止由於意外致使I/O一直阻塞的問題,默認的超時機制是同步的。AsyncTimeoutOkio中異步超時機制的實現,它是一個單鏈表,結點按等待時間從小到大排序,head是一個頭結點,起佔位做用。使用了一個WatchDog的後臺線程來不斷的遍歷全部節點,若是某個節點超時就會將該節點從鏈表中移除,並關閉Socket

AsyncTimeout提供了3個方法enterexittimeout,分別用於流操做開始、結束、超時三種狀況調用。

public class AsyncTimeout extends Timeout {
    //頭結點,佔位使用
    static
    AsyncTimeout head;

    //是否在鏈表中
    private boolean inQueue;

    //後繼節點
    private
    AsyncTimeout next;

    //超時時間
    private long timeoutAt;
    //把當前AsyncTimeout對象加入節點
    public final void enter() {
        ...
        scheduleTimeout(this, timeoutNanos, hasDeadline);
    }

    private static synchronized void scheduleTimeout( AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
        //建立佔位頭結點並開啓子線程
        if (head == null) {
            head = new AsyncTimeout();
            new Watchdog().start();
        }

        ...

        //插入到鏈表中,按照時間長短進行排序,等待事件越長越靠後
        for (AsyncTimeout prev = head; true; prev = prev.next) {
            if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
                node.next = prev.next;
                prev.next = node;
                if (prev == head) {
                    AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
                }
                break;
            }
        }
    }

    //從鏈表中移除節點
    public final boolean exit() {
        if (!inQueue) return false;
        inQueue = false;
        return cancelScheduledTimeout(this);
    }

    //執行真正的移除操做
    private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
        // Remove the node from the linked list.
        for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
            if (prev.next == node) {
                prev.next = node.next;
                node.next = null;
                return false;
            }
        }

        // The node wasn't found in the linked list: it must have timed out!
        return true;
    }

    //在子類中重寫了該方法,主要是進行socket的關閉
    protected void timedOut() {
    }

    //監聽節點是否超時的子線程
    private static final class Watchdog extends Thread {
        Watchdog() {
            super("Okio Watchdog");
            setDaemon(true);
        }

        public void run() {
            while (true) {
                try {
                    AsyncTimeout timedOut;
                    synchronized (AsyncTimeout.class) {
                        timedOut = awaitTimeout();
                        //表明頭結點的後繼節點已超時,
                        if (timedOut == null) continue;
                        //除頭結點外沒有任何其餘節點
                        if (timedOut == head) {
                            head = null;
                            return;
                        }
                    }

                    //關閉socket
                    timedOut.timedOut();
                } catch (InterruptedException ignored) {
                }
            }
        }
    }

    
    static AsyncTimeout awaitTimeout() throws InterruptedException {
        AsyncTimeout node = head.next;
        //除了頭結點外沒有任何其餘節點
        if (node == null) {
            long startNanos = System.nanoTime();
            AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
            return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
                    ? head  // The idle timeout elapsed.
                    : null; // The situation has changed.
        }

        long waitNanos = node.remainingNanos(System.nanoTime());

        //進行等待
        if (waitNanos > 0) {
            //等待
            long waitMillis = waitNanos / 1000000L;
            waitNanos -= (waitMillis * 1000000L);
            AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
            return null;
        }

        //表明node節點已超時
        head.next = node.next;
        node.next = null;
        return node;
    }
}
複製代碼

 默認都是未設置超時時間的,須要咱們本身來設置,同步及異步的超時時間設置方式是同樣的,經過下面代碼便可。

sink.timeout().deadline(1, TimeUnit.SECONDS);
    source.timeout().deadline(1,TimeUnit.MILLISECONDS);
複製代碼
六、生產者/消費者模型

 在Okio中可使用Pipe來實現一個生產者/消費者模型。Pipe維護了一個必定大小Buffer。當該Buffer容量達到最大時,線程就會等待直到該Buffer有剩餘的空間。

public final class Pipe {
  //Pipe的最大容量
  final long maxBufferSize;
  //Pipe對應的Buffer
  final Buffer buffer = new Buffer();
  boolean sinkClosed;
  boolean sourceClosed;
  //寫入流,對應着生產者
  private final Sink sink = new PipeSink();
  //讀取流,對應着消費者
  private final Source source = new PipeSource();

  public Pipe(long maxBufferSize) {
    //最大容量不能小於1
    if (maxBufferSize < 1L) {
      throw new IllegalArgumentException("maxBufferSize < 1: " + maxBufferSize);
    }
    this.maxBufferSize = maxBufferSize;
  }
  ...
  //寫入數據到Pipe中
  final class PipeSink implements Sink {
    final Timeout timeout = new Timeout();

    @Override public void write(Buffer source, long byteCount) throws IOException {
      synchronized (buffer) {
        ...

        while (byteCount > 0) {
          ...

          long bufferSpaceAvailable = maxBufferSize - buffer.size();
          if (bufferSpaceAvailable == 0) {
            //buffer中,沒有剩餘空間,等待消費者消費
            timeout.waitUntilNotified(buffer); // Wait until the source drains the buffer.
            continue;
          }

          long bytesToWrite = Math.min(bufferSpaceAvailable, byteCount);
          buffer.write(source, bytesToWrite);
          byteCount -= bytesToWrite;
          //通知buffer,有新的數據了,
          buffer.notifyAll(); // Notify the source that it can resume reading.
        }
      }
    }

    ...
  }
  //從Pipe中讀取數據
  final class PipeSource implements Source {
    final Timeout timeout = new Timeout();

    @Override public long read(Buffer sink, long byteCount) throws IOException {
      synchronized (buffer) {
        ...

        while (buffer.size() == 0) {
          if (sinkClosed) return -1L;
          //Pipe中沒有數據,等待生產者寫入
          timeout.waitUntilNotified(buffer); // Wait until the sink fills the buffer.
        }

        long result = buffer.read(sink, byteCount);
        buffer.notifyAll(); // Notify the sink that it can resume writing.
        return result;
      }
    }
    ...
  }
}
複製代碼

Pipe的代碼仍是比較少的。下面就來如何使用Pipe

public void pipe() throws IOException {
        //設置Pipe的容量爲1024字節,即1kb
        Pipe pipe = new Pipe(1024);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try (BufferedSource bufferedSource = Okio.buffer(pipe.source())) {
                    //將Pipe中數據寫入env4.txt這個文件中
                    bufferedSource.readAll(Okio.sink(new File("file/env4.txt")));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try (BufferedSink bufferedSink = Okio.buffer(pipe.sink())) {
                    //將env3.txt中數據寫入到Pipe中
                    bufferedSink.writeAll(Okio.source(new File("file/env3.txt")));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
複製代碼
七、總結

 雖然沒有將Okio的所有功能點一一講解(如GZip),但通過前面梳理想必對Okio有了一個比較全面的瞭解。這對在之後的開發中熟練的使用Okio很是有幫助。須要注意的是,雖然Okio很好用,Okio是在Java I/O、nio的基礎上作了封裝、優化,並不具有非阻塞I/O的特性。關於非阻塞I/O能夠去學習netty這個庫。

【參考資料】

拆輪子系列:拆 Okio

OkHttp之Okio源碼分析(三)Okio讀寫流程梳理

Android學習筆記——Okio

深刻理解okio的優化思想

相關文章
相關標籤/搜索