聊聊kingbus的binlog_syncer_handler.go

本文主要研究一下kingbus的binlog_syncer_handler.go

StartBinlogSyncer

kingbus/api/binlog_syncer_handler.go

// StartBinlogSyncer handles a request to start the binlog syncer.
//
// The handler holds h.l for its whole duration. The request body is bound into
// a config.SyncerArgs and validated with Check. When this node is not the
// leader the arguments are relayed with sendToLeader and the leader's answer is
// passed back to the client; otherwise the syncer server is started locally and
// the arguments are then handed to ProposeSyncerArgs.
//
// Errors from binding, validation, StartServer and ProposeSyncerArgs are
// returned to the caller after the deferred hook has logged them and written a
// 500 response. Errors on the forwarding path are answered in place instead.
func (h *BinlogSyncerHandler) StartBinlogSyncer(echoCtx echo.Context) error {
    h.l.Lock()
    defer h.l.Unlock()

    var (
        args config.SyncerArgs
        err  error
        // NOTE(review): syncerID is never assigned in this function, so the
        // success payload always carries its zero value — confirm intent.
        syncerID int
    )

    // Whatever is left in err when the function returns is logged and
    // reported to the client as an internal server error.
    defer func() {
        if err == nil {
            return
        }
        log.Log.Errorf("StartBinlogSyncer error,err: %s", err)
        echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }()

    if err = echoCtx.Bind(&args); err != nil {
        return err
    }
    if err = args.Check(); err != nil {
        return err
    }

    // Not the leader: relay the request. The errors in this branch use their
    // own variables on purpose, so the deferred hook above stays silent and
    // exactly one response is written.
    if !h.svr.IsLeader() {
        payload, marshalErr := json.Marshal(args)
        if marshalErr != nil {
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(marshalErr.Error()))
        }

        leaderResp, sendErr := h.sendToLeader("PUT", "/binlog/syncer/start", payload)
        switch {
        case sendErr != nil:
            log.Log.Errorf("sendToLeader error,err:%s,args:%v", sendErr, args)
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(sendErr.Error()))
        case leaderResp.Message != "success":
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(leaderResp.Message))
        }
        return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
    }

    // Leader: start the syncer server first, then propose its arguments.
    if err = h.svr.StartServer(config.SyncerServerType, &args); err != nil {
        log.Log.Errorf("StartServer error,err:%s,args:%v", err, args)
        return err
    }

    // NOTE(review): if the propose step fails, the server started above is
    // not stopped here — verify whether a rollback is expected.
    if err = h.ProposeSyncerArgs(&args); err != nil {
        log.Log.Errorf("ProposeSyncerArgs error,err:%s,args:%v", err, args)
        return err
    }

    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerID))
}
  • StartBinlogSyncer方法先執行echoCtx.Bind(&args)綁定參數,並通過args.Check()校驗;若h.svr.IsLeader()爲false,則通過h.sendToLeader("PUT", "/binlog/syncer/start", req)將請求轉發給leader;若爲true,則執行h.svr.StartServer(config.SyncerServerType, &args)啓動syncer server,而後執行h.ProposeSyncerArgs(&args)提交(propose)syncer的啓動參數

StopBinlogSyncer

kingbus/api/binlog_syncer_handler.go

// StopBinlogSyncer handles a request to stop the binlog syncer.
//
// The handler holds h.l for its whole duration. On the leader the syncer
// server is stopped locally and an empty data payload is returned. On any
// other node the request is relayed with sendToLeader and the leader's answer
// is passed back to the client; a transport error or a non-"success" message
// from the leader is reported as a 500 response.
func (h *BinlogSyncerHandler) StopBinlogSyncer(echoCtx echo.Context) error {
    h.l.Lock()
    defer h.l.Unlock()

    // Leader: stop the syncer server on this node.
    if h.svr.IsLeader() {
        h.svr.StopServer(config.SyncerServerType)
        return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
    }

    // Follower: relay to the leader and mirror its answer.
    leaderResp, sendErr := h.sendToLeader("PUT", "/binlog/syncer/stop", nil)
    switch {
    case sendErr != nil:
        log.Log.Errorf("sendToLeader error,err:%s", sendErr)
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(sendErr.Error()))
    case leaderResp.Message != "success":
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(leaderResp.Message))
    }
    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
}
  • StopBinlogSyncer方法在h.svr.IsLeader()爲false時,通過h.sendToLeader("PUT", "/binlog/syncer/stop", nil)將請求轉發給leader;爲true時則執行h.svr.StopServer(config.SyncerServerType)停止syncer server

GetBinlogSyncerStatus

kingbus/api/binlog_syncer_handler.go

// GetBinlogSyncerStatus handles a request for the runtime status of the
// binlog syncer.
//
// The handler holds h.l for its whole duration. On a non-leader node the
// request is relayed with sendToLeader and the leader's answer is passed back
// to the client. On the leader the status is read from GetServerStatus; if it
// is not a *config.SyncerStatus the client receives a 500 with "no resp".
func (h *BinlogSyncerHandler) GetBinlogSyncerStatus(echoCtx echo.Context) error {
    h.l.Lock()
    defer h.l.Unlock()

    // Follower: relay to the leader and mirror its answer.
    if !h.svr.IsLeader() {
        leaderResp, sendErr := h.sendToLeader("GET", "/binlog/syncer/status", nil)
        switch {
        case sendErr != nil:
            log.Log.Errorf("sendToLeader error,err:%s", sendErr)
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(sendErr.Error()))
        case leaderResp.Message != "success":
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(leaderResp.Message))
        }
        return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
    }

    // Leader: only a *config.SyncerStatus is an acceptable status value.
    syncerStatus, ok := h.svr.GetServerStatus(config.SyncerServerType).(*config.SyncerStatus)
    if !ok {
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))
    }
    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerStatus))
}
  • GetBinlogSyncerStatus方法在h.svr.IsLeader()爲false時,通過h.sendToLeader("GET", "/binlog/syncer/status", nil)將請求轉發給leader;爲true時則執行h.svr.GetServerStatus(config.SyncerServerType)獲取status,並在其類型爲*config.SyncerStatus時返回

sendToLeader

kingbus/api/membership_handler.go

// sendToLeader relays a request to the "/members" endpoint of the current
// leader's admin URL and returns the response produced by utils.SendRequest.
//
// ErrNoLeader is returned when the leader ID does not resolve to a cluster
// member, or when that member does not have exactly one admin URL. Errors from
// parsing the admin URL or from sending the request are returned unchanged.
func (h *MembershipHandler) sendToLeader(method string, req []byte) (*utils.Resp, error) {
    leader := h.cluster.Member(h.svr.Leader())
    if leader == nil {
        return nil, ErrNoLeader
    }
    if len(leader.AdminURLs) != 1 {
        log.Log.Errorf("leader admin url is not 1,leader:%v", *leader)
        return nil, ErrNoLeader
    }

    adminURL, err := url.Parse(leader.AdminURLs[0])
    if err != nil {
        return nil, err
    }
    // Only scheme and host of the admin URL are kept; the path is fixed.
    // The local is called target so that the url package is not shadowed.
    target := adminURL.Scheme + "://" + adminURL.Host + "/members"

    leaderResp, err := utils.SendRequest(method, target, req)
    if err != nil {
        log.Log.Errorf("sendToLeader:SendRequest error,err:%s,url:%s", err, target)
        return nil, err
    }
    return leaderResp, nil
}
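  • sendToLeader方法通過h.svr.Leader()獲取leaderID,而後執行h.cluster.Member(leaderID)獲取leader,之後根據leader.AdminURLs[0]構造請求URL,最後執行utils.SendRequest(method, url, req)發送請求;注意這裏貼出的是MembershipHandler的實現(路徑固定爲/members,只接收method和req兩個參數),而上文BinlogSyncerHandler調用的sendToLeader還多傳了一個請求路徑參數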

SendRequest

kingbus/utils/http_utils.go

// SendRequest issues an HTTP request with the given method to leaderURL, using
// data as the request body with a JSON content type, and decodes the JSON
// response body into a Resp.
//
// Any failure while building the request, sending it, reading the body or
// decoding it is returned unchanged together with a nil Resp.
//
// NOTE(review): the client is created without a timeout and the response
// status code is not inspected; only the body is decoded.
func SendRequest(method string, leaderURL string, data []byte) (*Resp, error) {
    httpReq, err := http.NewRequest(method, leaderURL, bytes.NewBuffer(data))
    if err != nil {
        return nil, err
    }
    httpReq.Header.Set("Content-Type", "application/json;charset=utf-8")

    httpResp, err := (&http.Client{}).Do(httpReq)
    if err != nil {
        return nil, err
    }
    defer httpResp.Body.Close()

    payload, err := ioutil.ReadAll(httpResp.Body)
    if err != nil {
        return nil, err
    }

    decoded := new(Resp)
    if err := json.Unmarshal(payload, decoded); err != nil {
        return nil, err
    }
    return decoded, nil
}
  • SendRequest通過http.NewRequest構建request,並設置Content-Type爲application/json;charset=utf-8,之後通過client.Do(req)發送請求,而後通過ioutil.ReadAll(resp.Body)讀取respBody,再通過json.Unmarshal(respBody, r)將其解析爲Resp類型

小結

kingbus的binlog_syncer_handler.go提供了StartBinlogSyncer、StopBinlogSyncer、GetBinlogSyncerStatus方法;這三個方法在當前節點不是leader時,都會通過sendToLeader將請求轉發給leader

doc

相關文章
相關標籤/搜索