在前兩篇系列博文中,我已經實現了user-srv、web-srv、api-srv,在新的一篇博文中,我要講解的是如何在項目中如何使用redis存儲session。若是想直接查閱源碼或者經過demo學習的,能夠訪問ricoder_demo。git
如何編寫一個微服務?這裏用的是go的微服務框架go micro,具體的狀況能夠查閱:btfak.com/%E5%BE%AE%E…github
定義User的狀態操做函數,部分源碼以下:golang
syntax = "proto3";
package pb;
service UserStatus {
//經過uid獲取session
rpc GetSessionByUID(GetSessionByUIDReq) returns (GetSessionByUIDRep) {}
//經過token獲取session
rpc GetSessionByToken(GetSessionByTokenReq) returns (GetSessionByTokenRep) {}
//獲取用戶的長鏈接地址
rpc GetConnectorAddr(GetConnectorAddrReq) returns (GetConnectorAddrRep) {}
//更新用戶長鏈接地址(用戶創建長鏈接時調用)
rpc UpdateConnectorAddr(UpdateConnectorAddrReq) returns (UpdateConnectorAddrRep) {}
//構建session用戶登陸時調用,此接口會清除舊session
rpc NewSession(NewSessionReq) returns (NewSessionRep) {}
//移除session登出時會調用
rpc RemoveSession(RemoveSessionReq) returns (RemoveSessionRep) {}
//token續期
rpc RefreshSession(RefreshSessionReq) returns (RefreshSessionRep) {}
//更新用戶長鏈接地址(用戶創建長鏈接時調用)
rpc UserConnected(UserConnectedReq) returns (UserConnectedRep) {}
//刪除用戶的長鏈接地址(用戶長鏈接斷開時調用)
rpc UserDisonnected(UserDisonnectedReq) returns (UserDisonnectedRep) {}
//經過uid來移除session
rpc RemoveSessionByUID(RemoveSessionByUIDReq) returns (RemoveSessionByUIDRep) {}
//經過token找uid
rpc GetUserIDByToken(GetUserIDByTokenReq) returns (GetUserIDByTokenRep) {}
}
/* 還有一些定義,完整示例能夠查看源碼~ */複製代碼
$ bash ./build_proto.sh複製代碼
這個build_proto.sh是我本身構建的一個腳本文件,運行以後會在/src/share/pb/文件夾下面生成一個userStatus.pb.go文件web
我在src文件夾下面添加一個user-status-srv文件夾,並在裏邊添加一個handler文件夾和utils文件夾,一個存放handler文件,一個存放工具類函數,而後實現handler函數,源碼以下:redis
package handler
import (
//多個導入包,具體請查看源碼
)
type UserStatusHandler struct {
pool *redis.Pool
logger *zap.Logger
namespace string
sessionExpire int
tokenExpire int
}
func NewUserStatusHandler(pool *redis.Pool) *UserStatusHandler {
return &UserStatusHandler{
pool: pool,
sessionExpire: 15 * 86400,
tokenExpire: 15 * 86400,
}
}
//GetUserIDByToken GetUIDByToken
func (s *UserStatusHandler) GetUserIDByToken(ctx context.Context, req *pb.GetUserIDByTokenReq, rsp *pb.GetUserIDByTokenRep) error {
return nil
}
/* 還有其餘函數的實現,完整示例能夠查看源碼~ */複製代碼
這裏實現的函數所有先採用空實現,在後面會慢慢添加sql
源碼以下:數據庫
package main
import (
//多個導入包,具體查看完整源碼
)
func main() {
// 建立Service,並定義一些參數
service := micro.NewService(
micro.Name(config.Namespace+"userStatus"),
micro.Version("latest"),
)
// 定義Service動做操做
service.Init(
micro.Action(func(c *cli.Context) {
log.Println("micro.Action test ...")
// 註冊redis
redisPool := share.NewRedisPool(3, 3, 1,300*time.Second,":6379","redis")
// 先註冊db
db.Init(config.MysqlDSN)
pb.RegisterUserStatusHandler(service.Server(), handler.NewUserStatusHandler(redisPool), server.InternalHandler(true))
}),
micro.AfterStop(func() error {
log.Println("micro.AfterStop test ...")
return nil
}),
micro.AfterStart(func() error {
log.Println("micro.AfterStart test ...")
return nil
}),
)
log.Println("啓動user-status-srv服務 ...")
//啓動service
if err := service.Run(); err != nil {
log.Panic("user-status-srv服務啓動失敗 ...")
}
}複製代碼
由源碼能夠看出,我在啓動service以前先註冊了redis、db以及綁定handler,再經過Run啓動service。api
在瀏覽器打開 http://127.0.0.1:8500/ ,若是能夠在頁面中看到對應的srv,則說明service啓動成功。如:數組
在這一章節中,我將採用redis實現數據的存取。瀏覽器
在main.go函數中,我使用 *share.NewRedisPool(3, 3, 1,300time.Second,":6379","redis") 獲得了一個redisPool,NewRedisPool源碼以下:
func NewRedisPool(maxIdle, maxActive , DBNum int, timeout time.Duration, addr , password string) *redis.Pool {
return &redis.Pool{
MaxActive: maxActive,
MaxIdle: maxIdle,
IdleTimeout: timeout,
Wait: true,
Dial: func() (redis.Conn, error) {
// return redis.DialURL(rawurl)
// return redis.Dial("tcp", addr, redis.DialPassword(password), redis.DialDatabase(dbNum))
return redis.Dial("tcp", addr, redis.DialPassword(password), redis.DialDatabase(DBNum))
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}複製代碼
在這裏我使用的是第三方開源框架,有興趣的能夠查看 github.com/garyburd/re… 瞭解狀況。
在這裏我以NewSession爲例,源碼以下:
func (s *UserStatusHandler) NewSession(ctx context.Context, req *pb.NewSessionReq, rsp *pb.NewSessionRep) error {
var oldSession *pb.Session
defer func() {
utils.SessionFree(oldSession)
}()
fieldMap := make(map[string]interface{}, 0)
fieldMap["Uid"] = req.Id
fieldMap["Address"] = req.Address
fieldMap["Phone"] = req.Phone
fieldMap["Name"] = req.Name
//生成Token
token, err := utils.NewToken(req.Id)
if err != nil {
log.Println("生成token失敗", zap.Error(err), zap.Int32("uid", req.Id))
return err
}
//刪除全部舊token
if err = utils.RemoveUserSessions(req.Id, s.pool); err != nil {
log.Println("刪除全部舊token失敗", zap.Error(err), zap.Int32("uid", req.Id))
return err
}
conn := s.pool.Get()
//會話數據寫入redis,格式:t:id => map的哈希值
if _, err := conn.Do("HMSET", redis.Args{}.Add(utils.KeyOfSession(req.Id)).AddFlat(fieldMap)...); err != nil {
conn.Close()
log.Println("會話數據寫入redis失敗", zap.Error(err), zap.String("key", utils.KeyOfSession(req.Id)), zap.Any("參數", fieldMap))
return err
}
//設置t:id的過時時間
if _, err := conn.Do("EXPIRE", utils.KeyOfSession(req.Id), s.sessionExpire); err != nil {
conn.Close()
s.logger.Error("設置session過時時間失敗", zap.Error(err), zap.String("key", utils.KeyOfSession(req.Id)))
return err
}
//用戶token寫入set裏邊,格式:t:uid:set:id => token
keyOfSet := utils.KeyOfSet(req.Id)
if _, err = conn.Do("SADD", keyOfSet, token); err != nil {
conn.Close()
log.Println("token寫入用戶集合失敗", zap.Error(err), zap.String("key", keyOfSet), zap.String("參數", token))
return err
}
//設置t:uid:set:id的過時時間
if _, err = conn.Do("EXPIRE", keyOfSet, s.sessionExpire); err != nil {
conn.Close()
log.Println("設置用戶token集合過時時間失敗", zap.Error(err), zap.String("key", keyOfSet))
return err
}
//將token和id對應,格式:token => id
if _, err = conn.Do("SETEX", utils.KeyOfToken(token), s.tokenExpire, req.Id); err != nil {
conn.Close()
log.Println("token寫入redis失敗", zap.Error(err), zap.String("key", utils.KeyOfToken(token)), zap.Int32("參數", req.Id))
return err
}
rsp.Token = token
return nil
}複製代碼
如代碼所示,操做redis的步驟是 conn := s.pool.Get() 先開啓一個鏈接,再經過conn.Do("EXPIRE", keyOfSet, s.sessionExpire) 的一種方式操做redis中的數據,具體的能夠查看redis的api,這裏有個函數 utils.SessionFree(oldSession) ,這是我在utils包下自定義的一個函數,這個知識點再接下來的知識點中會有涉及。
我在項目中使用了sync.pool存儲session對象,目的是爲了保存和複用session這個臨時對象,以減小內存分配,減低gc壓力,那麼sync.Pool是什麼呢?如下是官方給出的解釋(本身翻譯的):
如下是Pool的數據類型:
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared []interface{} // Can be used by any P.
Mutex // Protects shared.
}
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}複製代碼
由註釋咱們也能夠看出,其中的local成員的真實類型是一個poolLocal數組,而localSize是數組長度,poolLocal是真正保存數據的地方。private保存了一個臨時對象,shared是保存臨時對象的數組,而從private和shared的註釋咱們也能夠看出,一個是屬於特定的P私有的,一個是屬於全部的P,至於這個P是什麼,能夠自行參考golang的調度模型,後期我也會專門寫一篇相關的博客。其次,Pool是給每一個線程分配了一個poolLocal對象,就是說local數組的長度,就是工做線程的數量(size := runtime.GOMAXPROCS(0))。當多線程在併發讀寫的時候,一般狀況下都是在本身線程的poolLocal中存取數據,而只有當本身線程的poolLocal中沒有數據時,纔會嘗試加鎖去其餘線程的poolLocal中「偷」數據。
咱們能夠看看Get函數,源碼以下:
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l := p.pin()
x := l.private
l.private = nil
runtime_procUnpin()
if x == nil {
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
x = p.getSlow()
}
}
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}複製代碼
這個函數的源碼並不難讀,在調用Get的時候首先會先在local數組中獲取當前線程對應的poolLocal對象,而後再從poolLocal對象中獲取private中的數據,若是private中有數據,則取出來直接返回。若是沒有則先鎖住shared,而後從shared中取出數據後直接返回,若是仍是沒有則調用getSlow函數。那麼爲何這裏要鎖住shared呢?答案咱們能夠在getSlow中找到,由於當shared中沒有數據的時候,會嘗試去其餘的poolLocal的shared中偷數據。
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x複製代碼
tip:該項目的源碼(包含數據庫的增刪查改的demo)能夠查看 源代碼
有興趣的能夠關注個人我的公衆號 ~