Recently I've been building a customer feedback web application. Since the backend and the mobile app already communicate using Protocol Buffers (protobuf), I talk to them over the socket in the same way.
After doing some research on protobuf, I found that it really deserves to be the chosen one.
So what are protocol buffers?
Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
Talk is cheap, so let's focus on the code. Here I use TypeScript for development.
Here's the protocol we defined on top of protobuf.
/**
* Application-layer protocol
*/
syntax = "proto3";
package protoc;
/*
Protocol header:
Made up of five parts:
First byte: compression flag. 0: not compressed; 1: compressed. Compression method is gzip
Second byte: serialization method: 0:pb 1:json
Third and fourth bytes: the request command:
0x01 ConfigReq
0x02 ConfigResp
0x03 ConfigAck
0x04 Sync
0x05 SyncData
0x06 SyncDataFin
0x07 SyncDataFinAck
0x08 Notify
0x09 NotifyFin
0x0A NotifyFinAck
Fifth to eighth bytes:
hold the uuid information, recording the user session
Rest: the remaining 8 bytes are reserved
*/
//Message groups
enum MsgGroup {
IM = 0; //instant messages
Comment = 1; //comments
Up = 2; //likes
Follow = 3; //follows
Review = 4; //reviews (moderation)
}
//Configuration
//Configuration information sent by the client
message ConfigReq {
string uid = 1; //user id
string sid = 2; //user sid / device id
string appver = 3; //current version information
string token = 4; //token carried by the client
string appid = 5; //app related
string did = 6; //unique device identifier
uint32 protover = 7; //protocol version number
string logInfo = 8; //used for logging client_id=xxx`uid=yyy`
}
//Adds an error-code field
//Returned by the server
message ConfigResp {
string channelAes = 1; //encryption/decryption algorithm; no encryption: ""
uint32 errorCode = 2; //error code 700: success 701: token error 702 token
uint64 seqId = 3 ; //the server's current message position
}
//Client acknowledgement; no content required
message ConfigAck {
}
//Message channel (category) types
enum ChannelType {
IMMsg = 0; //instant message
CommandMsg = 1; //command message
SystemMsg = 2; //system message
}
//Message type
enum IMMsgType {
Text = 0; //text
Audio = 1; //audio
Video = 2; //video
Image = 3; //image
}
//Message status:
enum CommandMsgType {
Read = 0; // read
Received = 1; //delivered
Cancel = 2; //recalled
Deleted = 3; //deleted
}
//System messages
enum SystemMsgType {
CommentUp = 0; //a comment was liked
CommentReply = 1; //a comment was replied to
ContentComment = 2; //content was commented on, feed stream
ContentUp = 3; //content was liked
ContentReview = 4; //review-rejected message
Followed = 5; //followed
}
//Message header
message MessageHeader {
uint64 seqId = 1; //message ID
string from = 2; //message sender; for system messages: the business name
string to = 3; //message receiver
int64 createTime = 4; //message creation time
uint64 sourceId = 5; //initial seq id of the message
}
//Text message
message TextMessage {
string text = 2; //message body
}
//Image message
message ImageMessage {
string coverUrl = 2; //image thumbnail
string imageUrl = 3; //image uri
uint32 height = 4; //image height
uint32 width =5; //image width
}
//Notice message
message NoticeMessage {
string data = 2; //the notice message is passed through as-is
string noticeId = 3; //id of the system message
}
//Comment message
message CommentMessage {
string data = 2; //the comment message is passed through as-is
string noticeId = 3; //id of the system message
}
//Command message
message CommandMessage {
//Field 2 is named msgSourceId to match the TypeScript code, which reads and writes
//commandMessage.msgSourceId; the field number is unchanged, so the wire format is the same.
uint64 msgSourceId = 2; //the message this command message points to
uint64 msgTargetId = 3; //multi-device read issue
MsgGroup group = 4; //message group of the message this command points to
}
//---------------------------------Message protocol--------------------------------------
//Sync packet initiated by the user
message Sync {
uint64 seqId = 2; //sequence number the sync packet starts from
}
//Data returned by sync
message SyncData {
uint64 seqId = 2; //current message position
repeated DataEntry data = 3; //the final serialized data
//Data body
message DataEntry {
MessageHeader header = 1;
bool isFcm = 14;
ChannelType channel = 4;
oneof MessageType {
IMMsgType imMsgType = 5; //instant message type
CommandMsgType commandType = 6; //command type
SystemMsgType systemMsgType = 7; //system message type
}
//Holds the concrete message content
oneof Body {
TextMessage textMessage = 8;
ImageMessage imageMessage = 9;
NoticeMessage noticeMessage = 10;
CommandMessage commandMessage = 11;
CommentMessage commentMessage = 12;
AudioMessage audioMessage = 13;
}
//bytes body = 8; // holds the message information <===> struct
}
}
//The client has finished receiving the data
message SyncDataFin {
repeated SyncDataResult syncDataResult = 1;
}
//One entry of SyncDataFin.syncDataResult
message SyncDataResult {
uint64 seqId = 1;
int32 errorCode = 2; //700 succ 711 protocol not supported 712 other errors
int64 createTime = 3;
}
//Server acknowledgement
message SyncDataFinAck {
}
//The server notifies the client to pull messages
message Notify {
uint64 seqId = 1; //current message position
}
//The client received the notification
message NotifyFin {
}
//Server acknowledgement
message NotifyFinAck {
}
//Audio message body, referenced by the audioMessage member of SyncData.DataEntry's Body oneof
//NOTE(review): test1/test2 look like placeholder fields - confirm before relying on them
message AudioMessage {
uint32 test1 = 1;
string test2 = 2;
}
複製代碼
According to the protobuf file im.proto, we need to build the message entities. We use protobufjs to load im.proto.
Since loading is asynchronous, we call lookupType for every entity in the constructor so that they can be used directly in the code that follows.
import * as protobuf from "protobufjs"
constructor() {
    // Load im.proto and cache every message type this class encodes or decodes.
    protobuf.load(require('./im.proto'), (err, root) => {
        if (err || !root) {
            // Without the schema nothing can be encoded or decoded; report it
            // instead of crashing on root.lookupType.
            console.log('failed to load im.proto,', err);
            this.caughtErr('proto load error');
            return;
        }
        this.ConfigReq = root.lookupType('protoc.ConfigReq');
        this.ConfigResp = root.lookupType('protoc.ConfigResp');
        this.Sync = root.lookupType('protoc.Sync');
        this.SyncData = root.lookupType('protoc.SyncData');
        this.SyncDataFin = root.lookupType('protoc.SyncDataFin');
        this.Notify = root.lookupType('protoc.Notify');
    })
}
複製代碼
initSocket is the entry point of the IM class. Here uid is the current user's id and toAcc is the id of the user we are talking to. Before connecting the WebSocket, we pull the message history from the database, and then use WebSocket to connect to the server. The message content from the server is a Blob, so we use FileReader to convert it into a Uint8Array; if needed, we unzip the body with gzip-buffer.
/**
 * Entry point: resets the state, loads the chat history, asks the backend for the
 * WebSocket address and opens the connection.
 * @param uid id of the current user
 * @param sid sid of the current user
 * @param toAcc id of the user being talked to
 */
initSocket(uid: string, sid: string, toAcc: string) {
    this.initParams(uid, sid, toAcc);
    // Query the chat history from the database first
    request.get('im/history', { fromUid: uid, toUid: toAcc }).then((res: ImMsgInfoList) => {
        this.seqId = res.seqId || 0;
        [this.seqIdList, this.adaptMessageList] = adaptInitialMessages(res.list)
        // Fetch the websocket address
        return request.get('im/init', { uid, sid })
    }).then((res: { data: string }) => {
        return res.data;
    }).then((url: string) => {
        // Connect the websocket; keep a local reference so the handlers can tell
        // whether they still belong to the current connection.
        const socket = new WebSocket(`${url}/rpc/conn`);
        this.socket = socket;
        socket.onopen = () => {
            // Connection is up: send the config request
            this.configRequest();
        }
        socket.onmessage = (event) => {
            // Convert the Blob into an ArrayBuffer
            const reader = new FileReader();
            reader.onload = () => {
                const resBuffer = new Uint8Array(reader.result as ArrayBuffer);
                // Message type from the header
                const msgType = resBuffer[SCOKET_TYPE_POSITION];
                // Compression flag from the header
                const bodyIsGzipped = resBuffer[COMPRESS_POSITION];
                // The actual body follows the header
                const contentBuffer = resBuffer.slice(SCOKET_HEADER_SIZE);
                if (bodyIsGzipped) {
                    // Unzip the body before handling it
                    gzipBuffer.gunzip(contentBuffer, (data: Uint8Array) => {
                        this.handleResult(msgType, data);
                    });
                } else {
                    this.handleResult(msgType, contentBuffer);
                }
            }
            reader.readAsArrayBuffer(event.data);
        }
        // Report socket errors
        socket.onerror = (event) => {
            console.log('websocket error,', event);
            this.caughtErr('connect error');
        }
        // Reconnect when the socket closes unexpectedly
        socket.onclose = (event) => {
            // A socket that is no longer the current one (closeSocket() cleared it, or a
            // newer initSocket() replaced it) must not trigger a reconnect.
            if (this.socket !== socket) {
                return;
            }
            if (!this.positiveClose) {
                console.log('socket closed accidently', event);
                console.log('try to reconnect.....');
                this.initSocket(uid, sid, toAcc);
                this.onReconnect();
            }
        }
    }).catch((error: unknown) => {
        // A failed history/init request or a bad URL would otherwise be an unhandled rejection
        console.log('websocket init failed,', error);
        this.caughtErr('connect error');
    })
}
複製代碼
/**
 * Fills in the socket frame header: writes the message type and a freshly
 * generated message id into the given buffer and returns that same buffer.
 */
private plugSocketBufferHead(buffer: Uint8Array, msgType: MSG_TYPE) {
    // message type byte
    const typeBytes = [msgType];
    buffer.set(typeBytes, SCOKET_TYPE_POSITION);
    // message id bytes
    const idBytes = generateMsgId();
    buffer.set(idBytes, SCOKET_ID_POSITION);
    return buffer;
}
/**
 * First request on a new connection: encodes a ConfigReq carrying uid and sid,
 * places it after the header and sends the frame.
 */
private configRequest() {
    const message = this.ConfigReq.create({ uid: this.uid, sid: this.sid });
    const body: Uint8Array = this.ConfigReq.encode(message).finish();
    const frame = this.plugSocketBufferHead(
        new Uint8Array(body.length + SCOKET_HEADER_SIZE),
        MSG_TYPE.CONFIGREQ
    );
    frame.set(body, SCOKET_HEADER_SIZE);
    this.socket.send(frame)
}
複製代碼
And here is how to build a message and send it to the server side.
//im.ts
import request from "request";
import * as protobuf from "protobufjs"
import * as moment from 'moment'
const gzipBuffer = require('gzip-buffer');
import { generateMsgId, adaptComingMessages, getNeedMarkedCommandMessage, adaptInitialMessages } from './im_utils'
import { ConfigResult, SyncData, TextMessage, ImageMessage, ChannelType, IMMsgType, Notify, AdaptMessageEntity, AdaptMessageStatus, CommandMessage } from "./msg_result"
import { ImMsgInfoList } from "../request_result"
/**
 * Message types as defined by the proto header; used to recognise what came
 * back and to fill the type byte of outgoing frames.
 */
enum MSG_TYPE {
    CONFIGREQ = 1,
    CONFIGRESP = 2,
    CONFIGACK = 3,
    SYNC = 4,
    SYNCDATA = 5,
    SYNCDATAFIN = 6,
    SYNCDATAFINACK = 7,
    NOTIFY = 8,
    NOTIFYFIN = 9,
    NOTIFYFINACK = 10
}
/** First byte of the header: compression flag. 0: not compressed; 1: compressed (gzip). */
const COMPRESS_POSITION = 0;
/** Position of the message type inside the header. */
const SCOKET_TYPE_POSITION = 2;
/** Position of the message id inside the header. */
const SCOKET_ID_POSITION = 4;
/** The protocol header takes 16 bytes. */
const SCOKET_HEADER_SIZE = 16;
/** Success code. */
const SUCCESS_CODE = 700;
/** Values compared against WebSocket.readyState in closeSocket(). */
enum SOCKET_READY_STATE {
    CONNECTING = 0,
    OPEN = 1,
    CLOSING = 2,
    CLOSED = 3
}
/**
 * Abstract IM client. It loads the message types from im.proto, opens the WebSocket,
 * prefixes every outgoing body with the SCOKET_HEADER_SIZE-byte header and dispatches
 * incoming frames by the type byte of their header. Subclasses supply the callbacks
 * caughtErr, initFinish, onMsgListChange and onReconnect.
 */
export default abstract class IM {
    // Whether the websocket was closed on purpose; used to decide about reconnecting
    private positiveClose: boolean;
    private uid: string;
    private sid: string;
    private toAcc: string;
    // Messages that are currently being sent, keyed by header seqId
    private sendingMessageList: Map<number, AdaptMessageEntity>;
    // The adapted message list
    private adaptMessageList: AdaptMessageEntity[];
    // The seqIds collected so far
    private seqIdList: Set<number>;
    // Offset id of the current message
    private seqId: number;
    private socket: WebSocket;
    private initializing: boolean;
    // Called when an error is caught
    abstract caughtErr(errorMsg: string): void;
    // Called when initialization has finished
    abstract initFinish(): void;
    // Called when the message list changes
    abstract onMsgListChange(msgList: AdaptMessageEntity[]): void;
    // Called when the socket closed unexpectedly and a reconnect was started
    abstract onReconnect(): void;
    /**
     * Message types looked up from the proto file
     */
    private ConfigReq: protobuf.Type;
    private ConfigResp: protobuf.Type;
    private Sync: protobuf.Type;
    private SyncData: protobuf.Type;
    private SyncDataFin: protobuf.Type;
    private Notify: protobuf.Type;

    constructor() {
        // Load im.proto and cache every message type this class encodes or decodes.
        protobuf.load(require('./im.proto'), (err, root) => {
            if (err || !root) {
                // Without the schema nothing can be encoded or decoded; report it
                // instead of crashing on root.lookupType.
                console.log('failed to load im.proto,', err);
                this.caughtErr('proto load error');
                return;
            }
            this.ConfigReq = root.lookupType('protoc.ConfigReq');
            this.ConfigResp = root.lookupType('protoc.ConfigResp');
            this.Sync = root.lookupType('protoc.Sync');
            this.SyncData = root.lookupType('protoc.SyncData');
            this.SyncDataFin = root.lookupType('protoc.SyncDataFin');
            this.Notify = root.lookupType('protoc.Notify');
        })
    }

    /**
     * Fills in the socket frame header: writes the message type and a freshly
     * generated message id into the buffer and returns that same buffer.
     */
    private plugSocketBufferHead(buffer: Uint8Array, msgType: MSG_TYPE) {
        // message type
        buffer.set([msgType], SCOKET_TYPE_POSITION);
        // message id
        buffer.set(generateMsgId(), SCOKET_ID_POSITION);
        return buffer;
    }

    /**
     * First request on a new connection: sends a ConfigReq carrying uid and sid.
     */
    private configRequest() {
        const payload = { uid: this.uid, sid: this.sid };
        const config = this.ConfigReq.create(payload);
        const contentBuffer: Uint8Array = this.ConfigReq.encode(config).finish();
        let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
        realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.CONFIGREQ);
        realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
        this.socket.send(realBuffer)
    }

    /**
     * Sends a header-only frame; every acknowledgement goes through here.
     * @param type the acknowledgement type written into the header
     * @param needSync whether to pull messages afterwards; a CONFIGACK always pulls
     */
    private ack(type: MSG_TYPE.CONFIGACK | MSG_TYPE.NOTIFYFIN | MSG_TYPE.SYNCDATAFINACK, needSync = true) {
        let realBuffer = new Uint8Array(SCOKET_HEADER_SIZE);
        realBuffer = this.plugSocketBufferHead(realBuffer, type);
        this.socket.send(realBuffer)
        if (type === MSG_TYPE.CONFIGACK || needSync) {
            // pull messages
            this.sync();
        }
    }

    /**
     * Pulls messages: sends a Sync frame carrying the current seqId (0 when unset).
     */
    private sync() {
        const payload = { seqId: this.seqId || 0 };
        console.log('sync', payload);
        const syncPayload = this.Sync.create(payload);
        const contentBuffer: Uint8Array = this.Sync.encode(syncPayload).finish();
        let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
        realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.SYNC);
        realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
        this.socket.send(realBuffer)
    }

    /**
     * Builds a message header; createTime is the current time formatted with
     * moment's 'X' token.
     */
    private makeMessageHeader(from: string, to: string, seqId: number) {
        return {
            from,
            to,
            seqId,
            createTime: moment().format('X')
        }
    }

    /**
     * Factory for IM payloads. Only image and text messages are supported for now;
     * more can be added later, see the proto file.
     * @param msgPayload the message body
     * @param imMsgType the type of the message body
     * @returns a SyncData payload containing a single data entry
     * @throws Error when imMsgType is neither Image nor Text
     */
    private makeImMessage(msgPayload: TextMessage | ImageMessage, imMsgType: IMMsgType) {
        const header = this.makeMessageHeader(this.uid, this.toAcc, this.seqId + 1);
        switch (imMsgType) {
            case IMMsgType.Image: {
                // image type
                return { data: [{ header, imageMessage: msgPayload as ImageMessage, channel: ChannelType.IMMsg, imMsgType: IMMsgType.Image }] }
            }
            case IMMsgType.Text: {
                // text type
                return { data: [{ header, textMessage: msgPayload as TextMessage, channel: ChannelType.IMMsg, imMsgType: IMMsgType.Text }] }
            }
            default:
                // Fail before anything is sent; returning undefined here used to make
                // sendMessage push an empty SyncData frame and then crash.
                throw new Error(`unsupported imMsgType: ${imMsgType}`);
        }
    }

    /**
     * Sends a message and records it locally as "sending".
     * @param msgPayload the message body
     * @param imMsgType the type of the message body (Image or Text)
     * @throws Error when imMsgType is not supported
     */
    sendMessage(msgPayload: TextMessage | ImageMessage, imMsgType: IMMsgType) {
        const payload = this.makeImMessage(msgPayload, imMsgType);
        this.syncData(payload);
        // Insert into the current adaptMessageList with status "sending"
        const adaptMessage: any = payload.data[0];
        adaptMessage.status = AdaptMessageStatus.Sending;
        this.seqIdList.add(adaptMessage.header.seqId);
        this.adaptMessageList.push(adaptMessage);
        // Remember it in the list of messages being sent
        this.sendingMessageList.set(adaptMessage.header.seqId, adaptMessage);
    }

    /**
     * Encodes the payload as SyncData, gzips the body and sends it with the
     * compression flag set in the header.
     */
    private syncData(payload: any) {
        const syncData = this.SyncData.create(payload);
        const contentBuffer: Uint8Array = this.SyncData.encode(syncData).finish();
        // compress the body
        gzipBuffer.gzip(contentBuffer, (data: Uint8Array) => {
            let realBuffer = new Uint8Array(data.length + SCOKET_HEADER_SIZE);
            realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.SYNCDATA);
            // tell the server the body is compressed
            realBuffer.set([1], COMPRESS_POSITION);
            realBuffer.set(data, SCOKET_HEADER_SIZE);
            this.socket.send(realBuffer)
        });
    }

    /**
     * Tells the server that messages have been read.
     * @param commandMessage the command to send; nothing is sent when it is missing
     *   or has no msgSourceId
     */
    private sendCommands(commandMessage: CommandMessage) {
        if (commandMessage && commandMessage.msgSourceId) {
            const msgTargetId = this.seqId + 1;
            const header = this.makeMessageHeader(this.uid, this.toAcc, msgTargetId);
            const payload = { data: [{ header, commandMessage, channel: ChannelType.CommandMsg, commandType: AdaptMessageStatus.Read }] }
            this.syncData(payload);
        }
    }

    /**
     * Closes the websocket on purpose (no reconnect) and drops the reference to it.
     */
    closeSocket() {
        if (this.socket && (this.socket.readyState === SOCKET_READY_STATE.OPEN
            || this.socket.readyState === SOCKET_READY_STATE.CONNECTING)) {
            this.positiveClose = true;
            this.socket.close();
        }
        this.socket = null;
    }

    /**
     * Validates the ids and resets all per-connection state.
     * @throws Error when uid, sid or toAcc is missing
     */
    private initParams(uid: string, sid: string, toAcc: string) {
        if (!uid || !sid || !toAcc) {
            throw Error('uid,sid,toAcc must not be null');
        }
        this.uid = uid;
        this.sid = sid;
        this.toAcc = toAcc;
        this.adaptMessageList = [] as AdaptMessageEntity[];
        this.seqIdList = new Set();
        this.seqId = 0;
        this.positiveClose = false;
        this.initializing = true;
        this.sendingMessageList = new Map();
    }

    /**
     * Entry point: resets the state, loads the chat history, asks the backend for the
     * WebSocket address and opens the connection.
     * @param uid id of the current user
     * @param sid sid of the current user
     * @param toAcc id of the user being talked to
     */
    initSocket(uid: string, sid: string, toAcc: string) {
        this.initParams(uid, sid, toAcc);
        // Query the chat history from the database first
        request.get('im/history', { fromUid: uid, toUid: toAcc }).then((res: ImMsgInfoList) => {
            this.seqId = res.seqId || 0;
            [this.seqIdList, this.adaptMessageList] = adaptInitialMessages(res.list)
            // Fetch the websocket address
            return request.get('im/init', { uid, sid })
        }).then((res: { data: string }) => {
            return res.data;
        }).then((url: string) => {
            // Connect the websocket; keep a local reference so the handlers can tell
            // whether they still belong to the current connection.
            const socket = new WebSocket(`${url}/rpc/conn`);
            this.socket = socket;
            socket.onopen = () => {
                // Connection is up: send the config request
                this.configRequest();
            }
            socket.onmessage = (event) => {
                // Convert the Blob into an ArrayBuffer
                const reader = new FileReader();
                reader.onload = () => {
                    const resBuffer = new Uint8Array(reader.result as ArrayBuffer);
                    // Message type from the header
                    const msgType = resBuffer[SCOKET_TYPE_POSITION];
                    // Compression flag from the header
                    const bodyIsGzipped = resBuffer[COMPRESS_POSITION];
                    // The actual body follows the header
                    const contentBuffer = resBuffer.slice(SCOKET_HEADER_SIZE);
                    if (bodyIsGzipped) {
                        // Unzip the body before handling it
                        gzipBuffer.gunzip(contentBuffer, (data: Uint8Array) => {
                            this.handleResult(msgType, data);
                        });
                    } else {
                        this.handleResult(msgType, contentBuffer);
                    }
                }
                reader.readAsArrayBuffer(event.data);
            }
            // Report socket errors
            socket.onerror = (event) => {
                console.log('websocket error,', event);
                this.caughtErr('connect error');
            }
            // Reconnect when the socket closes unexpectedly
            socket.onclose = (event) => {
                // A socket that is no longer the current one (closeSocket() cleared it, or a
                // newer initSocket() replaced it) must not trigger a reconnect.
                if (this.socket !== socket) {
                    return;
                }
                if (!this.positiveClose) {
                    console.log('socket closed accidently', event);
                    console.log('try to reconnect.....');
                    this.initSocket(uid, sid, toAcc);
                    this.onReconnect();
                }
            }
        }).catch((error: unknown) => {
            // A failed history/init request or a bad URL would otherwise be an unhandled rejection
            console.log('websocket init failed,', error);
            this.caughtErr('connect error');
        })
    }

    /**
     * Dispatches a received frame body by message type; other types are ignored.
     */
    private handleResult(msgType: MSG_TYPE, contentBuffer: Uint8Array) {
        switch (msgType) {
            case MSG_TYPE.CONFIGRESP:
                this.configResult(contentBuffer);
                break;
            case MSG_TYPE.SYNCDATA:
                this.syncResult(contentBuffer);
                break;
            case MSG_TYPE.SYNCDATAFIN:
                this.ack(MSG_TYPE.SYNCDATAFINACK)
                break;
            case MSG_TYPE.NOTIFY:
                this.notifyResult(contentBuffer);
                break;
        }
    }

    /**
     * Handles a Notify: acknowledges it, pulls only when the server is ahead of the
     * local seqId, and finishes initialization when there is nothing new.
     */
    private notifyResult(buffer: Uint8Array) {
        const result = this.Notify.decode(buffer) as Notify;
        console.log('notifyResult', result);
        // Pull only when there are new messages
        this.ack(MSG_TYPE.NOTIFYFIN, result.seqId > this.seqId);
        if (result.seqId <= this.seqId && this.initializing) {
            // Registration is complete: fire the init-finished callback
            this.initializing = false;
            this.initFinish();
        }
    }

    /**
     * Builds one SyncDataResult entry (with the success code) per received data entry.
     */
    private getSyncDataFinBody(syncDataRes: SyncData) {
        return syncDataRes.data.map(data => {
            return {
                seqId: data.header.seqId,
                errorCode: SUCCESS_CODE,
                createTime: data.header.createTime
            }
        })
    }

    /**
     * Sends a SyncDataFin frame covering the received SyncData.
     */
    private syncDataFin(syncDataRes: SyncData) {
        const payload = { syncDataResult: this.getSyncDataFinBody(syncDataRes) }
        const syncDataFin = this.SyncDataFin.create(payload);
        const contentBuffer: Uint8Array = this.SyncDataFin.encode(syncDataFin).finish();
        let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
        realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.SYNCDATAFIN);
        realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
        this.socket.send(realBuffer)
    }

    /**
     * Handles a SyncData: confirms receipt, stores the new seqId, merges the incoming
     * messages and, once initialized, sends a read command when one is needed.
     */
    private syncResult(buffer: Uint8Array) {
        const result = this.SyncData.decode(buffer) as SyncData;
        console.log('syncResult', result)
        this.syncDataFin(result);
        this.seqId = result.seqId;
        // No data while initializing: initialization is done
        if (!result.data.length && this.initializing) {
            // Registration is complete: fire the init-finished callback
            this.initializing = false;
            this.initFinish();
            this.onMsgListChange(this.adaptMessageList);
            return;
        }
        // Take the newly arrived messages and merge them in
        [this.seqIdList, this.adaptMessageList, this.sendingMessageList] = adaptComingMessages(this.uid, this.toAcc, this.seqIdList, result.data, this.adaptMessageList, this.sendingMessageList);
        if (!this.initializing) {
            // After initialization, every sync result checks whether a command must be sent
            let commandMsg = null;
            [commandMsg, this.adaptMessageList] = getNeedMarkedCommandMessage(this.toAcc, this.adaptMessageList);
            this.sendCommands(commandMsg);
            this.onMsgListChange(this.adaptMessageList);
        }
    }

    /**
     * Handles a ConfigResp: reports an error unless the code is SUCCESS_CODE,
     * otherwise acknowledges it (which also starts the first sync).
     */
    private configResult(buffer: Uint8Array) {
        const result = this.ConfigResp.decode(buffer) as ConfigResult;
        if (result.errorCode !== SUCCESS_CODE) {
            // error code 700: success 701: token error 702 token
            this.caughtErr('config error');
            return;
        }
        this.ack(MSG_TYPE.CONFIGACK);
    }
}
複製代碼
//im_utils.ts
import { MessageEntity, AdaptMessageEntity, ChannelType, AdaptMessageStatus, CommandMessage, IMMsgType } from "./msg_result"
import { ImMsgInfo } from "../request_result"
/**
 * Generates the random msgId written into the header of every socket frame:
 * four values, each picked at random from the digits 0-9.
 */
export const generateMsgId = () => {
    // candidate values for each position of the id
    const digits: Array<number> = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    const pickDigit = () => digits[Math.floor(Math.random() * digits.length)];
    return [pickDigit(), pickDigit(), pickDigit(), pickDigit()]
}
/**
 * Tells whether two message lists are the same: they must have equal length and
 * the same status at every position (only the status is compared).
 */
export const compareMsgList = (newMsgList: AdaptMessageEntity[], oldMsgList: AdaptMessageEntity[]): boolean => {
    // different lengths can never match
    if (newMsgList.length != oldMsgList.length) {
        return false;
    }
    for (const [position, fresh] of newMsgList.entries()) {
        const sameStatus = Object.is(fresh.status, oldMsgList[position].status);
        if (!sameStatus) {
            // one item changed its status
            return false;
        }
    }
    return true;
}
/**
 * Merges messages received over the socket into the lists used for rendering.
 * Works on copies of the set, the list and the map; the message objects inside the
 * list are shared with the input, so status changes are visible on them too.
 * @param fromUid own uid, used to pick the id a command message refers to
 * @param toUid uid of the other user, used to keep only the current conversation
 * @param seqIdList seqIds of all messages received so far
 * @param comingMessageList messages pushed by the server
 * @param adaptMessageList messages actually used for display
 * @param sendingMessageList messages currently being sent
 * @returns the updated [seqIdList, adaptMessageList, sendingMessageList]
 */
export const adaptComingMessages = (fromUid: string, toUid: string, seqIdList: Set<number>, comingMessageList: MessageEntity[], adaptMessageList: AdaptMessageEntity[], sendingMessageList: Map<number, AdaptMessageEntity>): [Set<number>, AdaptMessageEntity[], Map<number, AdaptMessageEntity>] => {
    const knownSeqIds = new Set([...seqIdList]);
    const merged = [...adaptMessageList];
    const pending = new Map(sendingMessageList);

    // Walks the list backwards (messages are usually recent, so this should be quicker),
    // sets the status of the first entry with that seqId and reports whether one was found.
    const markStatus = (seqId: number, status: AdaptMessageStatus): boolean => {
        let cursor = merged.length;
        while (cursor-- > 0) {
            if (Object.is(merged[cursor].header.seqId, seqId)) {
                merged[cursor].status = status;
                return true;
            }
        }
        return false;
    };

    // Keep only messages whose from or to equals toUid
    const relevant = comingMessageList.filter((candidate: MessageEntity) =>
        Object.is(candidate.header.from, toUid) || Object.is(candidate.header.to, toUid));

    for (const incoming of relevant) {
        /**
         * IMMsg = 0; instant message
         * CommandMsg = 1; command message
         * SystemMsg = 2; system message (not handled for now)
         */
        switch (incoming.channel) {
            case ChannelType.CommandMsg: {
                /**
                 * See the multi-device read diagram, https://nemo.yuque.com/starhalo/rd/nm0mv9
                 * The code uses msgSourceId when the message is addressed to us,
                 * msgTargetId otherwise.
                 */
                const addressedToUs = Object.is(incoming.header.to, fromUid);
                const keyId = addressedToUs ? incoming.commandMessage.msgSourceId : incoming.commandMessage.msgTargetId;
                // The referenced seqId was never seen: nothing to do
                if (!knownSeqIds.has(keyId)) {
                    continue;
                }
                markStatus(keyId, incoming.commandType);
                break;
            }
            case ChannelType.SystemMsg:
                // System messages are skipped entirely
                continue;
            default: {
                // IM message: textMessage or imageMessage
                const seqId = incoming.header.seqId;
                if (knownSeqIds.has(seqId)) {
                    // Something we sent ourselves a moment ago: mark it as sent
                    if (markStatus(seqId, AdaptMessageStatus.Sent)) {
                        pending.delete(seqId);
                    }
                } else {
                    const { Body, MessageType, isFcm, ...received } = incoming;
                    (received as AdaptMessageEntity).status = AdaptMessageStatus.Sent;
                    merged.push(received);
                }
            }
        }
        knownSeqIds.add(incoming.header.seqId);
    }
    return [knownSeqIds, merged, pending];
}
/**
 * Works out the "read" command that has to be sent and marks the other user's
 * messages as read. Only the latest unread message needs to be reported.
 * @param toUid uid of the other user
 * @param msgList the current message list
 * @returns the command for the latest message from toUid that is not yet read
 *   (null when there is none) and a copy of the list
 */
export const getNeedMarkedCommandMessage = (toUid: string, msgList: AdaptMessageEntity[]): [CommandMessage, AdaptMessageEntity[]] => {
    let command = null;
    const marked = [...msgList];
    let cursor = msgList.length;
    while (cursor-- > 0) {
        const current = msgList[cursor];
        // messages not sent by the other user are left alone
        if (!Object.is(current.header.from, toUid)) {
            continue;
        }
        // the first hit from the end that is not read yet becomes the command
        if (!command && !Object.is(current.status, AdaptMessageStatus.Read)) {
            command = { msgSourceId: current.header.sourceId, msgTargetId: current.header.seqId };
        }
        // sent by the other user: mark it as read
        marked[cursor].status = AdaptMessageStatus.Read;
    }
    return [command, marked];
}
/**
 * Converts message records loaded from the database into AdaptMessageEntity items
 * for rendering. Every record is treated as already read.
 * @param initialMessageList history records; a missing (null/undefined) list is
 *   treated as empty instead of throwing
 * @returns the set of seqIds found and the adapted messages, in input order
 */
export const adaptInitialMessages = (initialMessageList: ImMsgInfo[]): [Set<number>, AdaptMessageEntity[]] => {
    const seqIdList = new Set<number>();
    const adaptMessageList: AdaptMessageEntity[] = [];
    // A conversation without history may come back without a list at all
    for (const initialMessage of initialMessageList || []) {
        seqIdList.add(initialMessage.seqid);
        const header = { seqId: initialMessage.seqid, from: initialMessage.fromuid, to: initialMessage.touid, createTime: initialMessage.msg_time };
        const adaptMessage: any = { header }
        if (Object.is(initialMessage.ctype, 'text')) {
            adaptMessage[`${initialMessage.ctype}Message`] = { text: initialMessage.content };
            adaptMessage.imMsgType = IMMsgType.Text;
        } else {
            // Every non-text ctype is stored under `${ctype}Message` and typed as an image
            adaptMessage[`${initialMessage.ctype}Message`] = { imageUrl: initialMessage.content };
            adaptMessage.imMsgType = IMMsgType.Image;
        }
        // treat everything as read
        adaptMessage.status = AdaptMessageStatus.Read;
        adaptMessageList.push(adaptMessage);
    }
    return [seqIdList, adaptMessageList]
}
複製代碼
//msg_result.ts
/**
* Types for the results returned according to the proto file
*/
import * as protobuf from "protobufjs"
/** Message channel (category) types. */
export enum ChannelType {
    /** instant message */
    IMMsg = 0,
    /** command message */
    CommandMsg = 1,
    /** system message */
    SystemMsg = 2
}
/** Members of the MessageType group; the values are 5, 6 and 7. */
export enum MessageType {
    /** instant message type */
    IMMsgType = 5,
    /** command type */
    CommandMsgType = 6,
    /** system message type */
    SystemMsgType = 7
}
/** Status of an adapted message. */
export enum AdaptMessageStatus {
    /** read */
    Read = 0,
    /** delivered */
    Received = 1,
    /** recalled */
    Cancel = 2,
    /** deleted */
    Deleted = 3,
    /** handed to the server, result not known yet */
    Sent = 4,
    /** being sent */
    Sending = 5,
}
/** Message type of an instant message. */
export enum IMMsgType {
    /** text */
    Text = 0,
    /** audio */
    Audio = 1,
    /** video */
    Video = 2,
    /** image */
    Image = 3
}
/** Shape of a config response after decoding. */
export interface ConfigResult extends protobuf.Message {
    errorCode: number;
    channelAes: string;
    seqId: string;
}
/** Body of a text message. */
export interface TextMessage extends protobuf.Message {
    text: string;
}
/** Body of an image message; only imageUrl is required. */
export interface ImageMessage extends protobuf.Message {
    coverUrl?: string;
    imageUrl: string;
    height?: number;
    width?: number;
}
/** Header carried by every message. */
export interface MessageHeader extends protobuf.Message {
    seqId?: number;
    from: string;
    to: string;
    createTime: number;
    /** initial seq id of the message */
    sourceId?: number;
}
/** Body of a command message. */
export interface CommandMessage {
    /** the message this command message points to */
    msgSourceId: number;
    /** multi-device read issue */
    msgTargetId: number;
}
/** One data entry of a SyncData message; every field is optional. */
export interface MessageEntity {
    header?: MessageHeader;
    channel?: ChannelType;
    imMsgType?: IMMsgType;
    textMessage?: TextMessage;
    imageMessage?: ImageMessage;
    commandMessage?: CommandMessage;
    commandType?: AdaptMessageStatus;
    isFcm?: boolean;
    MessageType?: string;
    Body?: string;
}
/** A message adapted for rendering, together with its status. */
export interface AdaptMessageEntity {
    header?: MessageHeader;
    textMessage?: TextMessage;
    imMsgType?: IMMsgType;
    imageMessage?: ImageMessage;
    status?: AdaptMessageStatus;
}
/** Shape of a SyncData message: a seqId plus the list of data entries. */
export interface SyncData extends protobuf.Message {
    seqId?: number;
    data: MessageEntity[];
}
/** Result reported for one received data entry. */
export interface SyncDataResult extends protobuf.Message {
    seqId: number;
    errorCode: number;
    createTime: number;
}
/** Shape of a Notify message. */
export interface Notify extends protobuf.Message {
    seqId: number;
}
複製代碼
It took me over 2 weeks to finish the application, including the UI. For the emoji picker in the UI I chose emoji-mart. This time I have only provided some variants; if you have any ideas, don't forget to leave a comment! And also star, haha. Before this, I built a chat system with socket.io; maybe I will post another article with a demonstration of that system in the next few days.