Netty之緩衝區ByteBuf解讀(二)

上篇介紹了 ByteBuf 的簡單讀寫操做以及讀寫指針的基本介紹,本文繼續對 ByteBuf 的基本操做進行解讀。

讀寫指針回滾

這裏的 demo 例子仍是使用上節使用的。java

ByteBuf buf = Unpooled.buffer(15);
String content = "ytao公衆號";
buf.writeBytes(content.getBytes());
System.out.println(String.format("\nwrite: ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));

byte[] dst = new byte[4];
buf.readBytes(dst);
System.out.println(String.format("\nread(4): ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));

進入 readBytes 方法,能夠看到每次讀取的時候,指針是累加的,如圖:segmentfault

可是,有時咱們可能須要對當前操做進行回滾,讓指針回到以前的位置。這時,mark 和 reset 搭配使用,能夠實現該操做需求。
mark 用來記錄可能須要回滾的當前位置,reset 是將指針回滾至 mark 記錄的值。
好比,接着面的 demo,再讀取三個字節,而後回滾讀取三個字節的操做。安全

buf.markReaderIndex();
dst = new byte[3];
buf.readBytes(dst);
System.out.println(String.format("\nmarkRead and read(3): ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));

buf.resetReaderIndex();
System.out.println(String.format("\nresetReaderIndex: ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));

先將讀索引進行 mark,而後讀取內容,在調用讀取的 reset,指針索引以下:ide

讀指針累加到 7 後,又從新回滾至 4 的位置。函數

一樣,寫指針也是如此操做進行回滾。因此 mark 和 reset 都有一個讀和寫。this

以及spa

讀寫指針清空

將讀寫指針清爲初始值,使用 clear() 函數。線程

ByteBuf buf = Unpooled.buffer(15);
String content = "ytao公衆號";
buf.writeBytes(content.getBytes());
System.out.println(String.format("\nwrite: ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));
buf.markWriterIndex();

byte[] dst = new byte[4];
buf.readBytes(dst);
System.out.println(String.format("\nread(4): ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));
buf.markReaderIndex();

buf.clear();
System.out.println(String.format("\nclear: ridx=%s widx=%s cap=%s", buf.readerIndex(), buf.writerIndex(), buf.capacity()));

執行結果:3d

clear 只會將指針的位置重置爲初始值,並不會清空緩衝區裏的內容,以下圖。同時,也可以使用 mark 和 reset 進行驗證,這裏再也不進行演示。指針

查找字符位置

查找字符是在不少場景下,都會使用到,好比前面文章講過的粘包/拆包處理,就有根據字符串進行劃分包數據。其實現原理就是根據查找指定字符進行讀取。
ByteBuf 也提供多種不一樣的查找方法進行處理:

indexOf

indexOf 函數,擁有三個參數,查找開始位置索引 fromIndex, 查詢位置最大的索引 toIndex,查找字節 value

// fromIndex 爲 0, toIndex 爲 13, value 爲 a
int i = buf.indexOf(0, 13, (byte)'a');
System.out.println("[a]索引位置:"+i);

在索引 0~13 中返回查找的字符 a 索引位置:

indexOf 源碼實現:

// ByteBuf 實現類
@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
    return ByteBufUtil.indexOf(this, fromIndex, toIndex, value);
}

// ByteBufUtil 類
public static int indexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
    // 判斷查詢起始和終點索引大小
    if (fromIndex <= toIndex) {
        return firstIndexOf(buffer, fromIndex, toIndex, value);
    } else {
        return lastIndexOf(buffer, fromIndex, toIndex, value);
    }
}

private static int firstIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
    fromIndex = Math.max(fromIndex, 0);
    if (fromIndex >= toIndex || buffer.capacity() == 0) {
        return -1;
    }
    
    // 從起始索引進行遍歷到終點索引,若是這區間有查找的字節,就返回第一個字節的位置,不然返回 -1
    for (int i = fromIndex; i < toIndex; i ++) {
        if (buffer.getByte(i) == value) {
            return i;
        }
    }
    
    return -1;
}

private static int lastIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
    fromIndex = Math.min(fromIndex, buffer.capacity());
    if (fromIndex < 0 || buffer.capacity() == 0) {
        return -1;
    }
    //  從起始索引進行遍歷到終點索引倒着遍歷,獲取的是查找區間的最後一個字節位置
    for (int i = fromIndex - 1; i >= toIndex; i --) {
        if (buffer.getByte(i) == value) {
            return i;
        }
    }

    return -1;
}

bytesBefore

bytesBefore 函數擁有三個重載方法:

bytesBefore 函數的實現,就是在 indexOf 上進行一層查找區間的封裝,最後都是在 indexOf 中實現查找。

@Override
public int bytesBefore(int index, int length, byte value) {
    // 最終都進入 indexOf 中查找
    int endIndex = indexOf(index, index + length, value);
    if (endIndex < 0) {
        return -1;
    }
    // 返回相對查找起始索引的位置
    return endIndex - index;
}

注意:這裏返回的是相對查找起始索引的位置。

forEachByte

forEachByte 函數有兩個重載方法:

這裏涉及到一個 ByteBufProcessor 接口,這個是對一些經常使用的字節,其中包括 空,空白鍵,換行等等進行了抽象定義。
forEachByte 函數實現主要邏輯:

private int forEachByteAsc0(int index, int length, ByteBufProcessor processor) {
if (processor == null) {
    throw new NullPointerException("processor");
}

if (length == 0) {
    return -1;
}

final int endIndex = index + length;
// 起始 -> 終點索引,進行遍歷
int i = index;
try {
    do {
        // 若是能夠匹配上字節,返回該索引位置
        if (processor.process(_getByte(i))) {
            i ++;
        } else {
            return i;
        }
    } while (i < endIndex);
} catch (Exception e) {
    PlatformDependent.throwException(e);
}
// 查找區間遍歷完沒有匹配上,返回 -1
return -1;
}

forEachByteDesc

forEachByteDesc 也是有兩個重載方法:

forEachByteDesc 從函數名字能夠看出,指的倒序查找。意指從查找區間最大索引到最小索引進行遍歷:

private int forEachByteDesc0(int index, int length, ByteBufProcessor processor) {
    if (processor == null) {
        throw new NullPointerException("processor");
    }
    if (length == 0) {
        return -1;
    }
    // 從最大索引開始,進行遍歷
    int i = index + length - 1;
    try {
        do {
            if (processor.process(_getByte(i))) {
                i --;
            } else {
                return i;
            }
        // 直到 i 小於查找區間最小索引值時,遍歷完成
        } while (i >= index);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    // 沒有找到指定字節返回 -1
    return -1;
}

查找操做的具體實現仍是比較好理解,進入代碼查看實現通常都能讀懂。

複製

ByteBuf 複製後會生成一個新的 ByteBuf 對象。
copy() 整個對象被複制,其全部數據都是該對象自身維護,與舊對象無任何關聯關係。包括緩衝區內容,可是該方法的的容量默認爲舊 buf 的可讀區間大小,讀索引爲 0,寫索引爲舊數據寫索引的值。

ByteBuf buf2 = buf.copy();
System.out.println(String.format("\ncopy: ridx=%s widx=%s cap=%s", buf2.readerIndex(), buf2.writerIndex(), buf2.capacity()));

執行結果:

copy(int index, int length) 爲指定複製的起始位置及長度,其餘與上面 copy() 相似。
duplicate() 這個也是複製,可是與 copy 函數不一樣的是,複製後生成的 ByteBuf 和舊的 ByteBuf 是共享一份緩衝區內容的。它複製的只是本身能夠單獨維護的一份索引。而且它複製的默認容量也是和舊的同樣。

對象引用/回收

ByteBuf 對象被引用後,能夠調用 retain() 函數進行累計計數。每調用一次 retain() 則會 +1。
其在 AbstractReferenceCountedByteBuf 實現:

@Override
public ByteBuf retain() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, 1);
        }
        // 達到最大值時,拋出異常
        if (refCnt == Integer.MAX_VALUE) {
            throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
        }
        // 保證線程安全,這裏 CAS 進行累加
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
            break;
        }
    }
    return this;
}

@Override
public boolean compareAndSet(T obj, int expect, int update) {
    // unsafe 爲jdk的 Unsafe 類
    return unsafe.compareAndSwapInt(obj, offset, expect, update);
}

一樣,能夠進行添加多個引用,本身指定數量,retain(int increment) 帶參函數實現,和上面 +1 實現思路同樣,代碼就不貼出來了。

ByteBuf 在申請內存使用完後,須要對其進行釋放,不然可能會形成資源浪費及內存泄漏的風險。這也是 ByteBuf 本身實現的一套有效回收機制。
釋放的函數爲 release(),它的實現就是每次 -1。直到爲 1 時,調用釋放函數 deallocate() 進行釋放。
其在 AbstractReferenceCountedByteBuf 實現:

@Override
public final boolean release() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, -1);
        }
        // 引用數量 -1
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
            當引用數量爲 1 時,符合釋放條件
            if (refCnt == 1) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}

一樣,釋放也支持一次釋放多個引用數量,也是經過指定數量,傳遞給 release(int decrement) 進行引用數量的減小並釋放對象。

總結

本文對 ByteBuf 中最基本,最經常使用 API 進行的解讀,這也是在實際開發中或閱讀相關代碼時,可能會遇到的基本 API,經過兩篇文章的說明,相信對 ByteBuf 的基本使用不會存在太大問題,還有些未分析到的 API,根據本身對 ByteBuf 已有的理解,差很少也能進行分析。

我的博客: https://ytao.top
關注公衆號 【ytao】,更多原創好文
個人公衆號

相關文章
相關標籤/搜索