本文主要研究一下kingbus的binlog_syncer_handler.gogit
kingbus/api/binlog_syncer_handler.gogithub
//StartBinlogSyncer implements start a binlog syncer func (h *BinlogSyncerHandler) StartBinlogSyncer(echoCtx echo.Context) error { h.l.Lock() defer h.l.Unlock() var args config.SyncerArgs var err error var syncerID int defer func() { if err != nil { log.Log.Errorf("StartBinlogSyncer error,err: %s", err) echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } }() err = echoCtx.Bind(&args) if err != nil { return err } //check args err = args.Check() if err != nil { return err } //forward to leader if h.svr.IsLeader() == false { req, err := json.Marshal(args) if err != nil { return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } resp, err := h.sendToLeader("PUT", "/binlog/syncer/start", req) if err != nil { log.Log.Errorf("sendToLeader error,err:%s,args:%v", err, args) return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } if resp.Message != "success" { return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(resp.Message)) } return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data)) } //start syncer server err = h.svr.StartServer(config.SyncerServerType, &args) if err != nil { log.Log.Errorf("StartServer error,err:%s,args:%v", err, args) return err } //propose start syncer info err = h.ProposeSyncerArgs(&args) if err != nil { log.Log.Errorf("ProposeSyncerArgs error,err:%s,args:%v", err, args) return err } return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerID)) }
kingbus/api/binlog_syncer_handler.gojson
//StopBinlogSyncer implements stop binlog syncer func (h *BinlogSyncerHandler) StopBinlogSyncer(echoCtx echo.Context) error { h.l.Lock() defer h.l.Unlock() //forward to leader if h.svr.IsLeader() == false { resp, err := h.sendToLeader("PUT", "/binlog/syncer/stop", nil) if err != nil { log.Log.Errorf("sendToLeader error,err:%s", err) return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } if resp.Message != "success" { return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(resp.Message)) } return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data)) } h.svr.StopServer(config.SyncerServerType) return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData("")) }
kingbus/api/binlog_syncer_handler.goapi
//GetBinlogSyncerStatus implements get binlog syncer status in the runtime state func (h *BinlogSyncerHandler) GetBinlogSyncerStatus(echoCtx echo.Context) error { h.l.Lock() defer h.l.Unlock() //forward to leader if h.svr.IsLeader() == false { resp, err := h.sendToLeader("GET", "/binlog/syncer/status", nil) if err != nil { log.Log.Errorf("sendToLeader error,err:%s", err) return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } if resp.Message != "success" { return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(resp.Message)) } return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data)) } status := h.svr.GetServerStatus(config.SyncerServerType) if syncerStatus, ok := status.(*config.SyncerStatus); ok { return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerStatus)) } return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp")) }
kingbus/api/membership_handler.goapp
func (h *MembershipHandler) sendToLeader(method string, req []byte) (*utils.Resp, error) { leaderID := h.svr.Leader() leader := h.cluster.Member(leaderID) if leader == nil { return nil, ErrNoLeader } if len(leader.AdminURLs) != 1 { log.Log.Errorf("leader admin url is not 1,leader:%v", *leader) return nil, ErrNoLeader } leaderURL, err := url.Parse(leader.AdminURLs[0]) if err != nil { return nil, err } url := leaderURL.Scheme + "://" + leaderURL.Host + "/members" resp, err := utils.SendRequest(method, url, req) if err != nil { log.Log.Errorf("sendToLeader:SendRequest error,err:%s,url:%s", err, url) return nil, err } return resp, nil }
kingbus/utils/http_utils.gourl
//SendRequest send PUT request to leader func SendRequest(method string, leaderURL string, data []byte) (*Resp, error) { client := &http.Client{} req, err := http.NewRequest(method, leaderURL, bytes.NewBuffer(data)) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json;charset=utf-8") resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() respBody, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } r := new(Resp) err = json.Unmarshal(respBody, r) if err != nil { return nil, err } return r, nil }
application/json;charset=utf-8
,以後經過client.Do(req)發送請求,而後經過ioutil.ReadAll(resp.Body)讀取respBody,在經過json.Unmarshal(respBody, r)解析json爲Resp類型kingbus的binlog_syncer_handler.go提供了StartBinlogSyncer、StopBinlogSyncer、GetBinlogSyncerStatus方法code