當接收到NetworkReceive, Processor會構造了Request實例,發送給RequestChannelapi
private def processCompletedReceives() { selector.completedReceives.asScala.foreach { receive => val openChannel = selector.channel(receive.source) val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source) val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress) // 建立Request實例 val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeNanos = time.nanoseconds, listenerName = listenerName, securityProtocol = securityProtocol) requestChannel.sendRequest(req) selector.mute(receive.source) } }
case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) { val requestId = buffer.getShort() // 這裏只是爲了支持v0版本的shutdown請求 val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id) ControlledShutdownRequest.readFrom(buffer) else null val header: RequestHeader = if (requestObj == null) { buffer.rewind // 使用RequestHeader的類方法解析 try RequestHeader.parse(buffer) catch { case ex: Throwable => throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex) } } else null val bodyAndSize: RequestAndSize = if (requestObj == null) try { if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) { new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0) } else // 根據apiKey,apiVersion和buffer,實例化RequestAndSize AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer) } catch { case ex: Throwable => throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex) } else null buffer = null
public abstract class Type { // 進數據Object寫入到ByteBuffer public abstract void write(ByteBuffer buffer, Object o); // 從ByteBuffer讀取Object數據 public abstract Object read(ByteBuffer buffer); // 驗證Object是否合理 public abstract Object validate(Object o); // 返回Object數據的大小 public abstract int sizeOf(Object o); // 是否數據爲null public boolean isNullable() { return false; }
自定義的Type, 有INT8,INT16,...STRING,BYTES,VARINT...。ui
public static final Type INT16 = new Type() { @Override public void write(ByteBuffer buffer, Object o) { // 調用ByteBuffer的putShort方法 buffer.putShort((Short) o); } @Override public Object read(ByteBuffer buffer) { // 調用ByteBuffer的getShort方法 return buffer.getShort(); } @Override public int sizeOf(Object o) { // 佔兩個字節 return 2; } @Override public String toString() { return "INT16"; } @Override public Short validate(Object item) { // 檢驗Object是否爲Short類型 if (item instanceof Short) return (Short) item; else throw new SchemaException(item + " is not a Short."); } };
public class Field { public static final Object NO_DEFAULT = new Object(); // 位置,代表在Schema的位置 final int index; // 名稱 public final String name; // 類型 public final Type type; // 默認值 public final Object defaultValue; // 解釋文檔 public final String doc; final Schema schema; public Field(int index, String name, Type type, String doc, Object defaultValue) { this(index, name, type, doc, defaultValue, null); } public Field(String name, Type type, String doc, Object defaultValue) { this(-1, name, type, doc, defaultValue); } public Field(String name, Type type, String doc) { this(name, type, doc, NO_DEFAULT); }
public class Schema extends Type { // fields列表,按照順序排序 private final Field[] fields; // 哈希表,用來經過string查看field private final Map<String, Field> fieldsByName; public Schema(Field... fs) { // this.fields = new Field[fs.length]; this.fieldsByName = new HashMap<>(); for (int i = 0; i < this.fields.length; i++) { Field field = fs[i]; if (fieldsByName.containsKey(field.name)) throw new SchemaException("Schema contains a duplicate field: " + field.name); // 實例Field,注意第一個參數i,表示位置 this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this); this.fieldsByName.put(fs[i].name, this.fields[i]); } } public Struct read(ByteBuffer buffer) { Object[] objects = new Object[fields.length]; // 按照fields的順序,依次讀取值,而且保存到objects列表 for (int i = 0; i < fields.length; i++) { try { objects[i] = fields[i].type.read(buffer); } catch (Exception e) { throw new SchemaException("Error reading field '" + fields[i].name + "': " + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } } // Struct提供了方便的接口訪問數據 return new Struct(this, objects); } public void write(ByteBuffer buffer, Object o) { Struct r = (Struct) o; // 按照fields的順序依次遍歷 for (Field field : fields) { try { // 調用Struct的get方法,獲取值 Object value = field.type().validate(r.get(field)); // 寫入到buffer field.type.write(buffer, value); } catch (Exception e) { throw new SchemaException("Error writing field '" + field.name + "': " + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } } }
public class Protocol { public static final Schema REQUEST_HEADER = new Schema( new Field("api_key", INT16, "The id of the request type."), new Field("api_version", INT16, "The version of the API."), new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"), new Field("client_id", NULLABLE_STRING, "A user specified identifier for the client making the request.", "")); ......
|api_key | api_version | correlation_id | client_id |
public class RequestHeader extends AbstractRequestResponse { private final short apiKey; private final short apiVersion; private final String clientId; private final int correlationId; public RequestHeader(Struct struct) { apiKey = struct.getShort(API_KEY_FIELD); apiVersion = struct.getShort(API_VERSION_FIELD); clientId = struct.getString(CLIENT_ID_FIELD); correlationId = struct.getInt(CORRELATION_ID_FIELD); } public static RequestHeader parse(ByteBuffer buffer) { // 使用內置的Schema,調用read方法讀取 return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer)); }
在Request類中,調用AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer),初始化AbstractRequest。最後request和struct實例化RequestAndSize
public abstract class AbstractRequest extends AbstractRequestResponse { public static RequestAndSize getRequest(int requestId, short version, ByteBuffer buffer) { // 根據header.apiKey,取出ApiKeys ApiKeys apiKey = ApiKeys.forId(requestId); Struct struct = apiKey.parseRequest(version, buffer); AbstractRequest request; // 根據apiKey實例化不一樣的RequestAndSize switch (apiKey) { case PRODUCE: request = new ProduceRequest(struct, version); break; case FETCH: request = new FetchRequest(struct, version); break; case LIST_OFFSETS: request = new ListOffsetRequest(struct, version); break; case METADATA: request = new MetadataRequest(struct, version); break; case OFFSET_COMMIT: request = new OffsetCommitRequest(struct, version); break; case OFFSET_FETCH: request = new OffsetFetchRequest(struct, version); break; .... } return new RequestAndSize(request, struct.sizeOf());
public enum ApiKeys { PRODUCE(0, "Produce"), FETCH(1, "Fetch"), LIST_OFFSETS(2, "Offsets"), METADATA(3, "Metadata"), LEADER_AND_ISR(4, "LeaderAndIsr", true), .... // id 到ApiKeys的value的列表 private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; public static final int MAX_API_KEY; static { int maxKey = -1; // 更新maxKey for (ApiKeys key : ApiKeys.values()) maxKey = Math.max(maxKey, key.id); ApiKeys[] idToType = new ApiKeys[maxKey + 1]; // 更新idToType for (ApiKeys key : ApiKeys.values()) idToType[key.id] = key; ID_TO_TYPE = idToType; MAX_API_KEY = maxKey; } ....... // 根據獲取id從ID_TO_TYPE獲取相應的type public static ApiKeys forId(int id) { if (!hasId(id)) throw new IllegalArgumentException(String.format("Unexpected ApiKeys id `%s`, it should be between `%s` " + "and `%s` (inclusive)", id, MIN_API_KEY, MAX_API_KEY)); return ID_TO_TYPE[id]; } // 解析請求 public Struct parseRequest(short version, ByteBuffer buffer) { return requestSchema(version).read(buffer); } // 返回對應version的Request Schema public Schema requestSchema(short version) { return schemaFor(Protocol.REQUESTS, version); } // 針對schemas列表,返回對應version的Schema private Schema schemaFor(Schema[][] schemas, short version) { // 檢查version的值 if (id > schemas.length) throw new IllegalArgumentException("No schema available for API key " + this); if (version < 0 || version > latestVersion()) throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version); // 返回id對應的Schema列表,裏面包含了不一樣的version Schema[] versions = schemas[id]; if (versions[version] == null) throw new IllegalArgumentException("Unsupported version for API key " + this + ": " + version); // 返回version對應的 return versions[version]; }
// REQUESTS是Schema的二維數據。一維座標是key_id,二維座標是version public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; // produce request不一樣版本的集合 public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3}; static { REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST; REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST; REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST; REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST; REQUESTS[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_REQUEST; ....... }