該IM模塊是使用golang開發,性能優秀,並且提供全套的客戶端,服務端程序,就是說若是要快速擁有一套完整的IM系統,這個IM是很不錯的選擇.git
IM分爲三個模塊,分別爲:github
im模塊---用於消息分發,能夠起多個模塊作分佈式,經過LVS實現負載均衡.golang
imr模塊---用於作im模塊的路由,多個im之間經過imr來搜索各個用戶的存在.web
ims模塊---用於作消息的順序存儲,不停的存儲在message_x文件,x表明數字,每一個文件存滿默認128M以後,會存到下一個文件,x數字遞增.redis
im消息處理:bash
用戶A->B:
websocket
1.SaveMessage:保存消息到目標用戶B存儲隊列 (rpc->SavePeerMessage)
app
2.SaveMessage:保存消息到發送用戶A存儲隊列(供多點登陸同步消息)
負載均衡
3.PushMessage:外部推送消息給目標用戶B(由IMR尋路由)MSG_IM
socket
4.SendMessage:發送同步消息給目標用戶B (外部推送+本地尋址發送) MSG_SYNC_NOTIFY
5.SendMessage:發送同步消息給發送用戶A(多點登陸)MSG_SYNC_NOTIFY
6.EnqueueMessage:給本鏈接回覆MSG_ACK消息
1.SaveMessage:保存消息,將消息發送給ims去保存
rpc庫: github.com/valyala/gorpc
函數: SaveMessage(appid int64, uid int64, device_id int64, m *Message) (int64, error)複製代碼
(1)由於ims支持分佈式,因此ims能夠有多個.因此經過對uid進行取模(%),來獲取對應的ims對象,達到負載均衡的效果,這樣每個uid就跟每個ims一一對應了,這也是爲何ims沒法動態添加的緣由.
(2)經過rpc函數SavePeerMessage,將msg發送給ims.
(3)最後rpc響應成功後,會返回一個msgid,也就是消息存儲在文件的偏移量.
2.PushMessage:外部推送消息給目標用戶(由IMR尋路由)
函數: func PublishMessage(appid int64, uid int64, m *Message) 複製代碼
(1)imr跟ims同樣支持分佈式,同樣沒法動態獲取,用uid來取模獲取imr的對象,uid與imr一一對應.獲取一個imr對象(channel).
(2)調用PublishMessage,將Message封裝成AppMessage,而後傳給chanel.Publish(amsg).
(3)channel.Publish將消息又封裝成Message,並傳入到channel隊列wt中.
(4)channel內部有個for循環,wt不停將消息發給SendMessage.
函數: func SendMessage(conn io.Writer, msg *Message) error複製代碼
(1)將字節流經過WriteMessage(buffer,msg),轉成成以下格式的buf
包:header(12)|body
header:len(4),seq(4),cmd(1),version(1),空(2)複製代碼
(2)將buf寫入conn.Write
(1)將msg傳給Connection 的wt隊列
(2)Connection的for,將wt中的msg,從新封裝成Message對象,調用Connection.send(msg)
(3)判斷是tcp就發給SendMessages,websocket就發給SendEngineIOBinaryMessage(conn,msg)
-------------------------------------------------------------------
功能:im模塊的啓動入口
main:
1.初始化配置參數.
2.設置RedisPool鏈接池.
3.設置ims鏈接及其幾個RPC方法,根據配置給的多個ims地址實現分佈式,可是不能動態添加,後面加入的ims模塊須要再重啓im模塊才能使用.兩種地址:
storage_rpc_addrs用來保存 我的消息/普通羣消息/客服消息
group_storage_rpc_addrs用來保存 超級羣消息
(可讓他們分開存放數據,若group_storage_rpc_addrs不存在時,直接使用storage_rpc_addrs,
咱們公司就是沒有分開存放,一個緣由也是沒有使用超級羣.)
tips: 超級羣消息和普通羣消息的區別
通常都是用普通羣,當有一個羣特別多人的狀況,可使用超級羣,可是千萬不要濫用.
普通羣消息:
當用戶在羣裏發送一條消息後,會對羣上的成員遍歷一次,有多少個成員就將消息存多少條,
分別存到羣成員各自的消息隊列上面.在發送消息的時候,性能消耗會多點.
超級羣消息:
當用戶在羣裏發送一條消息後,只會存一條消息到該羣的超級羣隊列上面.可是客戶端每次獲取消息的時候,
除了須要本身隊列上獲取消息,還要去超級羣隊列獲取,會增長獲取消息的耗時.
複製代碼
4.設置imr鏈接及其幾個RPC方法,兩種地址: route_addrs和group_route_addrs,狀況如上.
DispatchAppMessage, DispatchGroupMessage, DispatchRoomMessage
分發 普通消息,羣消息,房間消息複製代碼
5.設置關鍵詞字典
6.啓動羣管理: group_manager.Start() [group_manager.go文件] 建立多個臨時文件夾.
普通羣消息首先保存到臨時文件中,以後按照保存到文件中的順序依次派發,經過配置group_deliver_count的數量能夠設置多少建立個臨時文件夾,多少個協程來操做.
7.開啓訂閱redis服務
8.開啓SyncKey服務,SyncKey是每條消息存在文件中的偏移量,如一個文件上限設置128M,當偏移量超過128M,則取餘後存入第二個文件,以此類推,第三,第四...
9.開啓TCP和websocket的鏈接服務端
10.開啓API
(2)app_route.go
功能:以appid劃分多個集合,每一個集合存放route.及其針對AppRoute的Route增刪查改.
type AppRoute struct{
mutex sunc.Mutex
apps map[int64]*Route
}
(3)route.go
功能:以uid劃分多個集合,每一個集合存放每一個用戶的信息.及其針對Route的Clients增刪查改.
初始化Client使用了set是一個空struct,用來存聽任意類型的類.
type Route struct{
appid int64
mutex sunc.Mutex
clients map[int64]ClientSet
room_clients map[int64]ClientSet
}
(4)channel.go
功能:記錄用戶在線狀態,跟route的route_addrs,group_route_addrs數量對應.
(5)client.go
功能:客戶端鏈接的對象,負責與客戶端的通信(write和read).發送messages等待隊列中的消息.
type Client struct {
Connection //必須放在結構體首部
*PeerClient
*GroupClient
*RoomClient
*CustomerClient
public_ip int32
}複製代碼
read()獲取到消息,到client.HandleMessage(msg)去解析.
首次鏈接須要經過token驗證,im獲取token後,拼接上access_token_%s到redis尋找字段,若字段存在則經過驗證,不然鏈接失敗.
client.HandleAuthToken(login *AuthenticationToken,version int)
---驗證token,失敗則直接返回status=1,成功往下走,並返回status=0.
---經過platform_id判斷客戶端平臺,iOS和安卓屬於在線狀態,修改online狀態,中止用戶推送消息.
---爲client添加各類初始化信息.
---client.PeerClient.Login(),發送通知Sunbscribe函數,並通知調用Sunbscribe到全部group_route_channels.
---CountDAU按天劃分key,添加進HyperLogLog
(5)config.go
功能:配置初始參數變量
(6)connection.go
功能:處理消息隊列,經過uid尋找接收客戶端的對象,將消息塞到client對象的messages等待隊列
type Connection struct {
conn interface{}
closed int32
forbidden int32 //是否被禁言
notification_on bool //桌面在線時是否通知手機端
online bool
tc int32 //write channel timeout count
wt chan *Message
lwt chan int //客戶端協議版本號
version int
tm time.Time
appid int64
uid int64
device_id string
device_ID int64 //generated by device_id + platform_id
platform_id int8
messages *list.List //待發送的消息隊列 FIFO
mutex sync.Mutex
}複製代碼
(7)customer_client.go customer_service.go
功能:處理客服消息
(8)device.go
就一個方法,獲取設備的DeviceID,若是沒有就去redis申請一個最新的ID
(9)dummy_grpc.go grpc.go(廢棄)