從零開始實現一個RPC框架(五)

前言

這是系列最後一篇文章了,最後咱們來爲咱們的rpc框架實現一個http gateway。這個功能實際上受到了rpcx的啓發,基於這種方式實現一個簡單的相似service mesh中的sidecar。代碼地址:githubhtml

原理

http gateway能夠接收來自客戶端的http請求並將其轉換爲rpc請求而後交給服務端處理,再將服務端處理事後的結果經過http響應返回給客戶端。git

http gateway的大體原理就是將咱們的RPC協議中header部分放到http header中,而後RPC協議中的body部分放到http body便可。github

實現

首先咱們須要定義http header中各個字段的名稱:json

const (
	HEADER_SEQ            = "rpc-header-seq"            //序號, 用來惟一標識請求或響應
	HEADER_MESSAGE_TYPE   = "rpc-header-message_type"   //消息類型,用來標識一個消息是請求仍是響應
	HEADER_COMPRESS_TYPE  = "rpc-header-compress_type"  //壓縮類型,用來標識一個消息的壓縮方式
	HEADER_SERIALIZE_TYPE = "rpc-header-serialize_type" //序列化類型,用來標識消息體採用的編碼方式
	HEADER_STATUS_CODE    = "rpc-header-status_code"    //狀態類型,用來標識一個請求是正常仍是異常
	HEADER_SERVICE_NAME   = "rpc-header-service_name"   //服務名
	HEADER_METHOD_NAME    = "rpc-header-method_name"    //方法名
	HEADER_ERROR          = "rpc-header-error"          //方法調用發生的異常
	HEADER_META_DATA      = "rpc-header-meta_data"      //其餘元數據
)
複製代碼

而後咱們須要啓動一個http server,用來接收http請求。這裏咱們使用go自帶的api,默認使用5080端口,若是發現端口已經被佔用了,就遞增端口。api

func (s *SGServer) startGateway() {
	port := 5080
	ln, err := net.Listen("tcp", ":" + strconv.Itoa(port))
	for err != nil && strings.Contains(err.Error(), "address already in use") {
		port++
		ln, err = net.Listen("tcp", ":" + strconv.Itoa(port))
	}
	if err != nil {
		log.Printf("error listening gateway: %s", err.Error())
	}
	log.Printf("gateway listenning on " + strconv.Itoa(port))
	//避免阻塞,使用新的goroutine來執行http server
	go func() {
		err := http.Serve(ln, s)
		if err != nil {
			log.Printf("error serving http %s", err.Error())
		}
	}()
}
複製代碼

接下來咱們須要實現ServeHTTP函數:app

func (s *SGServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
         //若是url不對則直接返回
	if r.URL.Path != "/invoke" { 
		rw.WriteHeader(404)
		return
	}
	//若是method不對則直接返回
	if r.Method != "POST" {
		rw.WriteHeader(405)
		return
	}
	//構造新的請求
	request := protocol.NewMessage(s.Option.ProtocolType)
	//根據http header填充request的header
	request, err := parseHeader(request, r)
	if err != nil {
	    rw.WriteHeader(400)
	}
	//根據http body填充request的data
	request, err = parseBody(request, r)
	if err != nil {
	    rw.WriteHeader(400)
	}
	//構造context
	ctx := metadata.WithMeta(context.Background(), request.MetaData)
	response := request.Clone()
	response.MessageType = protocol.MessageTypeResponse
	//處理請求
	response = s.process(ctx, request, response)
	//返回相應
	s.writeHttpResponse(response, rw, r)
}

func parseBody(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
	data, err := ioutil.ReadAll(request.Body)
	if err != nil {
		return nil, err
	}
	message.Data = data
	return message, nil
}

func parseHeader(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
	headerSeq := request.Header.Get(HEADER_SEQ)
	seq, err := strconv.ParseUint(headerSeq, 10, 64)
	if err != nil {
		return nil, err
	}
	message.Seq = seq

	headerMsgType := request.Header.Get(HEADER_MESSAGE_TYPE)
	msgType, err := protocol.ParseMessageType(headerMsgType)
	if err != nil {
		return nil, err
	}
	message.MessageType = msgType

	headerCompressType := request.Header.Get(HEADER_COMPRESS_TYPE)
	compressType, err := protocol.ParseCompressType(headerCompressType)
	if err != nil {
		return nil, err
	}
	message.CompressType = compressType

	headerSerializeType := request.Header.Get(HEADER_SERIALIZE_TYPE)
	serializeType, err := codec.ParseSerializeType(headerSerializeType)
	if err != nil {
		return nil, err
	}
	message.SerializeType = serializeType

	headerStatusCode := request.Header.Get(HEADER_STATUS_CODE)
	statusCode, err := protocol.ParseStatusCode(headerStatusCode)
	if err != nil {
		return nil, err
	}
	message.StatusCode = statusCode

	serviceName := request.Header.Get(HEADER_SERVICE_NAME)
	message.ServiceName = serviceName

	methodName := request.Header.Get(HEADER_METHOD_NAME)
	message.MethodName = methodName

	errorMsg := request.Header.Get(HEADER_ERROR)
	message.Error = errorMsg

	headerMeta := request.Header.Get(HEADER_META_DATA)
	meta := make(map[string]interface{})
	err = json.Unmarshal([]byte(headerMeta), &meta)
	if err != nil {
		return nil, err
	}
	message.MetaData = meta

	return message, nil
}

func (s *SGServer) writeHttpResponse(message *protocol.Message, rw http.ResponseWriter, r *http.Request) {
	header := rw.Header()
	header.Set(HEADER_SEQ, string(message.Seq))
	header.Set(HEADER_MESSAGE_TYPE, message.MessageType.String())
	header.Set(HEADER_COMPRESS_TYPE, message.CompressType.String())
	header.Set(HEADER_SERIALIZE_TYPE, message.SerializeType.String())
	header.Set(HEADER_STATUS_CODE, message.StatusCode.String())
	header.Set(HEADER_SERVICE_NAME, message.ServiceName)
	header.Set(HEADER_METHOD_NAME, message.MethodName)
	header.Set(HEADER_ERROR, message.Error)
	metaDataJson, _ := json.Marshal(message.MetaData)
	header.Set(HEADER_META_DATA, string(metaDataJson))

	_, _ = rw.Write(message.Data)
}
複製代碼

最後咱們只須要在wrapper中啓動http server便可。框架

func (w *DefaultServerWrapper) WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc {
	return func(network string, addr string, meta map[string]interface{}) error {
		//省略前面的部分
		...
		
		//啓動gateway
		s.startGateway()
		return serveFunc(network, addr, meta)
	}
}
複製代碼

客戶端測試代碼:tcp

func MakeHttpCall() {
    //聲明參數並序列化,放到http請求的body中
	arg := service.Args{A: rand.Intn(200), B: rand.Intn(100)}
	data, _ := msgpack.Marshal(arg)
	body := bytes.NewBuffer(data)
	req, err := http.NewRequest("POST", "http://localhost:5080/invoke", body)
	if err != nil {
		log.Println(err)
		return
	}
	req.Header.Set(server.HEADER_SEQ, "1")
	req.Header.Set(server.HEADER_MESSAGE_TYPE, protocol.MessageTypeRequest.String())
	req.Header.Set(server.HEADER_COMPRESS_TYPE,protocol.CompressTypeNone.String())
	req.Header.Set(server.HEADER_SERIALIZE_TYPE,codec.MessagePack.String())
	req.Header.Set(server.HEADER_STATUS_CODE,protocol.StatusOK.String())
	req.Header.Set(server.HEADER_SERVICE_NAME,"Arith")
	req.Header.Set(server.HEADER_METHOD_NAME,"Add")
	req.Header.Set(server.HEADER_ERROR,"")
	meta := map[string]interface{}{"key":"value"}
	metaJson, _ := json.Marshal(meta)
	req.Header.Set(server.HEADER_META_DATA,string(metaJson))
	response, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Println(err)
		return
	}
	if response.StatusCode != 200 {
		log.Println(response)
	} else if response.Header.Get(server.HEADER_ERROR) != "" {
		log.Println(response.Header.Get(server.HEADER_ERROR))
	} else {
		data, err = ioutil.ReadAll(response.Body)
		result := service.Reply{}
		msgpack.Unmarshal(data, &result)
		fmt.Println(result.C)
	}
}
複製代碼

結語

這個系列到此就告一段落了,可是還有不少須要改進和豐富的地方甚至是錯誤,後續再以單獨文章的形式更新。ide

歷史連接

從零開始實現一個RPC框架(零)函數

從零開始實現一個RPC框架(一)

從零開始實現一個RPC框架(二)

從零開始實現一個RPC框架(三)

從零開始實現一個RPC框架(四)

相關文章
相關標籤/搜索