什麼是
RPC
?html
RPC
全稱爲Remote Procedure Call
翻譯過來就是遠程過程調用java
RPC
和HTTP
的區別git
HTTP
是一種協議,RPC
能夠經過HTTP
來實現,也能夠經過Socket
本身實現一套協議來實現。github
論複雜度,RPC
框架確定是高於簡單的HTTP
接口的。但毋庸置疑,HTTP
接口因爲受限於HTTP
協議,須要帶HTTP
請求頭,致使傳輸起來效率或者說安全性不如RPC
golang
而且要否定一點,HTTP
協議相對於TCP
報文協議,增長的開銷在於鏈接與斷開。HTTP
是支持鏈接池複用的(HTTP 1.x)
apache
Apache Thrift
是一個跨語言的服務框架,本質上爲RPC
,同時具備序列化、反序列化機制Thrift
包含一個完整的堆棧結構用於構建客戶端和服務器端。segmentfault
TProtocol
)Thrift
可讓用戶選擇客戶端和服務端之間傳輸通訊協議的區別,在傳輸協議上整體分爲文本和二進制(binary
)傳輸協議,爲了節省帶寬,提升傳輸效率,通常狀況下使用二進制類型的傳輸協議爲多數。安全
TBinaryProtocol
:二進制編碼格式進行數據傳輸TCompactProtocol
:高效率的、密集的二進制編碼格式進行數據傳輸TJSONProtocol
:使用 JSON
的數據編碼協議進行數據傳輸TDebugProtocol
:使用易懂的可讀文本格式,以便於debug
TTransport
)TTransport
是與底層數據傳輸緊密相關的傳輸層。每一種支持的底層傳輸方式都存在一個與之對應的TTransport
。在這一層,數據是按字節流處理的,即傳輸層看到的是一個又一個的字節,並把這些字節按順序發送和接收。TTransport
並不瞭解它所傳輸的數據是什麼類型,實際上傳輸層也不關心數據是什麼類型,只須要按照字節方式對數據進行發送和接收便可。數據類型的解析在TProtocol
這一層完成。服務器
TSocket
:使用阻塞式 I/O
進行傳輸,是最多見的模式THttpTransport
:採用HTTP
協議進行數據傳輸TFramedTransPort
: 以frame
爲單位進行傳輸,非阻塞式服務中使用;TFileTransPort
:以文件形式進行傳輸TMemoryTransport
:將內存用於I/O
傳輸TZlibTransport
:使用zlib
進行壓縮, 與其餘傳輸方式聯合使用TBufferedTransport
對某個transport
對象操做的數據進行buffer
,即從buffer
中讀取數據進行傳輸,或將數據直接寫入到buffer
TServer
)TServer
在thrift
框架中的主要任務是接收client
請求,並轉發到某個processor
上進行請求處理。針對不一樣的訪問規模,thrift
提供了不一樣TServer
模型。thrift
目前支持的server
模型包括:網絡
TSimpleServer
: 單線程服務器端使用標準的阻塞式I/O
TTHreaadPoolServer
: 多線程服務器端使用標準的阻塞式I/O
TNonblockingServer
:多線程服務器端使用非阻塞式I/O
TThreadedServer
:多線程網絡模型,使用阻塞式I/O
,爲每一個請求建立一個線程對於`golang`來講,只有`TSimpleServer`服務模式,而且是非阻塞的
TProcessor
主要對TServer
中一次請求的inputProtocol
和outputProtocol
進行操做,也就是從inputProtocol
中讀出client
的請求數據,向outputProtocol
寫入用戶邏輯的返回值。TProcessorprocess
是一個很是關鍵的處理函數,由於client
全部的rpc
調用都會通過該函數處理並轉發
ThriftClient
跟TProcessor
同樣主要操做inputProtocol
和outputProtocol
,不一樣的是thriftClient
將rpc
調用分爲send
和receive
兩個步驟:
send
步驟,將用戶的調用參數做爲一個總體的struct
寫入TProtocol
,併發送到TServer
。send
結束後,thriftClient
便當即進入receive
狀態等待TServer
的響應。對於TServer
的響應,使用返回值解析類進行返回值解析,完成rpc
調用。實際上這不是典型的TSimpleServer
,由於它在接受套接字後沒有被阻塞。 它更像是一個TThreadedServer
,能夠處理不一樣goroutine
中的不一樣鏈接。 若是golang
用戶在客戶端實現conn-pool
之類的東西這將有效。
type TSimpleServer struct {
quit chan struct{} // 採用阻塞channel進行判斷
processorFactory TProcessorFactory
serverTransport TServerTransport
inputTransportFactory TTransportFactory
outputTransportFactory TTransportFactory
inputProtocolFactory TProtocolFactory
outputProtocolFactory TProtocolFactory
}
複製代碼
如下代碼thrift-idl
爲,接下來的解析以此爲例
namespace go echo
struct EchoReq {
1: string msg;
}
struct EchoRes {
1: string msg;
}
service Echo {
EchoRes echo(1: EchoReq req);
}
複製代碼
func (p *TSimpleServer) Serve() error {
err := p.Listen()
if err != nil {
return err
}
p.AcceptLoop()
return nil
}
func (p *TSimpleServer) AcceptLoop() error {
for {
// 此處的Accept()是阻塞的,是調用listener.Accept()
client, err := p.serverTransport.Accept()
if err != nil {
select {
case <-p.quit:
return nil
default:
}
return err
}
if client != nil {
go func() {
if err := p.processRequests(client); err != nil {
log.Println("error processing request:", err)
}
}()
}
}
}
複製代碼
若是server
此時還在處理請求,服務端忽然重啓,thrift 1.0
是沒法作到優雅重啓的,可是go thrift
的最新版本採用了golang waitgroup
的方式實現了優雅重啓~
func (p *TSimpleServer) processRequests(client TTransport) error {
processor := p.processorFactory.GetProcessor(client)
inputTransport := p.inputTransportFactory.GetTransport(client)
outputTransport := p.outputTransportFactory.GetTransport(client)
inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
defer func() {
if e := recover(); e != nil {
log.Printf("panic in processor: %s: %s", e, debug.Stack())
}
}()
if inputTransport != nil {
defer inputTransport.Close()
}
if outputTransport != nil {
defer outputTransport.Close()
}
for {
ok, err := processor.Process(inputProtocol, outputProtocol)
if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
return nil
} else if err != nil {
log.Printf("error processing request: %s", err)
return err
}
if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
continue
}
if !ok {
break
}
}
return nil
}
複製代碼
Process
處理邏輯
func (p *EchoProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
name, _, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return false, err
}
// 獲取傳遞過來的name,若是存在則處理
if processor, ok := p.GetProcessorFunction(name); ok {
return processor.Process(seqId, iprot, oprot)
}
// 異常邏輯
iprot.Skip(thrift.STRUCT)
iprot.ReadMessageEnd()
x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
x3.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, x3
}
複製代碼
TServer
接收到rpc
請求以後,調用TProcessorprocess
進行處理。 TProcessorprocess
首先調用TTransport.readMessageBegin
接口,讀出rpc
調用的名稱和rpc
調用類型。
若是rpc
調用類型是rpc call
,則調用TProcessor.process_fn
繼續處理,對於未知的rpc
調用類型,則拋出異常。 TProcessor.process_fn
根據rpc
調用名稱,到本身的processMap
中查找對應的rpc
處理函數。若是存在對應的rpc
處理函數,則調用該處理函數繼續進行請求響應。不存在則拋出異常。
func (p *echoProcessorEcho) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
args := EchoEchoArgs{}
// 讀取入參的參數
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return false, err
}
iprot.ReadMessageEnd()
result := EchoEchoResult{}
var retval *EchoRes
var err2 error
// 此處是thrift爲何err不能傳錯誤,若是傳業務錯誤會被阻塞
if retval, err2 = p.handler.Echo(args.Req); err2 != nil {
x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing echo: "+err2.Error())
oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()
return true, err2
} else {
result.Success = retval
}
if err2 = oprot.WriteMessageBegin("echo", thrift.REPLY, seqId); err2 != nil {
err = err2
}
if err2 = result.Write(oprot); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.Flush(); err == nil && err2 != nil {
err = err2
}
if err != nil {
return
}
return true, err
}
複製代碼
stop
代碼var once sync.Once
func (p *TSimpleServer) Stop() error {
q := func() {
p.quit <- struct{}{}
p.serverTransport.Interrupt()
}
once.Do(q)
return nil
}
複製代碼
stop
函數比較簡單,能夠看出直接向阻塞隊列裏面寫入數據,而後server
再也不接受請求
Client
調用的函數
func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes, err error) {
if err = p.sendEcho(req); err != nil {
return
}
return p.recvEcho()
}
複製代碼
sendEcho()
函數
func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
oprot := p.OutputProtocol
if oprot == nil {
oprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.OutputProtocol = oprot
}
// seqid + 1
p.SeqId++
if err = oprot.WriteMessageBegin("echo", thrift.CALL, p.SeqId); err != nil {
return
}
// 構建參數
args := EchoEchoArgs{
Req: req,
}
if err = args.Write(oprot); err != nil {
return
}
// 通知服務器發送完畢
if err = oprot.WriteMessageEnd(); err != nil {
return
}
return oprot.Flush()
}
複製代碼
recvEcho()
函數
func (p *EchoClient) recvEcho() (value *EchoRes, err error) {
iprot := p.InputProtocol
if iprot == nil {
iprot = p.ProtocolFactory.GetProtocol(p.Transport)
p.InputProtocol = iprot
}
//
method, mTypeId, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return
}
if method != "echo" {
err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "echo failed: wrong method name")
return
}
if p.SeqId != seqId {
err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "echo failed: out of sequence response")
return
}
if mTypeId == thrift.EXCEPTION {
error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
var error1 error
error1, err = error0.Read(iprot)
if err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
err = error1
return
}
if mTypeId != thrift.REPLY {
err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "echo failed: invalid message type")
return
}
result := EchoEchoResult{}
if err = result.Read(iprot); err != nil {
return
}
if err = iprot.ReadMessageEnd(); err != nil {
return
}
value = result.GetSuccess()
return
}
複製代碼
go get git.apache.org/thrift.git/lib/go/thrift
失敗github.com
提供的版本會報未知錯誤問題2須要根據你的thrift -version
來判斷下載哪個版本的thrift
,好比個人thrift版本是0.10.0
那麼須要下載的thrift
地址爲https://github.com/apache/thrift/archive/0.10.0.zip
手動建立mkdir -p git.apache.org/thrift.git/lib/go/
目錄,而後將下載後的go
文件移至該目錄下~