golang RPC 應用(1) :net/rpc的應用

RPC(Remote Procedure Call)—遠程過程調用,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI網絡通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分佈式多程序在內的應用程序更加容易。---百度百科
實際後臺開發中,rpc是服務器與服務器交互的方式之一,隱藏底層網絡實現,代碼程式化,開發效率高,BUG少。shell

經過一個簡單的demo來講明go官方rpc包的應用。
項目結構:服務器

rpc
    ----Makefile
    ----src
        ----client
            ----main.go
        ----protocol
            ----type.go
        ----server
            ----main.go

rpc/Makefile網絡

GOPATH := $(shell pwd)
all:
    GOPATH=${GOPATH} go install client
    GOPATH=${GOPATH} go install server

rpc/protocol/type.go異步

package protocol

const(
    RPC_ADDITION = "Calculator.Addition"
    RPC_SUBTRACTION = "Calculator.Subtraction"
    RPC_MULTIPLICATION = "Calculator.Multiplication"
    RPC_DIVISION = "Calculator.Division"
)

type Param struct {
    A int32
    B int32
}

RPC客戶端實現,包含同步(Call)和異步(Go)調用方式,一般爲了效率會使用異步方式。
rpc/src/client/main.gotcp

package main

import "net/rpc"
import (
    . "protocol"
    "fmt"
    "time"
)

var (    _CLIENT  *rpc.Client
    _RPC_MSG chan *rpc.Call
    _CAN_CANCEL chan bool
)

func main()  {
    DialRpcServer()
    //起個協程處理異步rpc調用結果
    go loop()

    //測試同步的方式調用rpc服務
    param := Param{A:int32(10),B:int32(30)}
    reply := int32(0)
    SyncCallRpcFunc(RPC_ADDITION, &param, &reply)
    fmt.Printf("Sync Call Addition Result %d \n", reply)
    SyncCallRpcFunc(RPC_SUBTRACTION, &param, &reply)
    fmt.Printf("Sync Call Subtraction Result %d \n", reply)
    ////測試異步的方式調用rpc服務
    ASyncCallRpcFunc(RPC_MULTIPLICATION, &param, &reply)
    ASyncCallRpcFunc(RPC_DIVISION, &param, &reply)
    //阻塞等待異步調用完成
    <- _CAN_CANCEL
}

func init(){
    _RPC_MSG = make(chan *rpc.Call, 1024)
    _CAN_CANCEL = make(chan bool)
}

func DialRpcServer(){
    c, e := rpc.DialHTTP("tcp", "127.0.0.1:2311")

    if e != nil {
        fmt.Errorf("Dial RPC Error %s", e.Error())
    }

    _CLIENT = c
}
//重連RPC服務器
func ReDialRpcServer() bool{
    c, e := rpc.DialHTTP("tcp", "127.0.0.1:2311")

    if e != nil {
        fmt.Printf("ReDial RPC Error %s \n", e.Error())

        return false
    }

    _CLIENT = c

    fmt.Println("ReDial Rpc Server Succ")

    return true
}
//同步rpc調用
func SyncCallRpcFunc(method string, args interface{}, reply interface{}){
    if nil == _CLIENT{
        for{//若是斷線就等到重連上爲止
            if ReDialRpcServer(){
                break
            }
            time.Sleep(5000 * time.Millisecond)
        }
    }

    _CLIENT.Call(method, args, reply)
}
//異步rpc調用
func ASyncCallRpcFunc(method string, args interface{}, reply interface{}){
    if nil == _CLIENT{
        for{//若是斷線就等到重連上爲止
            if ReDialRpcServer(){
                break
            }
            time.Sleep(5000 * time.Millisecond)
        }
    }
    // Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call)的done若是填nil會構建個新的channel用於接受結果
    _CLIENT.Go(method, args, reply, _RPC_MSG)
}
//接收異步調用的返回
func loop(){
    for{
        select {
        case rpcMsg, ok := <- _RPC_MSG:
            if !ok{
                fmt.Errorf("Rpc Call Error")
            }
            rpcMsgHandler(rpcMsg)
        }
    }

    _CAN_CANCEL <- true
}
// 處理異步rpc的返回值
func rpcMsgHandler(msg * rpc.Call){
    switch msg.ServiceMethod {
    case RPC_ADDITION:
        reply := msg.Reply.(*int32)
        fmt.Printf("Addtoion Result [%d] \n", *reply)
    case RPC_SUBTRACTION:
        reply := msg.Reply.(*int32)
        fmt.Printf("Subtraction Result [%d] \n", *reply)
    case RPC_MULTIPLICATION:
        reply := msg.Reply.(*int32)
        fmt.Printf("Multiplication Result [%d] \n", *reply)
    case RPC_DIVISION:
        reply := msg.Reply.(*int32)
        fmt.Printf("Division Result [%d] \n", *reply)
    default:
        fmt.Errorf("Can Not Handler Reply [%s] \n", msg.ServiceMethod)

    }
}

RPC服務器的實現。
rpc/src/server/main.go分佈式

package main

import "net/rpc"
import (
    . "protocol"
    "errors"
    "net"
    "fmt"
    "net/http"
)

type Calculator struct {}

var (    _DATA *Calculator
    _CAN_CANCEL chan bool
)

func main()  {
    runRpcServer()
}

func init(){
    _DATA = new(Calculator)
    _CAN_CANCEL = make(chan bool)
}

func runRpcServer(){
    //rpc包裏面定義了個DefaultServer,缺省的Register和HandleHTTP均是對DefaultServer做的操做,若是想定製新的Server,就本身寫
    rpc.Register(_DATA)
    rpc.HandleHTTP()
    l,e := net.Listen("tcp","127.0.0.1:2311")
    if e != nil{
        fmt.Errorf("Create Listener Error %s", e.Error())
    }

    go http.Serve(l, nil)
    //阻塞主進程,等待客戶端輸入
    <-_CAN_CANCEL
}
//輸出方法的格式要求:func (t *T) MethodName(argType T1, replyType *T2) error
func (*Calculator) Addition(param *Param, reply *int32) error{
    *reply = param.A + param.B

    return nil
}

func (*Calculator) Subtraction(param *Param, reply *int32) error{
    *reply = param.A - param.B
    return nil
}

func (*Calculator) Multiplication(param *Param, reply *int32) error{
    *reply = param.A * param.B

    return nil
}

func (*Calculator) Division(param *Param, reply *int32) error{
    if 0 == param.B{
        return errors.New("divide by zero")
    }
    *reply = param.A/param.B

    return nil
}
相關文章
相關標籤/搜索