基於Netty實現Redis協議的編碼解碼器

Netty消息處理結構

上面是Netty的服務器端基本消息處理結構,爲了便於初學者理解,它和真實的結構有稍許出入。Netty是基於NIO的消息處理框架,用來高效處理網絡IO。處理網絡消息通常走如下步驟java

  1. 監聽端口 Bind & Listengit

  2. 接收新鏈接 Accept數組

  3. 經過鏈接收取客戶端發送的字節流,轉換成輸入消息對象 Read & Decode服務器

  4. 處理消息,生成輸出消息對象 Process網絡

  5. 轉換成字節,經過鏈接發送到客戶端 Encode & Write數據結構

步驟2拿到新鏈接以後,若是是開啓了新線程進入步驟3,那就是走傳統的多線程服務器模式。一個線程一個鏈接,每一個線程是都阻塞式讀寫消息。若是併發量比較大,須要的線程資源也是比較多的。多線程

Netty的消息處理基於NIO的多路複用機理,一個線程經過NIO Selector非阻塞地讀寫很是多的鏈接。傳統的多線程服務器須要的線程數到了NIO這裏就能夠大幅縮減,節省了不少操做系統資源。併發

Netty的線程劃分爲兩種,一種是用來監聽ServerSocket並接受新鏈接的Acceptor線程,另外一種用來讀寫套件字鏈接上的消息的IO線程,兩種線程都是使用NIO Selector異步並行管理多個套件字。Acceptor線程能夠同時監聽多個ServerSocket,管理多個端口,將接收到的新鏈接扔到IO線程。IO線程能夠同時讀寫多個Socket,管理多個鏈接的讀寫。app

IO線程從套件字上讀取到的是字節流,而後經過消息解碼器將字節流反序列化成輸入消息對象,再傳遞到業務處理器進行處理,業務處理器會生成輸出消息對象,經過消息編碼器序列化成字節流,再經過套件字輸出到客戶端。框架

Redis協議編碼解碼的實現

本文的重點是教讀者實現一個簡單的Redis Protocol編碼解碼器。

首先咱們來介紹一下Redis Protocol的格式,Redis協議分爲指令和返回兩個部分,指令的格式比較簡單,就是一個字符串數組,好比指令setnx a b就是三個字符串的數組,若是指令中有整數,也是以字符串的形式發送的。Redis協議的返回就比較複雜了,由於要支持複雜的數據類型和結構嵌套。本文是以服務端的角色來處理Redis協議,也就是編寫指令的解碼器和返回對象的編碼器。而客戶端則是反過來的,客戶端須要編寫指令的編碼器和返回對象的解碼器。

指令的編碼格式

setnx a b => *3\r\n$5\r\nsetnx\r\n$1\r\na\r\n$1\r\nb\r\n

指令是一個字符串數組,編碼一個字符串數組,首先須要編碼數組長度*3\r\n。而後依次編碼各個字符串參數。編碼字符串首先須要編碼字符串的長度$5\r\n。而後再編碼字符串的內容setnx\r\n。Redis消息以\r\n做爲分隔符,這樣設計其實挺浪費網絡傳輸流量的,消息內容裏面處處都是\r\n符號。可是這樣的消息可讀性會比較好,便於調試。這也是軟件世界犧牲性能換取可讀性便捷性的一個經典例子。

指令解碼器的實現 網絡字節流的讀取存在半包問題。所謂半包問題是指一次Read調用從套件字讀到的字節數組可能只是一個完整消息的一部分。而另一部分則須要發起另一次Read調用纔可能讀到,甚至要發起多個Read調用才能夠讀到完整的一條消息。

若是咱們拿部分消息去反序列化成輸入消息對象確定是要失敗的,或者說生成的消息對象是不完整填充的。這個時候咱們須要等待下一次Read調用,而後將這兩次Read調用的字節數組拼起來,嘗試再一次反序列化。

問題來了,若是一個輸入消息對象很大,就可能須要多個Read調用和屢次反序列化操做才能完整的解包出一個輸入對象。那這個反序列化的過程就會重複了屢次。好比第一次完成了30%,而後第二次從頭開始又完成了60%,第三次又從頭開始完成了90%,第四次又從頭開始總算完成了100%,這下終於能夠放心交給業務處理器處理了。

Netty使用ReplayingDecoder引入檢查點機制[Checkpoint]解決了這個重複反序列化的問題。

在反序列化的過程當中咱們反覆打點記錄下當前讀到了哪一個位置,也就是檢查點,而後下次反序列化的時候就能夠從上次記錄的檢查點直接繼續反序列化。這樣就避免了重複的問題。

這就比如咱們玩單機RPG遊戲同樣,這些遊戲每每有自動保存的功能。這樣就能夠避免進程不當心退出時,再進來的時候就能夠從上次保存的狀態直接繼續進行下去,而不是像Flappy Bird同樣從新玩它一遍又一遍,這簡直要把人虐死。

import java.util.ArrayList;
import java.util.List;

import com.google.common.base.Charsets;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;

class InputState {
    public int index;
}

public class RedisInputDecoder extends ReplayingDecoder<InputState> {

    private int length;
    private List<String> params;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        InputState state = this.state();
        if (state == null) {
            length = readParamsLen(in);
            this.params = new ArrayList<>(length);
            state = new InputState();
            this.checkpoint(state);
        }
        for (int i = state.index; i < length; i++) {
            String param = readParam(in);
            this.params.add(param);
            state.index = state.index + 1;
            this.checkpoint(state);
        }
        out.add(new RedisInput(params));
        this.checkpoint(null);
    }

    private final static int CR = '\r';
    private final static int LF = '\n';
    private final static int DOLLAR = '$';
    private final static int ASTERISK = '*';

    private int readParamsLen(ByteBuf in) {
        int c = in.readByte();
        if (c != ASTERISK) {
            throw new DecoderException("expect character *");
        }
        int len = readLen(in, 3); // max 999 params
        if (len == 0) {
            throw new DecoderException("expect non-zero params");
        }
        return len;
    }

    private String readParam(ByteBuf in) {
        int len = readStrLen(in);
        return readStr(in, len);
    }

    private String readStr(ByteBuf in, int len) {
        if (len == 0) {
            return "";
        }
        byte[] cs = new byte[len];
        in.readBytes(cs);
        skipCrlf(in);
        return new String(cs, Charsets.UTF_8);
    }

    private int readStrLen(ByteBuf in) {
        int c = in.readByte();
        if (c != DOLLAR) {
            throw new DecoderException("expect character $");
        }
        return readLen(in, 6); // string maxlen 999999
    }

    private int readLen(ByteBuf in, int maxBytes) {
        byte[] digits = new byte[maxBytes]; // max 999個參數
        int len = 0;
        while (true) {
            byte d = in.getByte(in.readerIndex());
            if (!Character.isDigit(d)) {
                break;
            }
            in.readByte();
            digits[len] = d;
            len++;
            if (len > maxBytes) {
                throw new DecoderException("params length too large");
            }
        }
        skipCrlf(in);
        if (len == 0) {
            throw new DecoderException("expect digit");
        }
        return Integer.parseInt(new String(digits, 0, len));
    }

    private void skipCrlf(ByteBuf in) {
        int c = in.readByte();
        if (c == CR) {
            c = in.readByte();
            if (c == LF) {
                return;
            }
        }
        throw new DecoderException("expect cr ln");
    }

}
複製代碼

輸出消息編碼器的實現

高能預警:前方有大量代碼,請酌情觀看

輸出消息的結構要複雜不少,要支持多種數據類型,包括狀態、整數、錯誤、字符串和數組,要支持數據結構嵌套,數組裏還有數組。相比解碼器而言它簡單的地方在於不用考慮半包問題,編碼器只負責將消息序列化成字節流,剩下的事由Netty偷偷幫你搞定。

首先咱們定義一個輸出消息對象接口,全部的數據類型都要實現該接口,將對象內部的狀態轉換成字節數組放置到ByteBuf中。

import io.netty.buffer.ByteBuf;

public interface IRedisOutput {

    public void encode(ByteBuf buf);

}
複製代碼

整數輸出消息類,整數的序列化格式爲:value\r\n,value是整數的字符串表示。

import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;

public class IntegerOutput implements IRedisOutput {

    private long value;

    public IntegerOutput(long value) {
        this.value = value;
    }

    @Override
    public void encode(ByteBuf buf) {
        buf.writeByte(':');
        buf.writeBytes(String.valueOf(value).getBytes(Charsets.UTF_8));
        buf.writeByte('\r');
        buf.writeByte('\n');
    }

    public static IntegerOutput of(long value) {
        return new IntegerOutput(value);
    }

    public static IntegerOutput ZERO = new IntegerOutput(0);
    public static IntegerOutput ONE = new IntegerOutput(1);

}
複製代碼

狀態輸出消息類,序列化格式爲+status\r\n

import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;

public class StateOutput implements IRedisOutput {

    private String state;

    public StateOutput(String state) {
        this.state = state;
    }

    public void encode(ByteBuf buf) {
        buf.writeByte('+');
        buf.writeBytes(state.getBytes(Charsets.UTF_8));
        buf.writeByte('\r');
        buf.writeByte('\n');
    }

    public static StateOutput of(String state) {
        return new StateOutput(state);
    }

    public final static StateOutput OK = new StateOutput("OK");

    public final static StateOutput PONG = new StateOutput("PONG");

}
複製代碼

錯誤輸出消息類,序列化格式爲-type reason\r\n,reason必須爲單行字符串

import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;

public class ErrorOutput implements IRedisOutput {

    private String type;
    private String reason;

    public ErrorOutput(String type, String reason) {
        this.type = type;
        this.reason = reason;
    }

    public String getType() {
        return type;
    }

    public String getReason() {
        return reason;
    }

    @Override
    public void encode(ByteBuf buf) {
        buf.writeByte('-');
        // reason不容許多行字符串
        buf.writeBytes(String.format("%s %s", type, headOf(reason)).getBytes(Charsets.UTF_8));
        buf.writeByte('\r');
        buf.writeByte('\n');
    }

    private String headOf(String reason) {
        int idx = reason.indexOf("\n");
        if (idx < 0) {
            return reason;
        }
        return reason.substring(0, idx).trim();
    }

    // 通用錯誤
    public static ErrorOutput errorOf(String reason) {
        return new ErrorOutput("ERR", reason);
    }

    // 語法錯誤
    public static ErrorOutput syntaxOf(String reason) {
        return new ErrorOutput("SYNTAX", reason);
    }

    // 協議錯誤
    public static ErrorOutput protoOf(String reason) {
        return new ErrorOutput("PROTO", reason);
    }

    // 參數無效
    public static ErrorOutput paramOf(String reason) {
        return new ErrorOutput("PARAM", reason);
    }

    // 服務器內部錯誤
    public static ErrorOutput serverOf(String reason) {
        return new ErrorOutput("SERVER", reason);
    }

}
複製代碼

字符串輸出消息類,字符串分爲null、空串和普通字符串。null的序列化格式爲$-1\r\n,普通字符串的格式爲$len\r\ncontent\r\n,空串就是一個長度爲0的字符串,格式爲$0\r\n\r\n。

import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;

public class StringOutput implements IRedisOutput {

    private String content;

    public StringOutput(String content) {
        this.content = content;
    }

    @Override
    public void encode(ByteBuf buf) {
        buf.writeByte('$');
        if (content == null) {
            // $-1\r\n
            buf.writeByte('-');
            buf.writeByte('1');
            buf.writeByte('\r');
            buf.writeByte('\n');
            return;
        }
        byte[] bytes = content.getBytes(Charsets.UTF_8);
        buf.writeBytes(String.valueOf(bytes.length).getBytes(Charsets.UTF_8));
        buf.writeByte('\r');
        buf.writeByte('\n');
        if (content.length() > 0) {
            buf.writeBytes(bytes);
        }
        buf.writeByte('\r');
        buf.writeByte('\n');
    }

    public static StringOutput of(String content) {
        return new StringOutput(content);
    }

    public static StringOutput of(long value) {
        return new StringOutput(String.valueOf(value));
    }

    public final static StringOutput NULL = new StringOutput(null);

}
複製代碼

最後一個數組輸出消息類,支持數據結構嵌套就靠它了。數組的內部是多個子消息,每一個子消息的類型是不定的,類型能夠不同。好比scan操做的返回就是一個數組,數組的第一個子消息是遊標的offset字符串,第二個子消息是一個字符串數組。它的序列化格式開頭爲*len\r\n,後面依次是內部全部子消息的序列化形式。

import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;

public class ArrayOutput implements IRedisOutput {

    private List<IRedisOutput> outputs = new ArrayList<>();

    public static ArrayOutput newArray() {
        return new ArrayOutput();
    }

    public ArrayOutput append(IRedisOutput output) {
        outputs.add(output);
        return this;
    }

    @Override
    public void encode(ByteBuf buf) {
        buf.writeByte('*');
        buf.writeBytes(String.valueOf(outputs.size()).getBytes(Charsets.UTF_8));
        buf.writeByte('\r');
        buf.writeByte('\n');
        for (IRedisOutput output : outputs) {
            output.encode(buf);
        }
    }

}
複製代碼

下面是ArrayOutput對象使用的一個實例,是從小編的某個項目裏扒下來的,嵌套了三層數組。讀者能夠不用死磕下面的代碼,重點看看ArrayOutput大體怎麼使用的就能夠了。

ArrayOutput out = ArrayOutput.newArray();
for (Result result : res) {
    if (result.isEmpty()) {
        continue;
    }
    ArrayOutput row = ArrayOutput.newArray();
    row.append(StringOutput.of(new String(result.getRow(), Charsets.UTF_8)));
    for (KeyValue kv : result.list()) {
        ArrayOutput item = ArrayOutput.newArray();
        item.append(StringOutput.of("family"));
        item.append(StringOutput.of(new String(kv.getFamily(), Charsets.UTF_8)));
        item.append(StringOutput.of("qualifier"));
        item.append(StringOutput.of(new String(kv.getQualifier(), Charsets.UTF_8)));
        item.append(StringOutput.of("value"));
        item.append(StringOutput.of(new String(kv.getValue(), Charsets.UTF_8)));
        item.append(StringOutput.of("timestamp"));
        item.append(StringOutput.of(kv.getTimestamp()));
        row.append(item);
    }
    out.append(row);
}
ctx.writeAndFlush(out);
複製代碼

最後,有了以上清晰的類結構,解碼器類的實現就很是簡單了。

import java.util.List;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

@Sharable
public class RedisOutputEncoder extends MessageToMessageEncoder<IRedisOutput> {

    @Override
    protected void encode(ChannelHandlerContext ctx, IRedisOutput msg, List<Object> out) throws Exception {
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer();
        msg.encode(buf);
        out.add(buf);
    }

}
複製代碼

由於解碼器對象是無狀態的,因此它能夠被channel共享。解碼器的實現很是簡單,就是分配一個ByteBuf,而後將將消息輸出對象序列化的字節數組塞到ByteBuf中輸出就能夠了。

閱讀相關文章,關注公衆號【碼洞】
相關文章
相關標籤/搜索