Kafka: Parsing the Request Header

The previous post introduced NetworkReceive.

When a Processor receives a NetworkReceive, it constructs a Request instance and sends it to the RequestChannel.

private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    val openChannel = selector.channel(receive.source)
    val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
    // Build the Request instance
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
      buffer = receive.payload, startTimeNanos = time.nanoseconds,
      listenerName = listenerName, securityProtocol = securityProtocol)
    requestChannel.sendRequest(req)
    selector.mute(receive.source)
  }
}

Request

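Request represents a single request. It has two main attributes.

header is the header data structure common to all requests.

bodyAndSize is the body of the request. Depending on the request type, it holds an instance of a different class.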

case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
                     startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
    val requestId = buffer.getShort()
    // This branch exists only to support the v0 ControlledShutdown request
    val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)
      ControlledShutdownRequest.readFrom(buffer)
    else
      null

    val header: RequestHeader =
      if (requestObj == null) {
        buffer.rewind
        // Parse with RequestHeader's static parse method
        try RequestHeader.parse(buffer)
        catch {
          case ex: Throwable =>
            throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
        }
      } else
        null
    val bodyAndSize: RequestAndSize =
      if (requestObj == null)
        try {
          if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
            new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
          }
          else
            // Instantiate the RequestAndSize from apiKey, apiVersion, and buffer
            AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
        } catch {
          case ex: Throwable =>
            throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
        }
      else
        null

    buffer = null
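
The constructor above reads the first two bytes to guess the api key, then rewinds the buffer so the header parser starts from the beginning again. A minimal sketch of that peek-and-rewind step, using only java.nio.ByteBuffer; the class name PeekApiKeySketch and the sample values are made up for illustration:

```java
import java.nio.ByteBuffer;

class PeekApiKeySketch {
    // Reads the first two bytes as the api key, then rewinds so that a later
    // parser sees the buffer from position 0 again.
    static short peekApiKey(ByteBuffer buffer) {
        short apiKey = buffer.getShort();
        buffer.rewind();
        return apiKey;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putShort((short) 3);   // api_key
        buffer.putShort((short) 1);   // api_version
        buffer.putInt(42);            // correlation_id
        buffer.flip();

        short apiKey = peekApiKey(buffer);
        // prints "apiKey=3, position=0"
        System.out.println("apiKey=" + apiKey + ", position=" + buffer.position());
    }
}
```

Note that rewind() resets the position to 0, so this only works when the payload starts at the beginning of the buffer, as it does here.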

Type

The Type class is the basic abstraction for data formats. It predefines the commonly used data types, which makes reading data convenient.

public abstract class Type {
    // Write the Object into the ByteBuffer
    public abstract void write(ByteBuffer buffer, Object o);
    // Read an Object from the ByteBuffer
    public abstract Object read(ByteBuffer buffer);
    // Check that the Object is valid for this type
    public abstract Object validate(Object o);
    // Return the serialized size of the Object in bytes
    public abstract int sizeOf(Object o);
    // Whether the value may be null
    public boolean isNullable() {
        return false;
    }

The predefined Types include INT8, INT16, ... STRING, BYTES, VARINT, and so on.

Take INT16 as an example. INT16 occupies 2 bytes, which maps exactly onto Java's Short type.

public static final Type INT16 = new Type() {
        @Override
        public void write(ByteBuffer buffer, Object o) {
            // Delegate to ByteBuffer.putShort
            buffer.putShort((Short) o);
        }

        @Override
        public Object read(ByteBuffer buffer) {
            // Delegate to ByteBuffer.getShort
            return buffer.getShort();
        }

        @Override
        public int sizeOf(Object o) {
            // Always two bytes
            return 2;
        }

        @Override
        public String toString() {
            return "INT16";
        }

        @Override
        public Short validate(Object item) {
            // Check that the Object is a Short
            if (item instanceof Short)
                return (Short) item;
            else
                throw new SchemaException(item + " is not a Short.");
        }
    };
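
The same write/read/sizeOf pattern extends to variable-length types. Below is a rough sketch of a nullable string type, assuming the usual encoding of an INT16 length prefix (-1 for null) followed by UTF-8 bytes. SimpleType is a hypothetical, cut-down stand-in for the Type contract, not Kafka's class:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class NullableStringSketch {
    // Hypothetical, simplified stand-in for the Type contract shown above.
    interface SimpleType {
        void write(ByteBuffer buffer, Object o);
        Object read(ByteBuffer buffer);
        int sizeOf(Object o);
    }

    // Assumed encoding: INT16 length prefix, -1 for null, then UTF-8 bytes.
    static final SimpleType NULLABLE_STRING = new SimpleType() {
        @Override
        public void write(ByteBuffer buffer, Object o) {
            if (o == null) {
                buffer.putShort((short) -1);
                return;
            }
            byte[] bytes = ((String) o).getBytes(StandardCharsets.UTF_8);
            if (bytes.length > Short.MAX_VALUE)
                throw new IllegalArgumentException("String is too long for an INT16 length prefix");
            buffer.putShort((short) bytes.length);
            buffer.put(bytes);
        }

        @Override
        public Object read(ByteBuffer buffer) {
            short length = buffer.getShort();
            if (length < 0)
                return null;
            byte[] bytes = new byte[length];
            buffer.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }

        @Override
        public int sizeOf(Object o) {
            if (o == null)
                return 2;
            return 2 + ((String) o).getBytes(StandardCharsets.UTF_8).length;
        }
    };

    // Writes the value into a buffer of exactly sizeOf(value) bytes and reads it back.
    static Object roundTrip(Object value) {
        ByteBuffer buffer = ByteBuffer.allocate(NULLABLE_STRING.sizeOf(value));
        NULLABLE_STRING.write(buffer, value);
        buffer.flip();
        return NULLABLE_STRING.read(buffer);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("consumer-1"));   // prints consumer-1
        System.out.println(roundTrip(null));           // prints null
    }
}
```

Unlike INT16, sizeOf here depends on the value, which is why the contract takes the Object as an argument.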

Field

Field is simply a class that groups a few attributes.

public class Field {

    public static final Object NO_DEFAULT = new Object();
    // Position of this field within its Schema
    final int index;
    // Name
    public final String name;
    // Type
    public final Type type;
    // Default value
    public final Object defaultValue;
    // Documentation string
    public final String doc;
    final Schema schema;

    public Field(int index, String name, Type type, String doc, Object defaultValue) {
        this(index, name, type, doc, defaultValue, null);
    }

    public Field(String name, Type type, String doc, Object defaultValue) {
        this(-1, name, type, doc, defaultValue);
    }

    public Field(String name, Type type, String doc) {
        this(name, type, doc, NO_DEFAULT);
    }

Schema

A Schema is an ordered collection of Fields. It supports reading data and returns the result as a Struct.

public class Schema extends Type {
    // The fields, in declaration order
    private final Field[] fields;
    // Hash map used to look up a field by name
    private final Map<String, Field> fieldsByName;

    public Schema(Field... fs) {
        // Copy the fields, rejecting duplicate names
        this.fields = new Field[fs.length];
        this.fieldsByName = new HashMap<>();
        for (int i = 0; i < this.fields.length; i++) {
            Field field = fs[i];
            if (fieldsByName.containsKey(field.name))
                throw new SchemaException("Schema contains a duplicate field: " + field.name);
            // Create the Field; note the first argument i, which is its position
            this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
            this.fieldsByName.put(fs[i].name, this.fields[i]);
        }
    }
    
    
    public Struct read(ByteBuffer buffer) {
        Object[] objects = new Object[fields.length];
        // Read the values in field order and store them in the objects array
        for (int i = 0; i < fields.length; i++) {
            try {
                objects[i] = fields[i].type.read(buffer);
            } catch (Exception e) {
                throw new SchemaException("Error reading field '" + fields[i].name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
        // Struct provides a convenient interface for accessing the values
        return new Struct(this, objects);
    }
    
    public void write(ByteBuffer buffer, Object o) {
        Struct r = (Struct) o;
        // Iterate over the fields in order
        for (Field field : fields) {
            try {
                // Get the value from the Struct and validate it
                Object value = field.type().validate(r.get(field));
                // Write it to the buffer
                field.type.write(buffer, value);
            } catch (Exception e) {
                throw new SchemaException("Error writing field '" + field.name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
    }
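
The key property of Schema.read is that fields are consumed strictly in declaration order, with each read advancing the buffer position. A toy sketch of that idea follows; OrderedSchemaSketch and its field names are invented for illustration, and a LinkedHashMap stands in for both the Field array and the Struct:

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class OrderedSchemaSketch {
    // Hypothetical mini schema: field name -> reader, kept in declaration order.
    private final Map<String, Function<ByteBuffer, Object>> readers = new LinkedHashMap<>();

    OrderedSchemaSketch field(String name, Function<ByteBuffer, Object> reader) {
        if (readers.containsKey(name))
            throw new IllegalArgumentException("Schema contains a duplicate field: " + name);
        readers.put(name, reader);
        return this;
    }

    // Reads every field in order; each reader advances the buffer position.
    Map<String, Object> read(ByteBuffer buffer) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (Map.Entry<String, Function<ByteBuffer, Object>> entry : readers.entrySet()) {
            try {
                values.put(entry.getKey(), entry.getValue().apply(buffer));
            } catch (RuntimeException e) {
                // Name the failing field, as the original read method does
                throw new IllegalStateException("Error reading field '" + entry.getKey() + "'", e);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        OrderedSchemaSketch schema = new OrderedSchemaSketch()
                .field("id", b -> b.getShort())
                .field("count", b -> b.getInt());

        ByteBuffer buffer = ByteBuffer.allocate(6);
        buffer.putShort((short) 9).putInt(1000);
        buffer.flip();

        System.out.println(schema.read(buffer));   // prints {id=9, count=1000}
    }
}
```

Because nothing in the byte stream names the fields, reader and writer must agree on the order, which is why the field index matters.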

The built-in Schemas are all defined in the Protocol class.

public class Protocol {

    public static final Schema REQUEST_HEADER = new Schema(
                                 new Field("api_key", INT16, "The id of the request type."),
                                 new Field("api_version", INT16, "The version of the API."),
                                 new Field("correlation_id", INT32,
                                    "A user-supplied integer value that will be passed back with the response"),
                                 new Field("client_id",  NULLABLE_STRING,
                                    "A user specified identifier for the client making the request.", ""));
    ......

RequestHeader

RequestHeader represents the header common to all requests.

Layout of the RequestHeader:

|api_key | api_version | correlation_id | client_id |

public class RequestHeader extends AbstractRequestResponse {

    private final short apiKey;
    private final short apiVersion;
    private final String clientId;
    private final int correlationId;

    public RequestHeader(Struct struct) {
        apiKey = struct.getShort(API_KEY_FIELD);
        apiVersion = struct.getShort(API_VERSION_FIELD);
        clientId = struct.getString(CLIENT_ID_FIELD);
        correlationId = struct.getInt(CORRELATION_ID_FIELD);
    }

     public static RequestHeader parse(ByteBuffer buffer) {
        // Read the header using the built-in Schema's read method
        return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer));
    }
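
To make the wire layout concrete, here is a hand-rolled sketch that encodes and parses the four header fields without any Schema machinery. It assumes client_id is encoded as an INT16 length prefix (-1 for null) followed by UTF-8 bytes; RequestHeaderSketch and the sample values are illustrative only:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class RequestHeaderSketch {
    final short apiKey;
    final short apiVersion;
    final int correlationId;
    final String clientId;   // may be null

    RequestHeaderSketch(short apiKey, short apiVersion, int correlationId, String clientId) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.correlationId = correlationId;
        this.clientId = clientId;
    }

    // Serializes the fields in schema order: api_key, api_version, correlation_id, client_id.
    // Assumes the client id is short enough for an INT16 length prefix.
    ByteBuffer toBuffer() {
        byte[] client = clientId == null ? null : clientId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(2 + 2 + 4 + 2 + (client == null ? 0 : client.length));
        buffer.putShort(apiKey);
        buffer.putShort(apiVersion);
        buffer.putInt(correlationId);
        if (client == null) {
            buffer.putShort((short) -1);
        } else {
            buffer.putShort((short) client.length);
            buffer.put(client);
        }
        buffer.flip();
        return buffer;
    }

    // Reads the fields back in the same order.
    static RequestHeaderSketch parse(ByteBuffer buffer) {
        short apiKey = buffer.getShort();
        short apiVersion = buffer.getShort();
        int correlationId = buffer.getInt();
        short length = buffer.getShort();
        String clientId = null;
        if (length >= 0) {
            byte[] bytes = new byte[length];
            buffer.get(bytes);
            clientId = new String(bytes, StandardCharsets.UTF_8);
        }
        return new RequestHeaderSketch(apiKey, apiVersion, correlationId, clientId);
    }

    public static void main(String[] args) {
        ByteBuffer wire = new RequestHeaderSketch((short) 3, (short) 2, 7, "demo").toBuffer();
        System.out.println("bytes on the wire: " + wire.remaining());   // prints 14
        RequestHeaderSketch header = parse(wire);
        // prints "3 2 7 demo"
        System.out.println(header.apiKey + " " + header.apiVersion + " "
                + header.correlationId + " " + header.clientId);
    }
}
```

After parse returns, the buffer position sits at the first byte of the request body, which is what lets the body parser continue on the same buffer.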

AbstractRequest

The Request class calls AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer) to instantiate the matching AbstractRequest. Finally, the request and the size of the struct are used to build a RequestAndSize.

public abstract class AbstractRequest extends AbstractRequestResponse {
    public static RequestAndSize getRequest(int requestId, short version, ByteBuffer buffer) {
        // Look up the ApiKeys value for header.apiKey
        ApiKeys apiKey = ApiKeys.forId(requestId);
        Struct struct = apiKey.parseRequest(version, buffer);
        AbstractRequest request;
        // Instantiate the AbstractRequest subclass that matches apiKey
        switch (apiKey) {
            case PRODUCE:
                request = new ProduceRequest(struct, version);
                break;
            case FETCH:
                request = new FetchRequest(struct, version);
                break;
            case LIST_OFFSETS:
                request = new ListOffsetRequest(struct, version);
                break;
            case METADATA:
                request = new MetadataRequest(struct, version);
                break;
            case OFFSET_COMMIT:
                request = new OffsetCommitRequest(struct, version);
                break;
            case OFFSET_FETCH:
                request = new OffsetFetchRequest(struct, version);
                break;
        ....
        }
        return new RequestAndSize(request, struct.sizeOf());
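
The dispatch pattern can be reduced to a toy example. In the sketch below the request types PING and ECHO, and the classes Body and BodyAndSize, are invented and are not Kafka's. One deliberate difference: the size is measured as the number of bytes consumed from the buffer, whereas the original asks the Struct via struct.sizeOf().

```java
import java.nio.ByteBuffer;

class DispatchSketch {
    enum Key { PING, ECHO }   // hypothetical request types, not Kafka's

    interface Body { }

    static final class Ping implements Body { }

    static final class Echo implements Body {
        final int value;
        Echo(int value) { this.value = value; }
    }

    // Pairs the parsed body with its serialized size, like RequestAndSize.
    static final class BodyAndSize {
        final Body body;
        final int size;
        BodyAndSize(Body body, int size) {
            this.body = body;
            this.size = size;
        }
    }

    static BodyAndSize getRequest(Key key, ByteBuffer buffer) {
        int start = buffer.position();
        Body body;
        switch (key) {
            case PING:
                body = new Ping();                  // no payload
                break;
            case ECHO:
                body = new Echo(buffer.getInt());   // 4-byte payload
                break;
            default:
                throw new AssertionError("Unhandled key: " + key);
        }
        // Size = bytes consumed while parsing the body
        return new BodyAndSize(body, buffer.position() - start);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(5);
        buffer.flip();
        BodyAndSize parsed = getRequest(Key.ECHO, buffer);
        // prints "5 (4 bytes)"
        System.out.println(((Echo) parsed.body).value + " (" + parsed.size + " bytes)");
    }
}
```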

ApiKeys

ApiKeys is an enum that defines the api_key values carried in the header. The api_key identifies what type of request this is.

public enum ApiKeys {
    PRODUCE(0, "Produce"),
    FETCH(1, "Fetch"),
    LIST_OFFSETS(2, "Offsets"),
    METADATA(3, "Metadata"),
    LEADER_AND_ISR(4, "LeaderAndIsr", true),
    ....   
    // Lookup table from id to ApiKeys value
    private static final ApiKeys[] ID_TO_TYPE;
    private static final int MIN_API_KEY = 0;
    public static final int MAX_API_KEY;
    
    static {
        int maxKey = -1;
        // Find the largest id
        for (ApiKeys key : ApiKeys.values())
            maxKey = Math.max(maxKey, key.id);
        ApiKeys[] idToType = new ApiKeys[maxKey + 1];
        // Fill in the id-to-type table
        for (ApiKeys key : ApiKeys.values())
            idToType[key.id] = key;
        ID_TO_TYPE = idToType;
        MAX_API_KEY = maxKey;
    }
    .......
    
    // Look up the ApiKeys value for the given id in ID_TO_TYPE
    public static ApiKeys forId(int id) {
        if (!hasId(id))
            throw new IllegalArgumentException(String.format("Unexpected ApiKeys id `%s`, it should be between `%s` " +
                    "and `%s` (inclusive)", id, MIN_API_KEY, MAX_API_KEY));
        return ID_TO_TYPE[id];
    }
    // Parse the request body
    public Struct parseRequest(short version, ByteBuffer buffer) {
        return requestSchema(version).read(buffer);
    }
    // Return the request Schema for the given version
    public Schema requestSchema(short version) {
        return schemaFor(Protocol.REQUESTS, version);
    }
    // Given a table of schemas, return the Schema for the given version
    private Schema schemaFor(Schema[][] schemas, short version) {
        // Check the id and the version
        if (id > schemas.length)
            throw new IllegalArgumentException("No schema available for API key " + this);
        if (version < 0 || version > latestVersion())
            throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
        // Get the Schema array for this id; it holds one entry per version
        Schema[] versions = schemas[id];
        if (versions[version] == null)
            throw new IllegalArgumentException("Unsupported version for API key " + this + ": " + version);
        // Return the Schema for this version
        return versions[version];
    }
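
The id-to-enum table is a general technique: build an array indexed by id once, in a static initializer, and look values up in constant time. A self-contained sketch with a four-value enum follows. The hasId helper is not shown in the excerpt above, so its body here is an assumption about what such a check would need to do:

```java
class ApiKeyLookupSketch {
    enum Key {
        PRODUCE(0), FETCH(1), LIST_OFFSETS(2), METADATA(3);

        final int id;

        // Lookup table from id to enum value, built once when the enum is loaded.
        private static final Key[] ID_TO_TYPE;

        static {
            int maxKey = -1;
            for (Key key : values())
                maxKey = Math.max(maxKey, key.id);
            Key[] idToType = new Key[maxKey + 1];
            for (Key key : values())
                idToType[key.id] = key;
            ID_TO_TYPE = idToType;
        }

        Key(int id) {
            this.id = id;
        }

        // Assumed implementation: the id must be in range and have an entry.
        static boolean hasId(int id) {
            return id >= 0 && id < ID_TO_TYPE.length && ID_TO_TYPE[id] != null;
        }

        static Key forId(int id) {
            if (!hasId(id))
                throw new IllegalArgumentException("Unexpected id " + id);
            return ID_TO_TYPE[id];
        }
    }

    public static void main(String[] args) {
        System.out.println(Key.forId(3));    // prints METADATA
        System.out.println(Key.hasId(99));   // prints false
    }
}
```

If ids are sparse, unused slots stay null, which is why the null check in hasId matters.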

Protocol

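The Protocol class defines the mapping between api keys and schemas.

// REQUESTS is a two-dimensional array of Schemas: the first index is the api key id, the second is the version
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];

// The Produce request schemas, one per version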
public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};

static {
        REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
        REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
        REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
        REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
        REQUESTS[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_REQUEST;
        .......
}
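
The two-level lookup (api key id first, then version) can be sketched with plain strings standing in for Schema objects. The table contents below are made up. The bounds checks use `>=` against the array lengths, since that is what a safe array index needs:

```java
class SchemaTableSketch {
    // Hypothetical stand-ins: schema names instead of real Schema objects.
    static final String[][] REQUESTS = new String[3][];

    static {
        REQUESTS[0] = new String[] {"PRODUCE_V0", "PRODUCE_V1", "PRODUCE_V2"};
        REQUESTS[1] = new String[] {"FETCH_V0", "FETCH_V1"};
        // REQUESTS[2] is left null on purpose: an id with no registered schemas
    }

    // First index: api key id. Second index: version.
    static String schemaFor(int id, int version) {
        if (id < 0 || id >= REQUESTS.length || REQUESTS[id] == null)
            throw new IllegalArgumentException("No schema available for API key " + id);
        String[] versions = REQUESTS[id];
        if (version < 0 || version >= versions.length || versions[version] == null)
            throw new IllegalArgumentException("Unsupported version for API key " + id + ": " + version);
        return versions[version];
    }

    public static void main(String[] args) {
        System.out.println(schemaFor(0, 2));   // prints PRODUCE_V2
        System.out.println(schemaFor(1, 0));   // prints FETCH_V0
    }
}
```

Keeping every version in the table is what allows a broker to parse requests from older clients alongside newer ones.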

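Summary

A Schema is made up of multiple Fields, and a Field mainly holds attributes such as its Type and name. The Schema is responsible for parsing data from the buffer and returning the result as a Struct.

Protocol contains many built-in Schemas.

RequestHeader represents the request header and is parsed with the built-in Protocol.REQUEST_HEADER Schema. The header contains attributes such as apiKey and apiVersion.

ApiKeys holds the set of api keys. Given an api key and a version, it can find the matching Schema.

AbstractRequest parses the buffer and returns the matching request instance, based on the api key and version.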
