thrift golang 解析

RPC

什麼是RPC?html

RPC全稱爲Remote Procedure Call 翻譯過來就是遠程過程調用java

RPCHTTP的區別git

HTTP是一種協議,RPC能夠經過HTTP來實現,也能夠經過Socket本身實現一套協議來實現。github

論複雜度,RPC框架確定是高於簡單的HTTP接口的。但毋庸置疑,HTTP接口因爲受限於HTTP協議,須要帶HTTP請求頭,致使傳輸起來效率或者說安全性不如RPCgolang

而且要否定一點,HTTP協議相對於TCP報文協議,增長的開銷在於鏈接與斷開。HTTP是支持鏈接池複用的(HTTP 1.x)apache

Thrift架構

Apache Thrift是一個跨語言的服務框架,本質上爲RPC,同時具備序列化、反序列化機制Thrift包含一個完整的堆棧結構用於構建客戶端和服務器端。segmentfault

傳輸協議(TProtocol)

Thrift可讓用戶選擇客戶端和服務端之間傳輸通訊協議的區別,在傳輸協議上整體分爲文本和二進制(binary)傳輸協議,爲了節省帶寬,提升傳輸效率,通常狀況下使用二進制類型的傳輸協議爲多數。安全

  • TBinaryProtocol:二進制編碼格式進行數據傳輸
  • TCompactProtocol:高效率的、密集的二進制編碼格式進行數據傳輸
  • TJSONProtocol:使用 JSON 的數據編碼協議進行數據傳輸
  • TDebugProtocol:使用易懂的可讀文本格式,以便於debug

數據傳輸方式(TTransport)

TTransport是與底層數據傳輸緊密相關的傳輸層。每一種支持的底層傳輸方式都存在一個與之對應的TTransport。在這一層,數據是按字節流處理的,即傳輸層看到的是一個又一個的字節,並把這些字節按順序發送和接收。TTransport並不瞭解它所傳輸的數據是什麼類型,實際上傳輸層也不關心數據是什麼類型,只須要按照字節方式對數據進行發送和接收便可。數據類型的解析在TProtocol這一層完成。服務器

  • TSocket:使用阻塞式 I/O進行傳輸,是最多見的模式
  • THttpTransport:採用HTTP協議進行數據傳輸
  • TFramedTransPort: 以frame爲單位進行傳輸,非阻塞式服務中使用;
  • TFileTransPort:以文件形式進行傳輸
  • TMemoryTransport:將內存用於I/O傳輸
  • TZlibTransport:使用zlib進行壓縮, 與其餘傳輸方式聯合使用
  • TBufferedTransport對某個transport對象操做的數據進行buffer,即從buffer中讀取數據進行傳輸,或將數據直接寫入到buffer

服務端網絡模型(TServer)

TServerthrift框架中的主要任務是接收client請求,並轉發到某個processor上進行請求處理。針對不一樣的訪問規模,thrift提供了不一樣TServer模型。thrift目前支持的server模型包括:網絡

  • TSimpleServer: 單線程服務器端使用標準的阻塞式I/O
  • TTHreaadPoolServer: 多線程服務器端使用標準的阻塞式I/O
  • TNonblockingServer:多線程服務器端使用非阻塞式I/O
  • TThreadedServer:多線程網絡模型,使用阻塞式I/O,爲每一個請求建立一個線程

對於`golang`來講,只有`TSimpleServer`服務模式,而且是非阻塞的

TProcesser

TProcessor主要對TServer中一次請求的inputProtocoloutputProtocol進行操做,也就是從inputProtocol中讀出client的請求數據,向outputProtocol寫入用戶邏輯的返回值。TProcessorprocess是一個很是關鍵的處理函數,由於client全部的rpc調用都會通過該函數處理並轉發

ThriftClient

ThriftClientTProcessor同樣主要操做inputProtocoloutputProtocol,不一樣的是thriftClientrpc調用分爲sendreceive兩個步驟:

  • send步驟,將用戶的調用參數做爲一個總體的struct寫入TProtocol,併發送到TServer
  • send結束後,thriftClient便當即進入receive狀態等待TServer的響應。對於TServer的響應,使用返回值解析類進行返回值解析,完成rpc調用。

TSimpleServer服務模式

實際上這不是典型的TSimpleServer由於它在接受套接字後沒有被阻塞。 它更像是一個TThreadedServer,能夠處理不一樣goroutine中的不一樣鏈接。 若是golang用戶在客戶端實現conn-pool之類的東西這將有效。

type TSimpleServer struct {
	quit chan struct{}     // 採用阻塞channel進行判斷

	processorFactory       TProcessorFactory
	serverTransport        TServerTransport
	inputTransportFactory  TTransportFactory
	outputTransportFactory TTransportFactory
	inputProtocolFactory   TProtocolFactory
	outputProtocolFactory  TProtocolFactory
}

複製代碼

如下代碼thrift-idl爲,接下來的解析以此爲例

namespace go echo

struct EchoReq {
    1: string msg;
}

struct EchoRes {
    1: string msg;
}

service Echo {
    EchoRes echo(1: EchoReq req);
}
複製代碼

服務端Server代碼

func (p *TSimpleServer) Serve() error {
	err := p.Listen()
	if err != nil {
		return err
	}
	p.AcceptLoop()
	return nil
}

func (p *TSimpleServer) AcceptLoop() error {
	for {
	    // 此處的Accept()是阻塞的,是調用listener.Accept()
		client, err := p.serverTransport.Accept()
		if err != nil {
			select {
			case <-p.quit:
				return nil
			default:
			}
			return err
		}
		if client != nil {
			go func() {
				if err := p.processRequests(client); err != nil {
					log.Println("error processing request:", err)
				}
			}()
		}
	}
}
複製代碼

若是server此時還在處理請求,服務端忽然重啓,thrift 1.0是沒法作到優雅重啓的,可是go thrift的最新版本採用了golang waitgroup的方式實現了優雅重啓~

func (p *TSimpleServer) processRequests(client TTransport) error {
	processor := p.processorFactory.GetProcessor(client)

	inputTransport := p.inputTransportFactory.GetTransport(client)
	outputTransport := p.outputTransportFactory.GetTransport(client)

	inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
	outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
	defer func() {
		if e := recover(); e != nil {
			log.Printf("panic in processor: %s: %s", e, debug.Stack())
		}
	}()
	if inputTransport != nil {
		defer inputTransport.Close()
	}
	if outputTransport != nil {
		defer outputTransport.Close()
	}
	for {
		ok, err := processor.Process(inputProtocol, outputProtocol)

		if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
			return nil
		} else if err != nil {
			log.Printf("error processing request: %s", err)
			return err
		}
		if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
			continue
		}
 		if !ok {
			break
		}
	}
	return nil
}
複製代碼

Process處理邏輯

func (p *EchoProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	name, _, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return false, err
	}
	// 獲取傳遞過來的name,若是存在則處理
	if processor, ok := p.GetProcessorFunction(name); ok {
		return processor.Process(seqId, iprot, oprot)
	}
	// 異常邏輯
	iprot.Skip(thrift.STRUCT)
	iprot.ReadMessageEnd()
	x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
	oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
	x3.Write(oprot)
	oprot.WriteMessageEnd()
	oprot.Flush()
	return false, x3

}
複製代碼

TServer接收到rpc請求以後,調用TProcessorprocess進行處理。 TProcessorprocess首先調用TTransport.readMessageBegin接口,讀出rpc調用的名稱和rpc調用類型。

若是rpc調用類型是rpc call,則調用TProcessor.process_fn繼續處理,對於未知的rpc調用類型,則拋出異常。 TProcessor.process_fn根據rpc調用名稱,到本身的processMap中查找對應的rpc處理函數。若是存在對應的rpc處理函數,則調用該處理函數繼續進行請求響應。不存在則拋出異常。

func (p *echoProcessorEcho) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	args := EchoEchoArgs{}
	// 讀取入參的參數
	if err = args.Read(iprot); err != nil {
		iprot.ReadMessageEnd()
		x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return false, err
	}

	iprot.ReadMessageEnd()

	result := EchoEchoResult{}
	var retval *EchoRes
	var err2 error
	// 此處是thrift爲何err不能傳錯誤,若是傳業務錯誤會被阻塞
	if retval, err2 = p.handler.Echo(args.Req); err2 != nil {
		x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing echo: "+err2.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return true, err2
	} else {
		result.Success = retval
	}

	if err2 = oprot.WriteMessageBegin("echo", thrift.REPLY, seqId); err2 != nil {
		err = err2
	}

	if err2 = result.Write(oprot); err == nil && err2 != nil {
		err = err2
	}
	if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
		err = err2
	}

	if err2 = oprot.Flush(); err == nil && err2 != nil {
		err = err2
	}
	if err != nil {
		return
	}
	return true, err
}
複製代碼

服務端stop代碼

var once sync.Once
func (p *TSimpleServer) Stop() error {
	q := func() {
		p.quit <- struct{}{}
		p.serverTransport.Interrupt()
	}
	once.Do(q)
	return nil
}
複製代碼

stop函數比較簡單,能夠看出直接向阻塞隊列裏面寫入數據,而後server再也不接受請求

客戶端代碼

Client調用的函數

func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes, err error) {

	if err = p.sendEcho(req); err != nil {
		return
	}

	return p.recvEcho()
}
複製代碼

sendEcho()函數

func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
	oprot := p.OutputProtocol
	if oprot == nil {
		oprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.OutputProtocol = oprot
	}
	// seqid + 1
	p.SeqId++

	if err = oprot.WriteMessageBegin("echo", thrift.CALL, p.SeqId); err != nil {
		return
	}

	// 構建參數
	args := EchoEchoArgs{
		Req: req,
	}

	if err = args.Write(oprot); err != nil {
		return
	}
	// 通知服務器發送完畢
	if err = oprot.WriteMessageEnd(); err != nil {
		return
	}
	return oprot.Flush()
}
複製代碼

recvEcho()函數

func (p *EchoClient) recvEcho() (value *EchoRes, err error) {
	iprot := p.InputProtocol
	if iprot == nil {
		iprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.InputProtocol = iprot
	}
	//
	method, mTypeId, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return
	}
	if method != "echo" {
		err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "echo failed: wrong method name")
		return
	}
	if p.SeqId != seqId {
		err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "echo failed: out of sequence response")
		return
	}
	if mTypeId == thrift.EXCEPTION {
		error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
		var error1 error
		error1, err = error0.Read(iprot)
		if err != nil {
			return
		}
		if err = iprot.ReadMessageEnd(); err != nil {
			return
		}
		err = error1
		return
	}
	if mTypeId != thrift.REPLY {
		err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "echo failed: invalid message type")
		return
	}
	result := EchoEchoResult{}
	if err = result.Read(iprot); err != nil {
		return
	}
	if err = iprot.ReadMessageEnd(); err != nil {
		return
	}
	value = result.GetSuccess()
	return
}
複製代碼

thrift在mac機器安裝問題

  • 問題1:go get git.apache.org/thrift.git/lib/go/thrift失敗
  • 問題2:直接使用github.com提供的版本會報未知錯誤

問題2須要根據你的thrift -version來判斷下載哪個版本的thrift,好比個人thrift版本是0.10.0那麼須要下載的thrift地址爲https://github.com/apache/thrift/archive/0.10.0.zip

手動建立mkdir -p git.apache.org/thrift.git/lib/go/目錄,而後將下載後的go文件移至該目錄下~

Reference

相關文章
相關標籤/搜索