先上Demo動圖,效果以下: javascript
因爲redux更改數據是dispatch(action),因此很天然而然想到以action做爲基本單位在服務端和客戶端進行傳送,在客戶端和服務端用數組來存放action,那麼只要當客戶端和服務端的action隊列的順序保持同樣,reducer是純函數的特性能夠知道計算獲得的state是同樣的。前端
本文中C1,C2...Cn表示客戶端,S表示服務端,a1,a2,a3...an表示aciton,服務端是使用koa + socket.io來編寫的(做爲一個前端,服務端的知識幾乎爲0勿噴)。java
當客戶端發起一個action的時候,服務端接收到這個action,服務端作了3件事:git
不過上面的思路是比較籠統的,細想會發現許多問題:算法
服務端設立了2個概念:target、member指編輯對象(多是報告、流程圖等)和編輯用戶,當你要發送兩個action:a一、a2的時候,由於網絡發送前後的不肯定性,因此應該是先發送a1,而後等待客戶端接收到,再發送a2,這樣才能在客戶端中保證a1和a2的順序。所以,每一個member會有變量pending表示在不在發送,index表示發送的最新的action在action隊列中的索引。redux
this.socket.on('client send action', (action) => {
//目標將action放入隊列中,並返回該action的索引
let index = this.target.addAction(action);
if (this.pending) {
this.queue.push({
method: 'sendBeforeActions',
args: [index]
})
} else {
this.sendBeforeActions(index);
}
this.target.receiveAction({
member: this,
index
})
})
複製代碼
這就是上面講的當服務端收到a1的時候作的3件事情。只是這裏會去判斷該member是否是正在執行發送任務,若是是,那麼就將發送a1前面的aciton這個動做存入到一個動做隊列中,並告知target,我這個member發送了一個action。數組
sendBeforeActions(refIndex) {
let {actions} = this.getCurrent(refIndex);
actions = actions.slice(0, -1);
this.pending = true;
this.socket.emit('server send before actions', { actions }, () => {
this.pending = false;
this.target.setIndex(this, refIndex);
this.sendProcess();
});
}
複製代碼
這個函數接收一個索引,這個索引在上面的代碼中是這個member接收到的action在隊列中的索引,因此getCurrent(refIndex)指到refIndex爲止,還沒發送給這個member的全部的action(可能爲空),因此要剔除自己後actions.slice(0, -1)發送給客戶端。 回調中終止發送狀態,設置member最新的action的index,而後執行sendProcess函數去看看,在本身自己發送的過程當中,是否是有後續的動做存入到發送隊列中了瀏覽器
sendProcess() {
if (this.queue.length > 0 && !this.pending) {
let current = this.queue.shift();
let method = this[current.method];
method.apply(this, current.args);
}
}
複製代碼
若是你注意到剛纔的:緩存
if (this.pending) {
this.queue.push({
method: 'sendBeforeActions',
args: [index]
})
}
複製代碼
你就會發現,若是剛纔想發送before action的時候這個member在發送其餘action,那麼會等待這個action發送完後才觸發sendProcess去執行這個發送。bash
在剛纔的代碼中
//this指某個member對象
this.target.receiveAction({
member: this,
index
})
複製代碼
就是這個觸發了其餘客戶端的發送
//this指某個target對象
receiveAction({member, index}) {
this.members.forEach(m => {
if (m.id !== member.id) {
m.queue.push({
method: 'sendActions',
args: [index]
});
m.sendProcess();
}
})
}
複製代碼
若是members中存在發送方的member,那麼會將發送動做存入member的發送隊列中,執行sendProcess
sendActions(refIndex) {
let {actions} = this.getCurrent(refIndex);
if (actions.length) {
this.pending = true;
this.socket.emit('server send actions', {actions}, () => {
this.pending = false;
this.target.setIndex(this, refIndex);
this.sendProcess();
})
}
}
複製代碼
這個函數和sendBeforeActions幾乎同樣,只差要不要剔除最新的action,這樣,就保證了服務端的發送action順序
在客戶端中,將io有關的操做都封裝在一箇中間件中
module.exports = store => next => action => {
if (action.type === 'connection') {
//鏈接初始化一些事件
return initIo(action.payload)
}
if (action.type === 'disconnection') {
return socket.disconnect(action.payload)
}
if (['@replace/state'].indexOf(action.type.toLowerCase()) === -1 && !action.escapeServer && !action.temporary) {
//將action給定userId、targetId
action = actionCreator(action);
//獲得新的action隊列,並計算actions,而後更新到state上
let newCacheActions = [...cacheActions, action];
mapActionsToState(newCacheActions);
//發送給服務端
return delieverAction(action);
}
//這樣就只容許replace state 的action進入到store裏面,這個是我這個思路在實現undo、redo的一個要求,後面會講到
next();
}
複製代碼
具體做用後面會用到
let cacheActions = []; //action隊列,這個和服務端的action隊列保持一致
let currentActions = []; //根據cacheActions計算的action
let redoActions = {}; //緩存每一個用戶的undo後拿掉的action
let pending = false; //是否在發送請求
let actionsToPend = []; //緩存發送隊列
let beforeActions = []; //緩存pull下來的actions
let currentAction = null;//當前發送的action
let user, tid; //用戶名和targetId
let initialState; //初始的state
let timeline = []; //緩存state
複製代碼
主要講兩個地方:
(1)在computeActions的時候,碰到undo拿掉該用戶的最後一個action,並把倒數第二個action提高到最後的緣由是由於假如在該用戶倒數第二個action以後還有其餘用戶的action發生,那麼可能其餘用戶會覆蓋掉這個用戶action的設定值,那麼這個用戶undo的時候就沒法回到以前的狀態了,這時候提高至關因而undo後作了新的action,這個action就是前一次的action。這個算法是有bug的,當一個用戶undo的時候,因爲咱們會提高他倒數第二的action,這樣會致使與這個action衝突的action的修改被覆蓋。這個解決衝突的策略有點問題。若是沒有提高,那麼若是該用戶undo的時候,若是他上一個action被其餘用戶的action覆蓋了,那麼他就沒法undo回去了。這個是個痛點,我還在持續探索中,歡迎大神指教。
(2)在用戶pending的時候收到了actions,這個時候至關因而before actions。 下面貼幾個主要函數的代碼
function initIo(payload, dispatch) {
user = payload.user;
tid = parseInt(payload.tid, 10);
//初始化socket
let socket = cache.socket = io(location.protocol + '//' + location.host, {
query: {
user: JSON.stringify(user),
tid
}
});
//獲取初始數據
socket.on('deliver initial data', (params) => {
...獲取初始的state,actions
})
//發送action會等待pull以前的actions
socket.on('server send before actions', (payload, callback) => {
pending = false;
callback && callback();
let {actions} = payload;
actions = [...actions, ...beforeActions, currentAction];
cacheActions = [...cacheActions, ...actions];
if (actions.length > 1) {
//證實有前面的action,須要根據actions從新計算state
mapActionsToState();
}
if (actionsToPend.length) {
let action = actionsToPend.shift();
sendAction(action);
}
})
//接收actions
socket.on('server send actions', (payload, callback) => {
let {actions} = payload;
callback && callback();
if (pending) {
beforeActions = [...beforeActions, ...actions];
} else {
cacheActions = [...cacheActions, ...actions];
mapActionsToState();
}
})
}
複製代碼
function mapActionsToState(actions) {
actions = actions || cacheActions;
if (actions.length === 0) {
return replaceState(dispatch)(initialState);
}
let {newCurrentActions, newRedoActions} = computeActions(actions);
let {same} = diff(newCurrentActions);
let state = initialState;
if (timeline[same]) {
state = timeline[same];
timeline = timeline.slice(0, same + 1);
}
if (same === -1) {
timeline = [];
}
let differentActions = newCurrentActions.slice(same + 1);
differentActions.forEach(action => {
state = store.reducer(state, action);
timeline.push(state);
});
currentActions = newCurrentActions;
redoActions = newRedoActions;
store.canUndo = () => currentActions.some(action => action.userId === user.id);
store.canRedo = () => !!(redoActions[user.id] || []).length;
return replaceState(dispatch)(state);
}
複製代碼
function computeActions(actions) {
let newCurrentActions = [];
let newRedoActions = {};
actions.forEach(action => {
let type = action.type.toLowerCase();
newRedoActions[action.userId] = newRedoActions[action.userId] || [];
if (type !== 'redo' && type !== 'undo') {
newCurrentActions.push(action);
newRedoActions[action.userId] = [];
}
if (type === 'undo') {
let indexes = [];
for (let i = newCurrentActions.length - 1; i >= 0; i--) {
if (newCurrentActions[i].userId === action.userId) {
indexes.push(i);
}
if (indexes.length === 2) {
break;
}
}
if (indexes.length > 0) {
let redo = newCurrentActions.splice(indexes[0], 1)[0];
newRedoActions[action.userId].push(redo);
}
if (indexes.length > 1) {
let temp = newCurrentActions.splice(indexes[1], 1);
newCurrentActions.push(temp[0]);
}
}
if (type === 'redo') {
let redo = newRedoActions[action.userId].pop();
newCurrentActions.push(redo);
}
});
return {
newCurrentActions,
newRedoActions
}
}
複製代碼
function diff(newCurrentActions) {
let same = -1;
newCurrentActions.some((action, index) => {
let currentAction = currentActions[index];
if (currentAction && action.id === currentAction.id) {
same = index;
return false;
}
return true;
});
return {
same
}
}
複製代碼
講了一堆,不知道有沒有將本身的思路講清楚,本身的demo也運行了起來,測試只用了兩個瀏覽器來模擬測試,總感受一些併發延時出現還會有bug,後面會持續優化這個想法,添加一些自動化測試來驗證,另外,對於服務端的存儲也還沒考慮,先在只在內存中跑,會思考保存方案。但願對這方面有興趣的大神能夠指導一下