im_service--im模塊

前言

該IM模塊是使用golang開發,性能優秀,並且提供全套的客戶端,服務端程序,就是說若是要快速擁有一套完整的IM系統,這個IM是很不錯的選擇.git

IM分爲三個模塊,分別爲:github

im模塊---用於消息分發,能夠起多個模塊作分佈式,經過LVS實現負載均衡.golang

imr模塊---用於作im模塊的路由,多個im之間經過imr來搜索各個用戶的存在.web

ims模塊---用於作消息的順序存儲,不停的存儲在message_x文件,x表明數字,每一個文件存滿默認128M以後,會存到下一個文件,x數字遞增.redis





im模塊

--模塊目錄

帶小箭頭的文件爲幾個模塊共用文件,解析放在im模塊;benchmark是測試文件;Makefile是用來編譯的文件.



im消息處理:bash

用戶A->B:
websocket

1.SaveMessage:保存消息到目標用戶B存儲隊列 (rpc->SavePeerMessage)
app

2.SaveMessage:保存消息到發送用戶A存儲隊列(供多點登陸同步消息)
負載均衡

3.PushMessage:外部推送消息給目標用戶B(由IMR尋路由)MSG_IM
socket

4.SendMessage:發送同步消息給目標用戶B (外部推送+本地尋址發送) MSG_SYNC_NOTIFY

5.SendMessage:發送同步消息給發送用戶A(多點登陸)MSG_SYNC_NOTIFY

6.EnqueueMessage:給本鏈接回覆MSG_ACK消息


1.SaveMessage:保存消息,將消息發送給ims去保存

rpc庫: github.com/valyala/gorpc
函數:  SaveMessage(appid int64, uid int64, device_id int64, m *Message) (int64, error)複製代碼

      (1)由於ims支持分佈式,因此ims能夠有多個.因此經過對uid進行取模(%),來獲取對應的ims對象,達到負載均衡的效果,這樣每個uid就跟每個ims一一對應了,這也是爲何ims沒法動態添加的緣由.

      (2)經過rpc函數SavePeerMessage,將msg發送給ims.

      (3)最後rpc響應成功後,會返回一個msgid,也就是消息存儲在文件的偏移量.


2.PushMessage:外部推送消息給目標用戶(由IMR尋路由)

函數:  func PublishMessage(appid int64, uid int64, m *Message) 複製代碼

    (1)imr跟ims同樣支持分佈式,同樣沒法動態獲取,用uid來取模獲取imr的對象,uid與imr一一對應.獲取一個imr對象(channel).

    (2)調用PublishMessage,將Message封裝成AppMessage,而後傳給chanel.Publish(amsg).

    (3)channel.Publish將消息又封裝成Message,並傳入到channel隊列wt中.

    (4)channel內部有個for循環,wt不停將消息發給SendMessage.


  • SendMessage:發送同步消息給目標用戶B (外部推送+本地尋址發送)
    函數:  func SendMessage(conn io.Writer, msg *Message) error複製代碼

          (1)將字節流經過WriteMessage(buffer,msg),轉成成以下格式的buf

包:header(12)|body

header:len(4),seq(4),cmd(1),version(1),空(2)複製代碼

         (2)將buf寫入conn.Write

        

  • cliengt.EnqueueMessage:給本鏈接回覆MSG_ACK消息

        (1)將msg傳給Connection 的wt隊列

        (2)Connection的for,將wt中的msg,從新封裝成Message對象,調用Connection.send(msg)

        (3)判斷是tcp就發給SendMessages,websocket就發給SendEngineIOBinaryMessage(conn,msg)

  

-------------------------------------------------------------------

(1)im.go

功能:im模塊的啓動入口

main:

1.初始化配置參數.

2.設置RedisPool鏈接池.

3.設置ims鏈接及其幾個RPC方法,根據配置給的多個ims地址實現分佈式,可是不能動態添加,後面加入的ims模塊須要再重啓im模塊才能使用.兩種地址:

storage_rpc_addrs用來保存 我的消息/普通羣消息/客服消息 
group_storage_rpc_addrs用來保存 超級羣消息
(可讓他們分開存放數據,若group_storage_rpc_addrs不存在時,直接使用storage_rpc_addrs,
咱們公司就是沒有分開存放,一個緣由也是沒有使用超級羣.) 

tips: 超級羣消息和普通羣消息的區別
通常都是用普通羣,當有一個羣特別多人的狀況,可使用超級羣,可是千萬不要濫用.

普通羣消息:
當用戶在羣裏發送一條消息後,會對羣上的成員遍歷一次,有多少個成員就將消息存多少條,
分別存到羣成員各自的消息隊列上面.在發送消息的時候,性能消耗會多點.


超級羣消息: 
當用戶在羣裏發送一條消息後,只會存一條消息到該羣的超級羣隊列上面.可是客戶端每次獲取消息的時候,
除了須要本身隊列上獲取消息,還要去超級羣隊列獲取,會增長獲取消息的耗時.

 複製代碼


4.設置imr鏈接及其幾個RPC方法,兩種地址: route_addrs和group_route_addrs,狀況如上.

DispatchAppMessage, DispatchGroupMessage, DispatchRoomMessage
分發 普通消息,羣消息,房間消息複製代碼

5.設置關鍵詞字典


6.啓動羣管理: group_manager.Start() [group_manager.go文件] 建立多個臨時文件夾.
 普通羣消息首先保存到臨時文件中,以後按照保存到文件中的順序依次派發,經過配置group_deliver_count的數量能夠設置多少建立個臨時文件夾,多少個協程來操做.


7.開啓訂閱redis服務

8.開啓SyncKey服務,SyncKey是每條消息存在文件中的偏移量,如一個文件上限設置128M,當偏移量超過128M,則取餘後存入第二個文件,以此類推,第三,第四...

9.開啓TCP和websocket的鏈接服務端

10.開啓API



(2)app_route.go

功能:以appid劃分多個集合,每一個集合存放route.及其針對AppRoute的Route增刪查改.

type AppRoute struct{

         mutex     sunc.Mutex

         apps       map[int64]*Route

}


(3)route.go

功能:以uid劃分多個集合,每一個集合存放每一個用戶的信息.及其針對Route的Clients增刪查改.

初始化Client使用了set是一個空struct,用來存聽任意類型的類.

type Route struct{

         appid    int64

         mutex   sunc.Mutex

         clients  map[int64]ClientSet

          room_clients  map[int64]ClientSet

}


(4)channel.go

功能:記錄用戶在線狀態,跟route的route_addrs,group_route_addrs數量對應.


(5)client.go

功能:客戶端鏈接的對象,負責與客戶端的通信(write和read).發送messages等待隊列中的消息.

type Client struct {   
     Connection //必須放在結構體首部   
     *PeerClient   
     *GroupClient   
     *RoomClient   
     *CustomerClient   
     public_ip int32
}複製代碼

read()獲取到消息,到client.HandleMessage(msg)去解析.

首次鏈接須要經過token驗證,im獲取token後,拼接上access_token_%s到redis尋找字段,若字段存在則經過驗證,不然鏈接失敗.

client.HandleAuthToken(login *AuthenticationToken,version int)

---驗證token,失敗則直接返回status=1,成功往下走,並返回status=0.

---經過platform_id判斷客戶端平臺,iOS和安卓屬於在線狀態,修改online狀態,中止用戶推送消息.

---爲client添加各類初始化信息.

---client.PeerClient.Login(),發送通知Sunbscribe函數,並通知調用Sunbscribe到全部group_route_channels.

---CountDAU按天劃分key,添加進HyperLogLog


(5)config.go

功能:配置初始參數變量


(6)connection.go

功能:處理消息隊列,經過uid尋找接收客戶端的對象,將消息塞到client對象的messages等待隊列

type Connection struct {   
    conn   interface{}   
    closed int32      
    forbidden int32 //是否被禁言   
    notification_on bool //桌面在線時是否通知手機端   
    online bool      
    tc     int32 //write channel timeout count   
    wt     chan *Message   
    lwt    chan int      //客戶端協議版本號   
    version int   
    tm     time.Time   
    appid  int64   
    uid    int64   
    device_id string   
    device_ID int64 //generated by device_id + platform_id   
    platform_id int8      
    messages *list.List //待發送的消息隊列 FIFO   
    mutex  sync.Mutex
}複製代碼



(7)customer_client.go  customer_service.go

功能:處理客服消息


(8)device.go

就一個方法,獲取設備的DeviceID,若是沒有就去redis申請一個最新的ID


(9)dummy_grpc.go grpc.go(廢棄)

相關文章
相關標籤/搜索