Redux應用多人協做的思路和實現

先上Demo動圖,效果以下: javascript

Alt pic

基本思路

因爲redux更改數據是dispatch(action),因此很天然而然想到以action做爲基本單位在服務端和客戶端進行傳送,在客戶端和服務端用數組來存放action,那麼只要當客戶端和服務端的action隊列的順序保持同樣,reducer是純函數的特性能夠知道計算獲得的state是同樣的。前端

一些約定

本文中C1,C2...Cn表示客戶端,S表示服務端,a1,a2,a3...an表示aciton,服務端是使用koa + socket.io來編寫的(做爲一個前端,服務端的知識幾乎爲0勿噴)。java

總體思路

當客戶端發起一個action的時候,服務端接收到這個action,服務端作了3件事:git

  1. 把action推動棧中
  2. 把該客戶端a1以前的action發送該客戶端(相似git push以前有個git pull的過程)
  3. 將a1發送給其餘的客戶端

Alt pic

不過上面的思路是比較籠統的,細想會發現許多問題:算法

  1. 若是a1到達S端的時候,C二、C3還有action在派發怎麼處理?
  2. 若是a1到達S端的時候,C1正在接收其餘客戶端發送的action怎麼處理?
  3. 若是a1發送以前,C1正在發送前一個action怎麼處理? 後面咱們一一解決。

服務端派發action機制

服務端設立了2個概念:target、member指編輯對象(多是報告、流程圖等)和編輯用戶,當你要發送兩個action:a一、a2的時候,由於網絡發送前後的不肯定性,因此應該是先發送a1,而後等待客戶端接收到,再發送a2,這樣才能在客戶端中保證a1和a2的順序。所以,每一個member會有變量pending表示在不在發送,index表示發送的最新的action在action隊列中的索引。redux

當服務端接收到客戶端的aciton的時候

this.socket.on('client send action', (action) => {
    //目標將action放入隊列中,並返回該action的索引
    let index = this.target.addAction(action);
      if (this.pending) {
        this.queue.push({
          method: 'sendBeforeActions',
          args: [index]
        })
      } else {
        this.sendBeforeActions(index);
      }
      this.target.receiveAction({
        member: this,
        index
      })
    })
複製代碼

這就是上面講的當服務端收到a1的時候作的3件事情。只是這裏會去判斷該member是否是正在執行發送任務,若是是,那麼就將發送a1前面的aciton這個動做存入到一個動做隊列中,並告知target,我這個member發送了一個action。數組

sendBeforeActions

sendBeforeActions(refIndex) {
    let {actions} = this.getCurrent(refIndex);
    actions = actions.slice(0, -1);
    this.pending = true;
    this.socket.emit('server send before actions', { actions }, () => {
      this.pending = false;
      this.target.setIndex(this, refIndex);
      this.sendProcess();
    });
  }
複製代碼

這個函數接收一個索引,這個索引在上面的代碼中是這個member接收到的action在隊列中的索引,因此getCurrent(refIndex)指到refIndex爲止,還沒發送給這個member的全部的action(可能爲空),因此要剔除自己後actions.slice(0, -1)發送給客戶端。 回調中終止發送狀態,設置member最新的action的index,而後執行sendProcess函數去看看,在本身自己發送的過程當中,是否是有後續的動做存入到發送隊列中了瀏覽器

sendProcess() {
    if (this.queue.length > 0 && !this.pending) {
      let current = this.queue.shift();
      let method = this[current.method];
      method.apply(this, current.args);
    }
  }
複製代碼

若是你注意到剛纔的:緩存

if (this.pending) {
    this.queue.push({
        method: 'sendBeforeActions',
        args: [index]
    })
}
複製代碼

你就會發現,若是剛纔想發送before action的時候這個member在發送其餘action,那麼會等待這個action發送完後才觸發sendProcess去執行這個發送。bash

還要將這個action發送給其餘用戶

在剛纔的代碼中

//this指某個member對象
this.target.receiveAction({
    member: this,
    index
})

複製代碼

就是這個觸發了其餘客戶端的發送

//this指某個target對象
receiveAction({member, index}) {
    this.members.forEach(m => {
      if (m.id !== member.id) {
        m.queue.push({
          method: 'sendActions',
          args: [index]
        });
        m.sendProcess();
      }
    })
  }
複製代碼

若是members中存在發送方的member,那麼會將發送動做存入member的發送隊列中,執行sendProcess

sendActions

sendActions(refIndex) {
    let {actions} = this.getCurrent(refIndex);
    if (actions.length) {
      this.pending = true;
      this.socket.emit('server send actions', {actions}, () => {
        this.pending = false;
        this.target.setIndex(this, refIndex);
        this.sendProcess();
      })
    }
  }

複製代碼

這個函數和sendBeforeActions幾乎同樣,只差要不要剔除最新的action,這樣,就保證了服務端的發送action順序

客戶端IO中間件

在客戶端中,將io有關的操做都封裝在一箇中間件中

module.exports = store => next => action => {
    if (action.type === 'connection') {
        //鏈接初始化一些事件
        return initIo(action.payload)
    }  
    if (action.type === 'disconnection') {
        return socket.disconnect(action.payload)
    }
    if (['@replace/state'].indexOf(action.type.toLowerCase()) === -1 && !action.escapeServer && !action.temporary) {
        //將action給定userId、targetId
        action = actionCreator(action);
        //獲得新的action隊列,並計算actions,而後更新到state上
        let newCacheActions = [...cacheActions, action];
        mapActionsToState(newCacheActions);
        //發送給服務端
        return delieverAction(action);
    }
    //這樣就只容許replace state 的action進入到store裏面,這個是我這個思路在實現undo、redo的一個要求,後面會講到
    next();
}
複製代碼

一些全局變量

具體做用後面會用到

let cacheActions = [];   //action隊列,這個和服務端的action隊列保持一致
let currentActions = []; //根據cacheActions計算的action
let redoActions = {};    //緩存每一個用戶的undo後拿掉的action
let pending = false;     //是否在發送請求
let actionsToPend = [];  //緩存發送隊列
let beforeActions = [];  //緩存pull下來的actions
let currentAction = null;//當前發送的action
let user, tid;           //用戶名和targetId
let initialState;        //初始的state
let timeline = [];       //緩存state
複製代碼

客戶端總體思路圖

Alt pic

主要講兩個地方:

(1)在computeActions的時候,碰到undo拿掉該用戶的最後一個action,並把倒數第二個action提高到最後的緣由是由於假如在該用戶倒數第二個action以後還有其餘用戶的action發生,那麼可能其餘用戶會覆蓋掉這個用戶action的設定值,那麼這個用戶undo的時候就沒法回到以前的狀態了,這時候提高至關因而undo後作了新的action,這個action就是前一次的action。這個算法是有bug的,當一個用戶undo的時候,因爲咱們會提高他倒數第二的action,這樣會致使與這個action衝突的action的修改被覆蓋。這個解決衝突的策略有點問題。若是沒有提高,那麼若是該用戶undo的時候,若是他上一個action被其餘用戶的action覆蓋了,那麼他就沒法undo回去了。這個是個痛點,我還在持續探索中,歡迎大神指教。

(2)在用戶pending的時候收到了actions,這個時候至關因而before actions。 下面貼幾個主要函數的代碼

initIo

function initIo(payload, dispatch) {
  user = payload.user;
  tid = parseInt(payload.tid, 10);
  //初始化socket
  let socket = cache.socket = io(location.protocol + '//' + location.host, {
    query: {
      user: JSON.stringify(user),
      tid
    }
  });
  //獲取初始數據
  socket.on('deliver initial data', (params) => {
    ...獲取初始的state,actions
  })
  //發送action會等待pull以前的actions
  socket.on('server send before actions', (payload, callback) => {
    pending = false;
    callback && callback();
    let {actions} = payload;
    actions = [...actions, ...beforeActions, currentAction];
    cacheActions = [...cacheActions, ...actions];
    if (actions.length > 1) {
      //證實有前面的action,須要根據actions從新計算state
      mapActionsToState();
    }
    if (actionsToPend.length) {
      let action = actionsToPend.shift();
      sendAction(action);
    }
  })
  //接收actions
  socket.on('server send actions', (payload, callback) => {
    let {actions} = payload;
    callback && callback();
    if (pending) {
      beforeActions = [...beforeActions, ...actions];
    } else {
      cacheActions = [...cacheActions, ...actions];
      mapActionsToState();
    }
  })
}
複製代碼

mapActionsToState

function mapActionsToState(actions) {
  actions = actions || cacheActions;
  if (actions.length === 0) {
    return replaceState(dispatch)(initialState);
  }
  let {newCurrentActions, newRedoActions} = computeActions(actions);
  let {same} = diff(newCurrentActions);

  let state = initialState;
  if (timeline[same]) {
    state = timeline[same];
    timeline = timeline.slice(0, same + 1);
  }
  if (same === -1) {
    timeline = [];
  }
  let differentActions = newCurrentActions.slice(same + 1);
  differentActions.forEach(action => {
    state = store.reducer(state, action);
    timeline.push(state);
  });
  currentActions = newCurrentActions;
  redoActions = newRedoActions;
  store.canUndo = () => currentActions.some(action => action.userId === user.id);
  store.canRedo = () => !!(redoActions[user.id] || []).length;
  return replaceState(dispatch)(state);
}
複製代碼

computeActions

function computeActions(actions) {
  let newCurrentActions = [];
  let newRedoActions = {};
  actions.forEach(action => {
    let type = action.type.toLowerCase();
    newRedoActions[action.userId] = newRedoActions[action.userId] || [];
    if (type !== 'redo' && type !== 'undo') {
      newCurrentActions.push(action);
      newRedoActions[action.userId] = [];
    }
    if (type === 'undo') {
      let indexes = [];
      for (let i = newCurrentActions.length - 1; i >= 0; i--) {
        if (newCurrentActions[i].userId === action.userId) {
          indexes.push(i);
        }
        if (indexes.length === 2) {
          break;
        }
      }
      if (indexes.length > 0) {
        let redo = newCurrentActions.splice(indexes[0], 1)[0];
        newRedoActions[action.userId].push(redo);
      }
      if (indexes.length > 1) {
        let temp = newCurrentActions.splice(indexes[1], 1);
        newCurrentActions.push(temp[0]);
      }
    }
    if (type === 'redo') {
      let redo = newRedoActions[action.userId].pop();
      newCurrentActions.push(redo);
    }
  });
  return {
    newCurrentActions,
    newRedoActions
  }
}
複製代碼

diff

function diff(newCurrentActions) {
  let same = -1;
  newCurrentActions.some((action, index) => {
    let currentAction = currentActions[index];
    if (currentAction && action.id === currentAction.id) {
      same = index;
      return false;
    }
    return true;
  });
  return {
    same
  }
}
複製代碼

結束語

講了一堆,不知道有沒有將本身的思路講清楚,本身的demo也運行了起來,測試只用了兩個瀏覽器來模擬測試,總感受一些併發延時出現還會有bug,後面會持續優化這個想法,添加一些自動化測試來驗證,另外,對於服務端的存儲也還沒考慮,先在只在內存中跑,會思考保存方案。但願對這方面有興趣的大神能夠指導一下

相關文章
相關標籤/搜索