licode服務端總結

1.系統架構

image

(來源:https://github.com/lynckia/licode/issues/335)git

2.nuve模塊

 

nuve-api-architecture

(修改:https://blog.csdn.net/u012908515/article/details/53940787)github

app.post('/rooms', roomsResource.createRoom);
app.get('/rooms', roomsResource.represent);

app.get('/rooms/:room', roomResource.represent);
app.put('/rooms/:room', roomResource.updateRoom);
app.patch('/rooms/:room', roomResource.patchRoom);
app.delete('/rooms/:room', roomResource.deleteRoom);

app.post('/rooms/:room/tokens', tokensResource.create);

app.post('/services', servicesResource.create);
app.get('/services', servicesResource.represent);

app.get('/services/:service', serviceResource.represent);
app.delete('/services/:service', serviceResource.deleteService);

app.get('/rooms/:room/users', usersResource.getList);

app.get('/rooms/:room/users/:user', userResource.getUser);
app.delete('/rooms/:room/users/:user', userResource.deleteUser);

2.1簽名驗證web

app.get('*', nuveAuthenticator.authenticate);
app.post('*', nuveAuthenticator.authenticate);
app.put('*', nuveAuthenticator.authenticate);
app.patch('*', nuveAuthenticator.authenticate);
app.delete('*', nuveAuthenticator.authenticate);

每一次客戶端請求都會進行簽名驗證。算法

2.2cloudHandle.jsapi

image

cloudHandler提供了nuve對EC的調用:獲取房間中的全部用戶,刪除房間,刪除指定用戶;以及EC對nuve的調用:刪除token,EC的添加、狀態更新,刪除以及保活。promise

調用方法經過RPCMQ實現,他們共同維護了兩個隊列,隊列A用來傳遞調用消息,Nuve發佈調用消息後,而且會維護一個調用方法和回調函數的字典;EC從隊列A中獲取調用消息後執行調用方法,並將結果消息push進隊列B;Nuve從隊列B中獲取調用結果消息,並從字典中獲得回調函數並執行callback,刪除對應字典值。服務器

2.3服務、房間、用戶管理websocket

image

1.只有superService纔有權限進行service的相關操做,即配置文件中的superService = config.nuve.superserviceIDsession

2.在room模塊中currentService即當前請求的service,會維護一個rooms列表,且建立房間的時候可攜帶房間的媒體信息:room.mediaConfiguration架構

3.Nuve中的user管理僅僅提供查詢和刪除功能,經過房間id,對EC發出獲取當前房間用戶信息,或者刪除該用戶
 
room {name: '', [p2p: bool], [data: {}], _id: ObjectId}
service {name: '', key: '', rooms: Array[room], testRoom: room, testToken: token, _id: ObjectId}
token {host: '', userName: '', room: '', role: '', service: '', creationDate: Date(), [use: int], [p2p: bool], _id: ObjectId}

2.4總結

     Nuve功能至關於一個負載均衡,它負責入庫的這些信息可以更好的輔助這個功能,ch_policies決定了負載均衡算法,默認取EC隊列的首位。EC服務的啓動並不歸Nuve管理,EC建立成功後在Nuve中入庫,Nuve僅僅維護列表以及保活,負責EC的分配。

3.ErizoController

 

 EC是一個service服務,會維護一個房間集合Rooms,建立成功後會在Nuve中進行註冊,而且鏈接amqper

server.listen(global.config.erizoController.listen_port);
  // eslint-disable-next-line global-require, import/no-extraneous-dependencies
const io = require('socket.io').listen(server, { log: false });

io.set('transports', ['websocket']);
  const addECToCloudHandler = (attempt) => {
    if (attempt <= 0) {
      log.error('message: addECtoCloudHandler cloudHandler does not respond - fatal');
      process.exit(-1);
      return;
    }

    const controller = {
      cloudProvider: global.config.cloudProvider.name,
      ip: publicIP,
      hostname: global.config.erizoController.hostname,
      port: global.config.erizoController.port,
      ssl: global.config.erizoController.ssl,
    };
    nuve.addNewErizoController(controller).then((msg) => {
      log.info('message: succesfully added to cloudHandler');

      publicIP = msg.publicIP;
      myId = msg.id;
      myState = 2;

      startKeepAlives(myId, publicIP);
      callback('callback');
    }).catch((reason) => {
      if (reason === 'timeout') {
        log.warn('message: addECToCloudHandler cloudHandler does not respond, ' +
                     `attemptsLeft: ${attempt}`);

            // We'll try it more!
        setTimeout(() => {
          attempt -= 1;
          addECToCloudHandler(attempt);
        }, 3000);
      } else {
        log.error('message: cannot contact cloudHandler');
      }
    });
  };
  addECToCloudHandler(5);

鏈接amqper即RPCMQ,此時會將Nuve對EC方法的調用綁定

amqper.connect(() => {
  try {
    rooms.on('updated', updateMyState);
    amqper.setPublicRPC(rpcPublic);

    addToCloudHandler(() => {
      const rpcID = `erizoController_${myId}`;
      amqper.bind(rpcID, listen);
    });
  } catch (error) {
    log.info(`message: Error in Erizo Controller, ${logger.objectToLog(error)}`);
  }
});

在Nuve註冊成功後回調會新建一個RPCMQ消息消費隊列,同時會對EC服務端口監聽「connection」消息,此時會新建房間(若房間已經存在即加入反那個房間),並新建socket鏈接和client。

const listen = () => {
  io.sockets.on('connection', (socket) => {
    log.info(`message: socket connected, socketId: ${socket.id}`);

    const channel = new Channel(socket, nuve);

    channel.on('connected', (token, options, callback) => {
      options = options || {};
      try {
        const room = rooms.getOrCreateRoom(myId, token.room, token.p2p); options.singlePC = getSinglePCConfig(options.singlePC); const client = room.createClient(channel, token, options);
        log.info(`message: client connected, clientId: ${client.id}, ` +
            `singlePC: ${options.singlePC}`);
        if (!room.p2p && global.config.erizoController.report.session_events) {
          const timeStamp = new Date();
          amqper.broadcast('event', { room: room.id,
            user: client.id,
            type: 'user_connection',
            timestamp: timeStamp.getTime() });
        }

        const streamList = [];
        room.streamManager.forEachPublishedStream((stream) => {
          streamList.push(stream.getPublicStream());
        });

        callback('success', { streams: streamList,
          id: room.id,
          clientId: client.id,
          singlePC: options.singlePC,
          p2p: room.p2p,
          defaultVideoBW: global.config.erizoController.defaultVideoBW,
          maxVideoBW: global.config.erizoController.maxVideoBW,
          iceServers: global.config.erizoController.iceServers });
      } catch (e) {
        log.warn('message: error creating Room or Client, error:', e);
      }
    });

    channel.on('reconnected', (clientId) => {
      rooms.forEachRoom((room) => {
        const client = room.getClientById(clientId);
        if (client !== undefined) {
          client.setNewChannel(channel);
        }
      });
    });

    socket.channel = channel;
  });
};

 client至關於客戶端中的用戶,它會根據EC新建的channel監聽來自客戶端的socket消息,客戶端的信令消息就是在這裏進行處理的。client監聽到事件後會經過room.controller(即roomController)中的方法進行具體實現。

listenToSocketEvents() {
    log.debug(`message: Adding listeners to socket events, client.id: ${this.id}`);
    this.socketEventListeners.set('sendDataStream', this.onSendDataStream.bind(this));
    this.socketEventListeners.set('connectionMessage', this.onConnectionMessage.bind(this));
    this.socketEventListeners.set('streamMessage', this.onStreamMessage.bind(this));
    this.socketEventListeners.set('streamMessageP2P', this.onStreamMessageP2P.bind(this));
    this.socketEventListeners.set('updateStreamAttributes', this.onUpdateStreamAttributes.bind(this));
    this.socketEventListeners.set('publish', this.onPublish.bind(this));
    this.socketEventListeners.set('subscribe', this.onSubscribe.bind(this));
    this.socketEventListeners.set('startRecorder', this.onStartRecorder.bind(this));
    this.socketEventListeners.set('stopRecorder', this.onStopRecorder.bind(this));
    this.socketEventListeners.set('unpublish', this.onUnpublish.bind(this));
    this.socketEventListeners.set('unsubscribe', this.onUnsubscribe.bind(this));
    this.socketEventListeners.set('autoSubscribe', this.onAutoSubscribe.bind(this));
    this.socketEventListeners.set('getStreamStats', this.onGetStreamStats.bind(this));
    this.socketEventListeners.forEach((value, key) => {
      this.channel.socketOn(key, value);
    });
    this.channel.on('disconnect', this.onDisconnect.bind(this));
  }
  stopListeningToSocketEvents() {
    log.debug(`message: Removing listeners to socket events, client.id: ${this.id}`);
    this.socketEventListeners.forEach((value, key) => {
      this.channel.socketRemoveListener(key, value);
    });
  }

  disconnect() {
    this.stopListeningToSocketEvents();
    this.channel.disconnect();
  }

 ecCloudHandler負責EC對ErizoAgent的分配(負載均衡),amqper會對綁定到MQ上的全部「ErizoAgent」消息隊列定時廣播獲取新建的ErizoAgent並放入ErizoAgent列表中。同時負責向ErizoAgent申請建立ErizoJS和刪除ErizoJS。

that.getErizoJS = (agentId, internalId, callback) => {
    let agentQueue = 'ErizoAgent';

    if (getErizoAgent) {
      agentQueue = getErizoAgent(agents, agentId);
    }

    log.info(`message: createErizoJS, agentId: ${agentQueue}`);

    amqper.callRpc(agentQueue, 'createErizoJS', [internalId], { callback(resp) {

roomController中會建立一個ErizoList,用於EC這邊erizo的分配以及erizo狀態的維護以及保活。同時roomController會經過MQ調用ErizoJS的方法。

erizoPosition->erizo(undefine)->ecch.getErizoJS->分配Agent->Agent建立erizo
const erizos = new ErizoList(maxErizosUsedByRoom);
..........
ErizoJS保活 const sendKeepAlive
= () => { erizos.forEachUniqueErizo((erizo) => { const erizoId = erizo.erizoId; amqper.callRpc(`ErizoJS_${erizoId}`, 'keepAlive', [], { callback: callbackFor(erizoId) }); }); }; setInterval(sendKeepAlive, KEEPALIVE_INTERVAL); ........
經過ecch申請建立ErizoJS getErizoJS
= (callback, previousPosition = undefined) => { let agentId; let erizoIdForAgent; const erizoPosition = previousPosition !== undefined ? previousPosition : currentErizo += 1; if (waitForErizoInfoIfPending(erizoPosition, callback)) { return; } const erizo = erizos.get(erizoPosition); if (!erizo.erizoId) { erizos.markAsPending(erizoPosition); } else { agentId = erizo.agentId; erizoIdForAgent = erizo.erizoIdForAgent; } log.debug(`message: Getting ErizoJS, agentId: ${agentId}, ` + `erizoIdForAgent: ${erizoIdForAgent}`); ecch.getErizoJS(agentId, erizoIdForAgent, (gotErizoId, gotAgentId, gotErizoIdForAgent) => { const theErizo = erizos.get(erizoPosition); if (!theErizo.erizoId && gotErizoId !== 'timeout') { erizos.set(erizoPosition, gotErizoId, gotAgentId, gotErizoIdForAgent); } else if (theErizo.erizoId) { theErizo.agentId = gotAgentId; theErizo.erizoIdForAgent = gotErizoIdForAgent; } callback(gotErizoId, gotAgentId, gotErizoIdForAgent); }); };

總結:

ErizoController是一個信令服務器,且負責erizo的負載均衡,分配erizoJS應該在哪一個Agent上建立,client監聽客戶端socket鏈接,經過roomController調用erizo。

4.ErizoAgent

 

 ErizoAgent是erizoJS的代理,代理啓動後會被EC定時掃描到並存入EC的Agent列表中,而後它會被分配給不一樣的使用者,監聽ErizoJS的建立和刪除申請,erizoJS的建立就是Agent啓動子進程運行erizoJS主程序(父子進程會分離異步)。
const launchErizoJS = (erizo) => {
  const id = erizo.id;
  log.debug(`message: launching ErizoJS, erizoId: ${id}`);
  let erizoProcess; let out; let
    err;
  const erizoLaunchOptions = ['./../erizoJS/erizoJS.js', id, privateIP, publicIP];
  if (global.config.erizoAgent.launchDebugErizoJS) {
    erizoLaunchOptions.push('-d');
  }

  if (global.config.erizoAgent.useIndividualLogFiles) {
    out = fs.openSync(`${global.config.erizoAgent.instanceLogDir}/erizo-${id}.log`, 'a');
    err = fs.openSync(`${global.config.erizoAgent.instanceLogDir}/erizo-${id}.log`, 'a');
    erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions,
      { detached: true, stdio: ['ignore', out, err] });
  } else {
    erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions,
      { detached: true, stdio: ['ignore', 'pipe', 'pipe'] });
    erizoProcess.stdout.setEncoding('utf8');
    erizoProcess.stdout.on('data', (message) => {
      printErizoLogMessage(`[erizo-${id}]`, message.replace(/\n$/, ''));
    });
    erizoProcess.stderr.setEncoding('utf8');
    erizoProcess.stderr.on('data', (message) => {
      printErizoLogMessage(`[erizo-${id}]`, message.replace(/\n$/, ''));
    });
  }
  erizoProcess.unref();
  erizoProcess.on('close', () => {
    log.info(`message: closed, erizoId: ${id}`);
    erizos.delete(id);

    if (out !== undefined) {
      fs.close(out, (message) => {
        if (message) {
          log.error('message: error closing log file, ',
                              `erizoId: ${id}`, 'error:', message);
        }
      });
    }

    if (err !== undefined) {
      fs.close(err, (message) => {
        if (message) {
          log.error('message: error closing log file, ',
                              `erizoId: ${id}`, 'error:', message);
        }
      });
    }
    erizos.fill();
  });
  log.info(`message: launched new ErizoJS, erizoId: ${id}`);
  // eslint-disable-next-line no-param-reassign
  erizo.process = erizoProcess;
};

 

5.ErizoJS

 
 在erizoJS中client依然是與用戶一一對應,與EC中的操做是異步的,只有在開始推流即addPublisher的時候纔會將對應的client建立,併爲對應的stream創建connection,在非singlePC模式下,每個stream都會與客戶端創建一個流鏈接,每個輸入流是一個Publisher,訂閱者是Subscriber,一個client會有多個Publisher,而每一個Publisher會有多個Subscriber,每一個Publisher都會有一個媒體轉發器OneToManyProcessor。底層的C++ erizo實現了 stream connection的創建,以及媒體轉發和帶寬控制。
 
  /*
   * Adds a publisher to the room. This creates a new OneToManyProcessor
   * and a new WebRtcConnection. This WebRtcConnection will be the publisher
   * of the OneToManyProcessor.
   */
  that.addPublisher = (erizoControllerId, clientId, streamId, options, callbackRpc) => {
    updateUptimeInfo();
    let publisher;
    log.info('addPublisher, clientId', clientId, 'streamId', streamId);
    const client = getOrCreateClient(erizoControllerId, clientId, options.singlePC);

    if (publishers[streamId] === undefined) {
      // eslint-disable-next-line no-param-reassign
      options.publicIP = that.publicIP;
      // eslint-disable-next-line no-param-reassign
      options.privateRegexp = that.privateRegexp;
      //新建connection的時候會在C++層創建WebRtcConnection,附帶的有媒體設置消息,
      //同時會建立mediaStream,同時維護一個mediaStream字典
      const connection = client.getOrCreateConnection(options);
      log.info('message: Adding publisher, ' +
        `clientId: ${clientId}, ` +
        `streamId: ${streamId}`,
        logger.objectToLog(options),
        logger.objectToLog(options.metadata));
      publisher = new Publisher(clientId, streamId, connection, options);
      publishers[streamId] = publisher;
      publisher.initMediaStream();
      publisher.on('callback', onAdaptSchemeNotify.bind(this, callbackRpc));
      publisher.on('periodic_stats', onPeriodicStats.bind(this, streamId, undefined));
      publisher.promise.then(() => {
        //更新connection狀態並初始化,options.createOffer=true就會建立offer,
        //而且在connection狀態到達onInitialized/onGathered的時候發送offer
        connection.init(options.createOffer);
      });
      connection.onInitialized.then(() => {
        callbackRpc('callback', { type: 'initializing', connectionId: connection.id });
      });
      connection.onReady.then(() => {
        callbackRpc('callback', { type: 'ready' });
      });
      connection.onStarted.then(() => {
        callbackRpc('callback', { type: 'started' });
      });
      if (options.createOffer) {
        let onEvent;
        if (options.trickleIce) {
          onEvent = connection.onInitialized;
        } else {
          onEvent = connection.onGathered;
        }
        onEvent.then(() => {
          connection.sendOffer();
        });
      }
    } else {
      publisher = publishers[streamId];
      if (publisher.numSubscribers === 0) {
        log.warn('message: publisher already set but no subscribers will ignore, ' +
          `code: ${WARN_CONFLICT}, streamId: ${streamId}`,
          logger.objectToLog(options.metadata));
      } else {
        log.warn('message: publisher already set has subscribers will ignore, ' +
          `code: ${WARN_CONFLICT}, streamId: ${streamId}`);
      }
    }
  };

6.時序圖

(未完待續)

相關文章
相關標籤/搜索