(來源:https://github.com/lynckia/licode/issues/335)
(修改:https://blog.csdn.net/u012908515/article/details/53940787)
// Nuve REST API. Each entry is [HTTP method, path, handler]; the routes are
// registered on `app` in exactly the order listed here.
const nuveRoutes = [
  ['post', '/rooms', roomsResource.createRoom],
  ['get', '/rooms', roomsResource.represent],

  ['get', '/rooms/:room', roomResource.represent],
  ['put', '/rooms/:room', roomResource.updateRoom],
  ['patch', '/rooms/:room', roomResource.patchRoom],
  ['delete', '/rooms/:room', roomResource.deleteRoom],

  ['post', '/rooms/:room/tokens', tokensResource.create],

  ['post', '/services', servicesResource.create],
  ['get', '/services', servicesResource.represent],

  ['get', '/services/:service', serviceResource.represent],
  ['delete', '/services/:service', serviceResource.deleteService],

  ['get', '/rooms/:room/users', usersResource.getList],

  ['get', '/rooms/:room/users/:user', userResource.getUser],
  ['delete', '/rooms/:room/users/:user', userResource.deleteUser],
];

nuveRoutes.forEach(([method, path, handler]) => {
  app[method](path, handler);
});
2.1 簽名驗證
// Every request, whatever its HTTP method, is passed to
// nuveAuthenticator.authenticate.
for (const method of ['get', 'post', 'put', 'patch', 'delete']) {
  app[method]('*', nuveAuthenticator.authenticate);
}
每一次客戶端請求都會進行簽名驗證。
2.2 cloudHandler.js
cloudHandler提供了nuve對EC的調用:獲取房間中的全部用戶,刪除房間,刪除指定用戶;以及EC對nuve的調用:刪除token,EC的添加、狀態更新,刪除以及保活。
調用方法通過RPCMQ實現,雙方共同維護兩個隊列:隊列A用來傳遞調用消息,Nuve發佈調用消息後,會維護一個調用方法和回調函數的字典;EC從隊列A中獲取調用消息後執行調用方法,並將結果消息push進隊列B;Nuve從隊列B中獲取調用結果消息,從字典中取得對應的回調函數並執行callback,然後刪除對應的字典值。
2.3 服務、房間、用戶管理
1.只有superService才有權限進行service的相關操作,即配置文件中的superService = config.nuve.superserviceID
2.在room模塊中currentService即當前請求的service,會維護一個rooms列表,且建立房間的時候可攜帶房間的媒體信息:room.mediaConfiguration
room {name: '', [p2p: bool], [data: {}], _id: ObjectId} service {name: '', key: '', rooms: Array[room], testRoom: room, testToken: token, _id: ObjectId} token {host: '', userName: '', room: '', role: '', service: '', creationDate: Date(), [use: int], [p2p: bool], _id: ObjectId}
2.4總結
EC是一個service服務,會維護一個房間集合Rooms,建立成功後會在Nuve中進行註冊,而且鏈接amqper
// Start the HTTP server on the configured ErizoController port.
server.listen(global.config.erizoController.listen_port);

// Attach socket.io to that server. The eslint directive must sit on its own
// line; otherwise it comments out the statements that follow it.
// eslint-disable-next-line global-require, import/no-extraneous-dependencies
const io = require('socket.io').listen(server, { log: false });

// Only the 'websocket' transport is enabled.
io.set('transports', ['websocket']);
/**
 * Registers this ErizoController through nuve.addNewErizoController and
 * retries when the request times out.
 *
 * On success the returned publicIP and id are stored in `publicIP` / `myId`,
 * `myState` is set to 2 and keep-alives are started. On a 'timeout' rejection
 * the call is retried after 3000 ms with one attempt fewer; any other
 * rejection is only logged.
 *
 * @param {number} attempt - Attempts left. When it is 0 or less the process
 *   exits with code -1.
 */
const addECToCloudHandler = (attempt) => {
  if (attempt <= 0) {
    log.error('message: addECtoCloudHandler cloudHandler does not respond - fatal');
    process.exit(-1);
    return;
  }

  const controller = {
    cloudProvider: global.config.cloudProvider.name,
    ip: publicIP,
    hostname: global.config.erizoController.hostname,
    port: global.config.erizoController.port,
    ssl: global.config.erizoController.ssl,
  };

  nuve.addNewErizoController(controller).then((msg) => {
    log.info('message: succesfully added to cloudHandler');
    publicIP = msg.publicIP;
    myId = msg.id;
    myState = 2;
    startKeepAlives(myId, publicIP);
    // NOTE(review): `callback` is not declared in this excerpt; presumably it
    // is the parameter of an enclosing function - confirm against the caller.
    callback('callback');
  }).catch((reason) => {
    if (reason === 'timeout') {
      log.warn('message: addECToCloudHandler cloudHandler does not respond, ' +
        `attemptsLeft: ${attempt}`);
      // We'll try it more!
      setTimeout(() => {
        addECToCloudHandler(attempt - 1);
      }, 3000);
    } else {
      log.error('message: cannot contact cloudHandler');
    }
  });
};
addECToCloudHandler(5);
鏈接amqper即RPCMQ,此時會將Nuve對EC方法的調用綁定
// Once amqper is connected:
//   1. updateMyState is subscribed to the rooms' 'updated' event,
//   2. rpcPublic is installed as the public RPC,
//   3. addToCloudHandler is called; its callback binds the queue
//      `erizoController_<myId>` on amqper, passing `listen` to the bind.
// Any synchronous error in these steps is logged, not rethrown.
amqper.connect(() => {
  try {
    rooms.on('updated', updateMyState);
    amqper.setPublicRPC(rpcPublic);
    addToCloudHandler(() => {
      amqper.bind(`erizoController_${myId}`, listen);
    });
  } catch (error) {
    log.info(`message: Error in Erizo Controller, ${logger.objectToLog(error)}`);
  }
});
在Nuve註冊成功後,回調會新建一個RPCMQ消息消費隊列,同時會對EC服務端口監聽「connection」消息,此時會新建房間(若房間已經存在即加入那個房間),並新建socket連接和client。
/**
 * Starts handling socket.io connections. Every incoming socket is wrapped in
 * a Channel (stored on `socket.channel`) whose 'connected' and 'reconnected'
 * events are handled here.
 */
const listen = () => {
  io.sockets.on('connection', (socket) => {
    log.info(`message: socket connected, socketId: ${socket.id}`);
    const channel = new Channel(socket, nuve);

    // Gets or creates the room named in the token, creates the client in it
    // and answers through `callback`. Errors are logged and `callback` is
    // then not invoked.
    const handleConnected = (token, options, callback) => {
      const clientOptions = options || {};
      try {
        const room = rooms.getOrCreateRoom(myId, token.room, token.p2p);
        clientOptions.singlePC = getSinglePCConfig(clientOptions.singlePC);
        const client = room.createClient(channel, token, clientOptions);
        log.info(`message: client connected, clientId: ${client.id}, ` +
          `singlePC: ${clientOptions.singlePC}`);

        const reportsSessionEvents = !room.p2p &&
          global.config.erizoController.report.session_events;
        if (reportsSessionEvents) {
          amqper.broadcast('event', {
            room: room.id,
            user: client.id,
            type: 'user_connection',
            timestamp: new Date().getTime(),
          });
        }

        const publishedStreams = [];
        room.streamManager.forEachPublishedStream((stream) => {
          publishedStreams.push(stream.getPublicStream());
        });

        callback('success', {
          streams: publishedStreams,
          id: room.id,
          clientId: client.id,
          singlePC: clientOptions.singlePC,
          p2p: room.p2p,
          defaultVideoBW: global.config.erizoController.defaultVideoBW,
          maxVideoBW: global.config.erizoController.maxVideoBW,
          iceServers: global.config.erizoController.iceServers,
        });
      } catch (e) {
        log.warn('message: error creating Room or Client, error:', e);
      }
    };

    // Hands the new channel to the client with this id in every room that
    // knows it.
    const handleReconnected = (clientId) => {
      rooms.forEachRoom((room) => {
        const client = room.getClientById(clientId);
        if (client === undefined) {
          return;
        }
        client.setNewChannel(channel);
      });
    };

    channel.on('connected', handleConnected);
    channel.on('reconnected', handleReconnected);
    socket.channel = channel;
  });
};
client相當於客戶端中的用戶,它會根據EC新建的channel監聽來自客戶端的socket消息,客戶端的信令消息就是在這裏進行處理的。client監聽到事件後會通過room.controller(即roomController)中的方法進行具體實現。
listenToSocketEvents() { log.debug(`message: Adding listeners to socket events, client.id: ${this.id}`); this.socketEventListeners.set('sendDataStream', this.onSendDataStream.bind(this)); this.socketEventListeners.set('connectionMessage', this.onConnectionMessage.bind(this)); this.socketEventListeners.set('streamMessage', this.onStreamMessage.bind(this)); this.socketEventListeners.set('streamMessageP2P', this.onStreamMessageP2P.bind(this)); this.socketEventListeners.set('updateStreamAttributes', this.onUpdateStreamAttributes.bind(this)); this.socketEventListeners.set('publish', this.onPublish.bind(this)); this.socketEventListeners.set('subscribe', this.onSubscribe.bind(this)); this.socketEventListeners.set('startRecorder', this.onStartRecorder.bind(this)); this.socketEventListeners.set('stopRecorder', this.onStopRecorder.bind(this)); this.socketEventListeners.set('unpublish', this.onUnpublish.bind(this)); this.socketEventListeners.set('unsubscribe', this.onUnsubscribe.bind(this)); this.socketEventListeners.set('autoSubscribe', this.onAutoSubscribe.bind(this)); this.socketEventListeners.set('getStreamStats', this.onGetStreamStats.bind(this)); this.socketEventListeners.forEach((value, key) => { this.channel.socketOn(key, value); }); this.channel.on('disconnect', this.onDisconnect.bind(this)); } stopListeningToSocketEvents() { log.debug(`message: Removing listeners to socket events, client.id: ${this.id}`); this.socketEventListeners.forEach((value, key) => { this.channel.socketRemoveListener(key, value); }); } disconnect() { this.stopListeningToSocketEvents(); this.channel.disconnect(); }
ecCloudHandler負責EC對ErizoAgent的分配(負載均衡),amqper會對綁定到MQ上的全部「ErizoAgent」消息隊列定時廣播獲取新建的ErizoAgent並放入ErizoAgent列表中。同時負責向ErizoAgent申請建立ErizoJS和刪除ErizoJS。
that.getErizoJS = (agentId, internalId, callback) => { let agentQueue = 'ErizoAgent'; if (getErizoAgent) { agentQueue = getErizoAgent(agents, agentId); } log.info(`message: createErizoJS, agentId: ${agentQueue}`); amqper.callRpc(agentQueue, 'createErizoJS', [internalId], { callback(resp) {
roomController中會建立一個ErizoList,用於EC這邊erizo的分配以及erizo狀態的維護以及保活。同時roomController會經過MQ調用ErizoJS的方法。
// ErizoList instance used by the roomController excerpts below, constructed
// with maxErizosUsedByRoom (presumably the per-room ErizoJS limit - confirm
// against ErizoList).
const erizos = new ErizoList(maxErizosUsedByRoom); ..........
// ErizoJS keep-alive.

/**
 * Sends a 'keepAlive' RPC to the `ErizoJS_<erizoId>` queue of every unique
 * erizo in `erizos`; the reply is handled by `callbackFor(erizoId)`.
 */
const sendKeepAlive = () => {
  erizos.forEachUniqueErizo(({ erizoId }) => {
    amqper.callRpc(`ErizoJS_${erizoId}`, 'keepAlive', [], { callback: callbackFor(erizoId) });
  });
};

// Repeat the keep-alive every KEEPALIVE_INTERVAL milliseconds.
setInterval(sendKeepAlive, KEEPALIVE_INTERVAL);

// ... (rest of roomController omitted in this excerpt)
// Requesting an ErizoJS through ecch.

/**
 * Resolves the ErizoJS for a position in `erizos`, asking ecch.getErizoJS for
 * it, and reports the result through `callback`.
 *
 * NOTE(review): `getErizoJS` is assigned, not declared, here; presumably it is
 * declared earlier in the file - confirm.
 *
 * @param {Function} callback - Called with
 *   (gotErizoId, gotAgentId, gotErizoIdForAgent) as received from ecch.
 * @param {number} [previousPosition] - Position to reuse. When undefined,
 *   `currentErizo` is incremented and the new value is used.
 */
getErizoJS = (callback, previousPosition = undefined) => {
  let agentId;
  let erizoIdForAgent;

  // Advance the shared cursor only when the caller did not pin a position.
  let erizoPosition = previousPosition;
  if (erizoPosition === undefined) {
    currentErizo += 1;
    erizoPosition = currentErizo;
  }

  if (waitForErizoInfoIfPending(erizoPosition, callback)) {
    return;
  }

  const erizo = erizos.get(erizoPosition);
  if (!erizo.erizoId) {
    erizos.markAsPending(erizoPosition);
  } else {
    agentId = erizo.agentId;
    erizoIdForAgent = erizo.erizoIdForAgent;
  }

  log.debug(`message: Getting ErizoJS, agentId: ${agentId}, ` +
    `erizoIdForAgent: ${erizoIdForAgent}`);
  ecch.getErizoJS(agentId, erizoIdForAgent, (gotErizoId, gotAgentId, gotErizoIdForAgent) => {
    const theErizo = erizos.get(erizoPosition);
    if (!theErizo.erizoId && gotErizoId !== 'timeout') {
      // First successful answer for this position: store it.
      erizos.set(erizoPosition, gotErizoId, gotAgentId, gotErizoIdForAgent);
    } else if (theErizo.erizoId) {
      // Position already filled: only refresh the agent information.
      theErizo.agentId = gotAgentId;
      theErizo.erizoIdForAgent = gotErizoIdForAgent;
    }
    callback(gotErizoId, gotAgentId, gotErizoIdForAgent);
  });
};
總結:
ErizoController是一個信令服務器,且負責erizo的負載均衡,分配erizoJS應該在哪一個Agent上建立,client監聽客戶端socket鏈接,經過roomController調用erizo。
/**
 * Spawns a detached ErizoJS child process for `erizo` and stores the process
 * handle on `erizo.process`.
 *
 * With `useIndividualLogFiles` the child's stdout and stderr are appended to
 * `<instanceLogDir>/erizo-<id>.log`; otherwise both are piped and forwarded
 * to printErizoLogMessage. When the child closes, its id is removed from
 * `erizos`, any opened log descriptors are closed and `erizos.fill()` is
 * called.
 *
 * @param {object} erizo - Entry providing `id`; receives `process`.
 */
const launchErizoJS = (erizo) => {
  const id = erizo.id;
  log.debug(`message: launching ErizoJS, erizoId: ${id}`);
  let erizoProcess;
  let out;
  let err;
  const erizoLaunchOptions = ['./../erizoJS/erizoJS.js', id, privateIP, publicIP];
  if (global.config.erizoAgent.launchDebugErizoJS) {
    erizoLaunchOptions.push('-d');
  }

  if (global.config.erizoAgent.useIndividualLogFiles) {
    // Two descriptors on the same file, one for each output stream.
    const logFile = `${global.config.erizoAgent.instanceLogDir}/erizo-${id}.log`;
    out = fs.openSync(logFile, 'a');
    err = fs.openSync(logFile, 'a');
    erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions,
      { detached: true, stdio: ['ignore', out, err] });
  } else {
    erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions,
      { detached: true, stdio: ['ignore', 'pipe', 'pipe'] });
    // Forwards one output stream of the child, without a trailing newline.
    const forwardOutput = (stream) => {
      stream.setEncoding('utf8');
      stream.on('data', (message) => {
        printErizoLogMessage(`[erizo-${id}]`, message.replace(/\n$/, ''));
      });
    };
    forwardOutput(erizoProcess.stdout);
    forwardOutput(erizoProcess.stderr);
  }
  erizoProcess.unref();

  // Closes a log descriptor if one was opened; a close failure is logged.
  const closeLogFile = (fd) => {
    if (fd === undefined) {
      return;
    }
    fs.close(fd, (message) => {
      if (message) {
        log.error('message: error closing log file, ', `erizoId: ${id}`, 'error:', message);
      }
    });
  };

  erizoProcess.on('close', () => {
    log.info(`message: closed, erizoId: ${id}`);
    erizos.delete(id);
    closeLogFile(out);
    closeLogFile(err);
    erizos.fill();
  });

  log.info(`message: launched new ErizoJS, erizoId: ${id}`);
  // The directive must sit on its own line; otherwise it comments out the
  // assignment below and the process handle is never stored.
  // eslint-disable-next-line no-param-reassign
  erizo.process = erizoProcess;
};
/*
 * Adds a publisher to the room. This creates a new OneToManyProcessor
 * and a new WebRtcConnection. This WebRtcConnection will be the publisher
 * of the OneToManyProcessor.
 *
 * If a publisher already exists for `streamId`, only a warning is logged.
 *
 * @param erizoControllerId  Passed to getOrCreateClient.
 * @param clientId           Id of the publishing client.
 * @param streamId           Key under which the publisher is stored.
 * @param options            Reads singlePC, createOffer, trickleIce and
 *                           metadata; publicIP and privateRegexp are set on it.
 * @param callbackRpc        Called as callbackRpc('callback', { type, ... })
 *                           when the connection is initialized, ready and
 *                           started.
 */
that.addPublisher = (erizoControllerId, clientId, streamId, options, callbackRpc) => {
  updateUptimeInfo();
  let publisher;
  log.info('addPublisher, clientId', clientId, 'streamId', streamId);
  const client = getOrCreateClient(erizoControllerId, clientId, options.singlePC);

  if (publishers[streamId] === undefined) {
    // eslint-disable-next-line no-param-reassign
    options.publicIP = that.publicIP;
    // eslint-disable-next-line no-param-reassign
    options.privateRegexp = that.privateRegexp;

    // Creating the connection instantiates a WebRtcConnection in the C++
    // layer, carrying the media configuration; a mediaStream is created as
    // well and kept in a mediaStream dictionary.
    const connection = client.getOrCreateConnection(options);
    log.info('message: Adding publisher, ' +
      `clientId: ${clientId}, ` +
      `streamId: ${streamId}`,
    logger.objectToLog(options),
    logger.objectToLog(options.metadata));

    publisher = new Publisher(clientId, streamId, connection, options);
    publishers[streamId] = publisher;
    publisher.initMediaStream();
    publisher.on('callback', onAdaptSchemeNotify.bind(this, callbackRpc));
    publisher.on('periodic_stats', onPeriodicStats.bind(this, streamId, undefined));

    publisher.promise.then(() => {
      // Update the connection state and initialise it. With
      // options.createOffer === true an offer is created, and it is sent once
      // the connection reaches onInitialized / onGathered (see below).
      connection.init(options.createOffer);
    });

    // Report each connection stage back to the caller.
    connection.onInitialized.then(() => {
      callbackRpc('callback', { type: 'initializing', connectionId: connection.id });
    });
    connection.onReady.then(() => {
      callbackRpc('callback', { type: 'ready' });
    });
    connection.onStarted.then(() => {
      callbackRpc('callback', { type: 'started' });
    });

    if (options.createOffer) {
      // With trickle ICE the offer is sent once the connection is
      // initialized; otherwise only after gathering has finished.
      let onEvent;
      if (options.trickleIce) {
        onEvent = connection.onInitialized;
      } else {
        onEvent = connection.onGathered;
      }
      onEvent.then(() => {
        connection.sendOffer();
      });
    }
  } else {
    publisher = publishers[streamId];
    if (publisher.numSubscribers === 0) {
      log.warn('message: publisher already set but no subscribers will ignore, ' +
        `code: ${WARN_CONFLICT}, streamId: ${streamId}`,
      logger.objectToLog(options.metadata));
    } else {
      log.warn('message: publisher already set has subscribers will ignore, ' +
        `code: ${WARN_CONFLICT}, streamId: ${streamId}`);
    }
  }
};
(未完待續)