看源代碼,解析一次完整的 public channel
下發流程。php
此圖來自網上,若有侵權,通知我刪除前端
經過上圖,咱們至少要知道兩件事:vue
Socket.io Server
來作中轉,這是怎麼作到的呢?Brocadcasted Data
?下面來一一解析。node
BroadcastServiceProvider
主要包含了 Broadcast
相關的五個驅動器、Broadcast
事件、Broadcast
隊列等方法,比較簡單就不在解析了,今天主要說說怎麼經過 redis
來驅動 Broadcast
的。python
首先仍是簡單配置下 Broadcast
的 config
:linux
// broadcasting.php
<?php
return [
/* |-------------------------------------------------------------------------- | Default Broadcaster |-------------------------------------------------------------------------- | | This option controls the default broadcaster that will be used by the | framework when an event needs to be broadcast. You may set this to | any of the connections defined in the "connections" array below. | | Supported: "pusher", "redis", "log", "null" | */
'default' => env('BROADCAST_DRIVER', 'null'),
/* |-------------------------------------------------------------------------- | Broadcast Connections |-------------------------------------------------------------------------- | | Here you may define all of the broadcast connections that will be used | to broadcast events to other systems or over websockets. Samples of | each available type of connection are provided inside this array. | */
'connections' => [
'pusher' => [
'driver' => 'pusher',
'key' => env('PUSHER_APP_KEY'),
'secret' => env('PUSHER_APP_SECRET'),
'app_id' => env('PUSHER_APP_ID'),
'options' => [
//
],
],
'redis' => [
'driver' => 'redis',
'connection' => 'default',
],
'log' => [
'driver' => 'log',
],
'null' => [
'driver' => 'null',
],
],
];
// .env
BROADCAST_DRIVER=redis
REDIS_HOST=redis
REDIS_PASSWORD=null
REDIS_PORT=6379
複製代碼
以前瞭解過 Laravel 的 ServiceProvider
的工做原理,因此咱們就不用贅述太多這方面的流程了,咱們主要看看 BroadcastServiceProvider
的註冊方法:ios
public function register() {
$this->app->singleton(BroadcastManager::class, function ($app) {
return new BroadcastManager($app);
});
$this->app->singleton(BroadcasterContract::class, function ($app) {
return $app->make(BroadcastManager::class)->connection();
});
$this->app->alias(
BroadcastManager::class, BroadcastingFactory::class
);
}
複製代碼
咱們寫一個發送 Broadcast
demo:laravel
// routes/console.php
Artisan::command('public_echo', function () {
event(new RssPublicEvent());
})->describe('echo demo');
// app/Events/RssPublicEvent.php
<?php
namespace AppEvents;
use CarbonCarbon;
use IlluminateBroadcastingChannel;
use IlluminateQueueSerializesModels;
use IlluminateBroadcastingPrivateChannel;
use IlluminateBroadcastingPresenceChannel;
use IlluminateFoundationEventsDispatchable;
use IlluminateBroadcastingInteractsWithSockets;
use IlluminateContractsBroadcastingShouldBroadcast;
class RssPublicEvent implements ShouldBroadcast {
use Dispatchable, InteractsWithSockets, SerializesModels;
/** * Create a new event instance. * * @return void */
public function __construct() {
//
}
/** * Get the channels the event should broadcast on. * * @return IlluminateBroadcastingChannel|array */
public function broadcastOn() {
return new Channel('public_channel');
}
/** * 指定廣播數據。 * * @return array */
public function broadcastWith() {
// 返回當前時間
return ['name' => 'public_channel_'.Carbon::now()->toDateTimeString()];
}
}
複製代碼
有了這下發 Event
,咱們看看它是怎麼執行的,主要看 BroadcastEvent
的 handle
方法:git
public function handle(Broadcaster $broadcaster) {
// 主要看,有沒有自定義該 Event 名稱,沒有的話,直接使用類名
$name = method_exists($this->event, 'broadcastAs')
? $this->event->broadcastAs() : get_class($this->event);
$broadcaster->broadcast(
Arr::wrap($this->event->broadcastOn()), $name,
$this->getPayloadFromEvent($this->event)
);
}
複製代碼
先看怎麼獲取參數的 $this->getPayloadFromEvent($this->event)
:github
protected function getPayloadFromEvent($event) {
if (method_exists($event, 'broadcastWith')) {
return array_merge(
$event->broadcastWith(), ['socket' => data_get($event, 'socket')]
);
}
$payload = [];
foreach ((new ReflectionClass($event))->getProperties(ReflectionProperty::IS_PUBLIC) as $property) {
$payload[$property->getName()] = $this->formatProperty($property->getValue($event));
}
unset($payload['broadcastQueue']);
return $payload;
}
複製代碼
主要傳入咱們自定義的數組,見函數 $event->broadcastWith()
、['socket' => data_get($event, 'socket')] 和 Event
中定義的全部 public
屬性。
最後就是執行方法了:
$broadcaster->broadcast(
Arr::wrap($this->event->broadcastOn()), $name,
$this->getPayloadFromEvent($this->event)
);
複製代碼
看上面的例子,$this->event->broadcastOn()
對應的是:
return new Channel('public_channel');
複製代碼
好了,該是看看接口 Broadcaster
了。
<?php
namespace IlluminateContractsBroadcasting;
interface Broadcaster {
/** * Authenticate the incoming request for a given channel. * * @param IlluminateHttpRequest $request * @return mixed */
public function auth($request);
/** * Return the valid authentication response. * * @param IlluminateHttpRequest $request * @param mixed $result * @return mixed */
public function validAuthenticationResponse($request, $result);
/** * Broadcast the given event. * * @param array $channels * @param string $event * @param array $payload * @return void */
public function broadcast(array $channels, $event, array $payload = []);
}
複製代碼
這裏主要提供三個函數,咱們暫時看目前最關心的 broadcast()
,經過「PhpStorm」IDE,咱們也能看出,繼承這個接口的,主要就是平臺 config
配置提供的幾個驅動器:
咱們開始往下走,看 redis
驅動器:
public function broadcast(array $channels, $event, array $payload = []) {
$connection = $this->redis->connection($this->connection);
$payload = json_encode([
'event' => $event,
'data' => $payload,
'socket' => Arr::pull($payload, 'socket'),
]);
foreach ($this->formatChannels($channels) as $channel) {
$connection->publish($channel, $payload);
}
}
複製代碼
這就簡單的,無非就是建立 redis
鏈接,而後將數據 (包含 event
、data
和 socket
構成的數組),利用 redis publish
出去,等着 laravel-echo-server
監聽接收!
注:redis 有發佈 (
publish
),就會有訂閱,如:Psubscribe
。
好了,咱們開始研究 laravel-echo-server
,看它怎麼訂閱的。
在 Laravel 項目沒有專門提供該 Server,不少項目都是使用 tlaverdure/laravel-echo-server
(github.com/tlaverdure/…),其中咱們的偶像 Laradock
也集成了該工具。
因此咱們就拿 Laradock
配置來講一說。
.
|____Dockerfile
|____laravel-echo-server.json
|____package.json
複製代碼
主要包含三個文件,一個 Dockerfile 文件,用來建立容器;package.json
主要是安裝 tlaverdure/laravel-echo-server
插件;laravel-echo-server.json
文件就是與 Laravel 交互的配置文件。
看看 Dockfile 內容:
FROM node:alpine
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
# Create app directory
RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app
# Install app dependencies
COPY package.json /usr/src/app/
RUN apk add --update \n python \n python-dev \n py-pip \n build-base
RUN npm install
# Bundle app source
COPY laravel-echo-server.json /usr/src/app/laravel-echo-server.json
EXPOSE 3000
CMD [ "npm", "start" ]
複製代碼
主要是以 node:alpine
爲底,將項目部署在路徑 /usr/src/app/
,執行命令 npm install
安裝插件,參考文件 package.json
:
{
"name": "laravel-echo-server-docker",
"description": "Docker container for running laravel-echo-server",
"version": "0.0.1",
"license": "MIT",
"dependencies": {
"laravel-echo-server": "^1.3.9"
},
"scripts": {
"start": "laravel-echo-server start"
}
}
複製代碼
而後,在將配置文件加載進該路徑下,最後執行 npm start
,也就是執行命令 laravel-echo-server start
,而且放出 3000 端口。
咱們經過啓動容器,而後進入容器看看文件結構:
執行 docker-compose up laravel-echo-server
後就能夠看到 server
啓動:
一樣的,咱們也能夠下載它的源代碼,來運行達到效果。
tlaverdure/laravel-echo-server
Laravel Echo Node JS Server for Socket.io
下載源代碼:
git clone https://github.com/tlaverdure/laravel-echo-server.git
複製代碼
進入項目安裝插件:
npm install
複製代碼
執行後,直接生成 dist
文件夾:
.
|____api
| |____http-api.js
| |____index.js
|____channels
| |____channel.js
| |____index.js
| |____presence-channel.js
| |____private-channel.js
|____cli
| |____cli.js
| |____index.js
|____database
| |____database-driver.js
| |____database.js
| |____index.js
| |____redis.js
| |____sqlite.js
|____echo-server.js
|____index.js
|____log.js
|____server.js
|____subscribers
| |____http-subscriber.js
| |____index.js
| |____redis-subscriber.js
| |____subscriber.js
複製代碼
經過提供的 example
能夠知道執行的入口在於 EchoServer
的 run
方法,簡單修改下 options
配置:
var echo = require('../dist/index.js');
var options = {
"authHost": "http://lrss.learning.test",
"authEndpoint": "/broadcasting/auth",
"clients": [],
"database": "redis",
"databaseConfig": {
"redis": {
"port": "63794",
"host": "0.0.0.0"
}
},
"devMode": true,
"host": null,
"port": "6001",
"protocol": "http",
"socketio": {},
"sslCertPath": "",
"sslKeyPath": ""
};
echo.run(options);
複製代碼
測試一下看看,是否和 Laravel 服務鏈接到位:
看 Laravel-echo-server
打印結果:
說明鏈接上了。
剛纔的 dist
文件夾是經過 TypeScript
生成的結果,固然,咱們須要經過它的源代碼來解讀:
.
|____api
| |____http-api.ts
| |____index.ts
|____channels
| |____channel.ts
| |____index.ts
| |____presence-channel.ts
| |____private-channel.ts
|____cli
| |____cli.ts
| |____index.ts
|____database
| |____database-driver.ts
| |____database.ts
| |____index.ts
| |____redis.ts
| |____sqlite.ts
|____echo-server.ts
|____index.ts
|____log.ts
|____server.ts
|____subscribers
| |____http-subscriber.ts
| |____index.ts
| |____redis-subscriber.ts
| |____subscriber.ts
複製代碼
主要包含:接口 (api
)、頻道 (channels
)、 數據庫 (database
)、訂閱 (subscribers
) 等,咱們會一個個來講的。
咱們先看 echo-server.ts
的 listen
函數:
/** * Listen for incoming event from subscibers. * * @return {void} */
listen(): Promise<any> {
return new Promise((resolve, reject) => {
let http = this.httpSub.subscribe((channel, message) => {
return this.broadcast(channel, message);
});
let redis = this.redisSub.subscribe((channel, message) => {
return this.broadcast(channel, message);
});
Promise.all([http, redis]).then(() => resolve());
});
}
複製代碼
咱們主要看 this.redisSub.subscribe()
無非就是經過 redis
訂閱,而後再把 channel
和 message
廣播出去,好了,咱們看看怎麼作到訂閱的,看 redis-subscriber
的 subscribe()
函數:
/** * Subscribe to events to broadcast. * * @return {Promise<any>} */
subscribe(callback): Promise<any> {
return new Promise((resolve, reject) => {
this._redis.on('pmessage', (subscribed, channel, message) => {
try {
message = JSON.parse(message);
if (this.options.devMode) {
Log.info("Channel: " + channel);
Log.info("Event: " + message.event);
}
callback(channel, message);
} catch (e) {
if (this.options.devMode) {
Log.info("No JSON message");
}
}
});
this._redis.psubscribe('*', (err, count) => {
if (err) {
reject('Redis could not subscribe.')
}
Log.success('Listening for redis events...');
resolve();
});
});
}
複製代碼
這裏咱們就能夠看到以前提到的 redis
訂閱函數了:
this._redis.psubscribe('*', (err, count) => {
if (err) {
reject('Redis could not subscribe.')
}
Log.success('Listening for redis events...');
resolve();
});
複製代碼
好了,只要獲取信息,就能夠廣播出去了:
this._redis.on('pmessage', (subscribed, channel, message) => {
try {
message = JSON.parse(message);
if (this.options.devMode) {
Log.info("Channel: " + channel);
Log.info("Event: " + message.event);
}
// callback(channel, message);
// return this.broadcast(channel, message);
if (message.socket && this.find(message.socket)) {
this.server.io.sockets.connected[message.socket](channel)
.emit(message.event, channel, message.data);
return true
} else {
this.server.io.to(channel)
.emit(message.event, channel, message.data);
return true
}
} catch (e) {
if (this.options.devMode) {
Log.info("No JSON message");
}
}
});
複製代碼
到此,咱們已經知道 Laravel 是怎麼和 Laravel-echo-server
利用 redis
訂閱和發佈消息的。同時,也知道是用 socket.io
和前端 emit/on
交互的。
下面咱們看看前端是怎麼接收消息的。
前端須要安裝兩個插件:laravel-echo
和 socket.io-client
,除了作配置外,監聽一個公開的 channel
,寫法仍是比較簡單的:
window.Echo.channel('public_channel')
.listen('RssPublicEvent', (e) => {
that.names.push(e.name)
});
複製代碼
達到的效果就是,只要接收到服務器發出的在公開頻道 public_channel
的事件 RssPublicEvent
,就會把消息內容顯示出來:
咱們開始看看這個 Laravel-echo
源代碼了:
先看配置信息:
window.Echo = new Echo({
broadcaster: 'socket.io',
host: window.location.hostname + ':6001',
auth:
{
headers:
{
'authorization': 'Bearer ' + store.getters.token
}
}
});
複製代碼
配置的 broadcaster
是: socket.io
,全部用的是:
// echo.ts
constructor(options: any) {
this.options = options;
if (typeof Vue === 'function' && Vue.http) {
this.registerVueRequestInterceptor();
}
if (typeof axios === 'function') {
this.registerAxiosRequestInterceptor();
}
if (typeof jQuery === 'function') {
this.registerjQueryAjaxSetup();
}
if (this.options.broadcaster == 'pusher') {
this.connector = new PusherConnector(this.options);
} else if (this.options.broadcaster == 'socket.io') {
this.connector = new SocketIoConnector(this.options);
} else if (this.options.broadcaster == 'null') {
this.connector = new NullConnector(this.options);
}
}
複製代碼
接着看 channel
函數:
// echo.ts
channel(channel: string): Channel {
return this.connector.channel(channel);
}
// socketio-connector.ts
channel(name: string): SocketIoChannel {
if (!this.channels[name]) {
this.channels[name] = new SocketIoChannel(
this.socket,
name,
this.options
);
}
return this.channels[name];
}
複製代碼
主要是建立 SocketIoChannel
,咱們看看怎麼作 listen
:
// socketio-connector.ts
listen(event: string, callback: Function): SocketIoChannel {
this.on(this.eventFormatter.format(event), callback);
return this;
}
複製代碼
繼續看 on()
on(event: string, callback: Function): void {
let listener = (channel, data) => {
if (this.name == channel) {
callback(data);
}
};
this.socket.on(event, listener);
this.bind(event, listener);
}
複製代碼
到這就比較清晰了,只用利用 this.socket.on(event, listener);
注:更多有關
socketio/socket.io-client
,能夠看官網:github.com/socketio/so…
到目前爲止,經過解讀這幾個插件和源代碼,咱們基本跑通了一個 public channel
流程。
這過程主要參考:
下一步主要看看怎麼解析一個 private channel
?
看完 public channel
的流程,咱們該來講說怎麼跑通 private channel
了。
本文結合以前使用的 JWT
來作身份認證。
但這個流程,咱們要先從前端提及。
咱們先寫一個 demo:
window.Echo.private('App.User.3')
.listen('RssCreatedEvent', (e) => {
that.names.push(e.name)
});
複製代碼
先建立 private channel
:
/** * Get a private channel instance by name. * * @param {string} name * @return {SocketIoChannel} */
privateChannel(name: string): SocketIoPrivateChannel {
if (!this.channels['private-' + name]) {
this.channels['private-' + name] = new SocketIoPrivateChannel(
this.socket,
'private-' + name,
this.options
);
}
return this.channels['private-' + name];
}
複製代碼
它與 public channel
的區別在於爲 private channel
的 channel
名前頭增長 private-
。
接着咱們須要爲每次請求添加認證信息 headers
:
window.Echo = new Echo({
broadcaster: 'socket.io',
host: window.location.hostname + ':6001',
auth:
{
headers:
{
'authorization': 'Bearer ' + store.getters.token
}
}
});
複製代碼
這裏,咱們用 store.getters.token
存儲着 jwt
登陸後下發的認證 token
。
好了,只要創新頁面,就會先往 Laravel-echo-server
發送一個 subscribe
事件:
/** * Subscribe to a Socket.io channel. * * @return {object} */
subscribe(): any {
this.socket.emit('subscribe', {
channel: this.name,
auth: this.options.auth || {}
});
}
複製代碼
咱們來看看 Laravel-echo-server
怎麼接收到這個事件,並把 auth
,也就是 jwt token
發到後臺的?在研究怎麼發以前,咱們仍是先把 Laravel 的 private channel Event
建好。
咱們建立 Laravel PrivateChannel
:
// RssCreatedEvent
<?php
namespace AppEvents;
use AppUser;
use CarbonCarbon;
use IlluminateBroadcastingChannel;
use IlluminateQueueSerializesModels;
use IlluminateBroadcastingPrivateChannel;
use IlluminateBroadcastingPresenceChannel;
use IlluminateFoundationEventsDispatchable;
use IlluminateBroadcastingInteractsWithSockets;
use IlluminateContractsBroadcastingShouldBroadcast;
class RssCreatedEvent implements ShouldBroadcast {
use Dispatchable, InteractsWithSockets, SerializesModels;
/** * Create a new event instance. * * @return void */
public function __construct() {
}
/** * Get the channels the event should broadcast on. * * @return IlluminateBroadcastingChannel|array */
public function broadcastOn() {
// 14. 建立頻道
info('broadcastOn');
return new PrivateChannel('App.User.3');
}
/** * 指定廣播數據。 * * @return array */
public function broadcastWith() {
// 返回當前時間
return ['name' => 'private_channel_'.Carbon::now()->toDateTimeString()];
}
}
// routes/console.php
Artisan::command('echo', function () {
event(new RssCreatedEvent());
})->describe('echo demo');
複製代碼
與 jwt 結合
修改 BroadcastServiceprovider
的認證路由爲 api:
// 修改前
// Broadcast::routes();
// 修改後
Broadcast::routes(["middleware" => "auth:api"]);
複製代碼
固然,咱們的認證方式也已經改爲 JWT 方式了:
<?php
return [
/* |-------------------------------------------------------------------------- | Authentication Defaults |-------------------------------------------------------------------------- | | This option controls the default authentication "guard" and password | reset options for your application. You may change these defaults | as required, but they're a perfect start for most applications. | */
'defaults' => [
'guard' => 'api',
'passwords' => 'users',
],
...
'guards' => [
'web' => [
'driver' => 'session',
'provider' => 'users',
],
'api' => [
'driver' => 'jwt',
'provider' => 'users',
],
],
複製代碼
最後,別忘了把 BroadcastServiceprovider
加入 app.config
中。
注:更多有關
JWT
歡迎查看以前的文章
有了前端和後臺的各自 private channel
,那必然須要用 Laravel-echo-server
來銜接。
先說回怎麼接收前端發過來的 subscribe
事件和 token
。
首先看 echo-server
初始化:
init(io: any): Promise<any> {
return new Promise((resolve, reject) => {
this.channel = new Channel(io, this.options);
this.redisSub = new RedisSubscriber(this.options);
this.httpSub = new HttpSubscriber(this.server.express, this.options);
this.httpApi = new HttpApi(io, this.channel, this.server.express, this.options.apiOriginAllow);
this.httpApi.init();
this.onConnect();
this.listen().then(() => resolve(), err => Log.error(err));
});
}
複製代碼
其中,this.onConnect()
:
onConnect(): void {
this.server.io.on('connection', socket => {
this.onSubscribe(socket);
this.onUnsubscribe(socket);
this.onDisconnecting(socket);
this.onClientEvent(socket);
});
}
複製代碼
主要註冊了四個事件,第一個就是咱們須要關注的:
onSubscribe(socket: any): void {
socket.on('subscribe', data => {
this.channel.join(socket, data);
});
}
複製代碼
這就和前端呼應上了,接着看 join
函數:
join(socket, data): void {
if (data.channel) {
if (this.isPrivate(data.channel)) {
this.joinPrivate(socket, data);
} else {
socket.join(data.channel);
this.onJoin(socket, data.channel);
}
}
}
複製代碼
看 isPrivate()
函數:
/** * Channels and patters for private channels. * * @type {array} */
protected _privateChannels: string[] = ['private-*', 'presence-*'];
/** * Check if the incoming socket connection is a private channel. * * @param {string} channel * @return {boolean} */
isPrivate(channel: string): boolean {
let isPrivate = false;
this._privateChannels.forEach(privateChannel => {
let regex = new RegExp(privateChannel.replace('*', '.*'));
if (regex.test(channel)) isPrivate = true;
});
return isPrivate;
}
複製代碼
這也是印證了,爲何 private channel
要以 private-
開頭了。接着看代碼:
/** * Join private channel, emit data to presence channels. * * @param {object} socket * @param {object} data * @return {void} */
joinPrivate(socket: any, data: any): void {
this.private.authenticate(socket, data).then(res => {
socket.join(data.channel);
if (this.isPresence(data.channel)) {
var member = res.channel_data;
try {
member = JSON.parse(res.channel_data);
} catch (e) { }
this.presence.join(socket, data.channel, member);
}
this.onJoin(socket, data.channel);
}, error => {
if (this.options.devMode) {
Log.error(error.reason);
}
this.io.sockets.to(socket.id)
.emit('subscription_error', data.channel, error.status);
});
}
複製代碼
就由於是 private channel
,因此須要走認證流程:
/** * Send authentication request to application server. * * @param {any} socket * @param {any} data * @return {Promise<any>} */
authenticate(socket: any, data: any): Promise<any> {
let options = {
url: this.authHost(socket) + this.options.authEndpoint,
form: { channel_name: data.channel },
headers: (data.auth && data.auth.headers) ? data.auth.headers : {},
rejectUnauthorized: false
};
return this.serverRequest(socket, options);
}
/** * Send a request to the server. * * @param {any} socket * @param {any} options * @return {Promise<any>} */
protected serverRequest(socket: any, options: any): Promise<any> {
return new Promise<any>((resolve, reject) => { options.headers = this.prepareHeaders(socket, options); let body; this.request.post(options, (error, response, body, next) => { if (error) { if (this.options.devMode) { Log.error(`[${new Date().toLocaleTimeString()}] - Error authenticating ${socket.id} for ${options.form.channel_name}`); Log.error(error); } reject({ reason: 'Error sending authentication request.', status: 0 }); } else if (response.statusCode !== 200) { if (this.options.devMode) { Log.warning(`[${new Date().toLocaleTimeString()}] - ${socket.id} could not be authenticated to ${options.form.channel_name}`); Log.error(response.body); } reject({ reason: 'Client can not be authenticated, got HTTP status ' + response.statusCode, status: response.statusCode }); } else { if (this.options.devMode) { Log.info(`[${new Date().toLocaleTimeString()}] - ${socket.id} authenticated for: ${options.form.channel_name}`); } try { body = JSON.parse(response.body); } catch (e) { body = response.body } resolve(body); } }); }); } 複製代碼
到此,相信你就看的出來了,會把前端發過來的 auth.headers
加入發日後臺的請求中。
好了,咱們測試下,先刷新頁面,加入 private channel
中,
而後在後臺,發一個事件出來,看前端是否是能夠接收
到此,基本就解釋了怎麼創建 private channel
,而後利用 jwt
認證身份,最後將 Event
內容下發出去。
接下來咱們就能夠看看怎麼建 chat room
,然更多客戶端加入進來聊天。
*未完待續