基於wokerman實現即時消息推送

目前網站有兩個用到實時消息推送的功能,源碼:https://github.com/wuzhc/teamjavascript

  • 最新動態,實時顯示用戶的操做行爲
  • 消息推送,如重要消息通知,任務指派等等

考慮的問題

  • 既然要實現即時,那就少不了socketio。由於項目是PHP寫的,因此服務端直接用phpsocket.io
  • 咱們應該保存離線消息,不然若是用戶不在線,那就接受不到消息。這裏我用mongodb來存儲消息。php

    • 一是消息不須要關聯表,一條消息一個文檔
    • 二是mongodb適合作海量數據存儲,而且分片也很簡單
    • 三是消息不須要永久存儲,隨着時間推移,消息的價值性越低,能夠用固定集合來存儲消息,當數據量達到必定值時覆蓋最久的消息
  • 一個頁面鏈接一個socket,若用戶同時打開了多個頁面(即一個用戶會對應多個socketID),那麼消息應該推送到用戶每個打開的頁面。一個簡單的作法就是把用戶多個sockID加到group分組(socket.emit('group')),group分組名稱能夠是"uid:1",表示userID爲1的用戶,而後socket.broadcast.to('uid:1').emit('event_name', data);

就能夠實現對多個頁面推送消息html

實時動態

圖片描述

實時消息推送

圖片描述

客戶端

// 初始化io對象
        var socket = io('http://' + document.domain + ':2120');
        // 當socket鏈接後發送登陸請求
        socket.on('connect', function () {
            socket.emit('login', {
                uid: "<?=Yii::$app->user->id?>",
                companyID: "<?=Yii::$app->user->identity->fdCompanyID?>"
            });
        });
        // 實時消息,當服務端推送來消息時觸發
        socket.on('new_msg', function (msg) {
            if (msg.typeID == '<?= \common\config\Conf::MSG_HANDLE?>') {
                var html = '<li>' +
                    '<a href="' + msg.url + '">' +
                    '<div class="pull-left">' +
                    '<img src="<?= $directoryAsset ?>/img/user2-160x160.jpg" class="img-circle" alt="User Image"/>' +
                    '</div>' +
                    '<h4 style="overflow:hidden;text-overflow:ellipsis;">' + msg.title + '</h4>' +
                    '<p style="overflow:hidden;text-overflow:ellipsis;">' + msg.content + '</p>' +
                    '</a>' +
                    '</li>';
                $('#msg-handle').prepend(html);

                var num = $('.msg-handle-num').eq(1).text() * 1;
                $('.msg-handle-num').text(num + 1);
            }
        });
        
        // 實時動態,當服務端推送來消息時觸發
        socket.on('update_dynamic', function (msg) {
            var html = '<li>' +
                '<i class="portrait">' +
                '<img class="img-circle img-bordered-sm portrait-img" src="' + msg.portrait + '" alt="User Image"></i>' +
                '<div class="timeline-item">' +
                '<span class="time">' +
                '<i class="fa fa-clock-o"></i>' + msg.date + '</span>' +
                '<h3 class="timeline-header">' +
                '<a href="' + msg.url + '">' + msg.operator + '</a>' +
                '<span class="desc">' + msg.title + '</span>' +
                '</h3>' +
                '<div class="timeline-body">' + msg.content + '</div>' +
                '</div>' +
                '</li>';
            $('#dynamic-list').prepend(html);
        });
        
        // 在線人數
        socket.on('update_online_count', function (msg) {
            $('#online-people').html(msg);
        });

服務端

/**
     * 消息推送
     */
    public function actionStart()
    {
        // PHPSocketIO服務
        $io = new SocketIO(2120);
        // 客戶端發起鏈接事件時,設置鏈接socket的各類事件回調
        $io->on('connection', function ($socket) use ($io) {

            /** @var Connection $redis */
            $redis = Yii::$app->redis;

            // 當客戶端發來登陸事件時觸發
            $socket->on('login', function ($data) use ($socket, $redis, $io) {

                $uid = isset($data['uid']) ? $data['uid'] : null;
                $companyID = isset($data['companyID']) ? $data['companyID'] : null;

                // uid和companyID應該作下加密 by wuzhc 2018-01-28
                if (empty($uid) || empty($companyID)) {
                    return ;
                }
                $this->companyID = $companyID;

                // 已經登陸過了
                if (isset($socket->uid)) {
                    return;
                }

                $key = 'socketio:company:' . $companyID;
                $field = 'uid:' . $uid;

                // 用hash方便統計用戶打開頁面數量
                if (!$redis->hget($key, $field)) {
                    $redis->hset($key, $field, 0);
                }

                // 同個用戶打開新頁面時加1
                $redis->hincrby($key, $field, 1);

                // 加入uid分組,方便對同個用戶的全部打開頁面推送消息
                $socket->join('uid:'. $uid);

                // 加入companyID,方便對整個公司的全部用戶推送消息
                $socket->join('company:'.$companyID);

                $socket->uid = $uid;
                $socket->companyID = $companyID;

                // 整個公司在線人數更新
                $io->to('company:'.$companyID)->emit('update_online_count', $redis->hlen($key));
            });

            // 當客戶端斷開鏈接是觸發(通常是關閉網頁或者跳轉刷新致使)
            $socket->on('disconnect', function () use ($socket, $redis, $io) {
                if (!isset($socket->uid)) {
                    return;
                }

                $key = 'socketio:company:' . $socket->companyID;
                $field = 'uid:' . $socket->uid;
                $redis->hincrby($key, $field, -1);

                if ($redis->hget($key, $field) <= 0) {
                    $redis->hdel($key, $field);

                    // 某某下線了,刷新整個公司的在線人數
                    $io->to('company:'.$socket->companyID)->emit('update_online_count', $redis->hlen($key));
                }
            });
        });

        // 開始進程時,監聽2121端口,用戶數據推送
        $io->on('workerStart', function () use ($io) {
            /** @var Connection $redis */
            $redis = Yii::$app->redis;

            $httpWorker = new Worker('http://0.0.0.0:2121');
            // 當http客戶端發來數據時觸發
            $httpWorker->onMessage = function ($conn, $data) use ($io, $redis) {
                $_POST = $_POST ? $_POST : $_GET;
                switch (@$_POST['action']) {
                    case 'message':
                        $to = 'uid:' . @$_POST['receiverID'];
                        $_POST['content'] = htmlspecialchars(@$_POST['content']);
                        $_POST['title'] = htmlspecialchars(@$_POST['title']);

                        // 有指定uid則向uid所在socket組發送數據
                        if ($to) {
                            $io->to($to)->emit('new_msg', $_POST);
                        }

                        $companyID = @$_POST['companyID'];
                        $key = 'socketio:company:' . $companyID;
                        $field = 'uid:' . $to;

                        // http接口返回,若是用戶離線socket返回fail
                        if ($to && $redis->hget($key, $field)) {
                            return $conn->send('offline');
                        } else {
                            return $conn->send('ok');
                        }
                        break;
                    case 'dynamic':
                        $to = 'company:' . @$_POST['companyID'];
                        $_POST['content'] = htmlspecialchars(@$_POST['content']);
                        $_POST['title'] = htmlspecialchars(@$_POST['title']);

                        // 有指定uid則向uid所在socket組發送數據
                        if ($to) {
                            $io->to($to)->emit('update_dynamic', $_POST);
                        }

                        $companyID = @$_POST['companyID'];
                        $key = 'socketio:company:' . $companyID;

                        // http接口返回,若是用戶離線socket返回fail
                        if ($to && $redis->hlen($key)) {
                            return $conn->send('offline');
                        } else {
                            return $conn->send('ok');
                        }
                }

                return $conn->send('fail');
            };

            // 執行監聽
            $httpWorker->listen();
        });

        // 運行全部的實例
        global $argv;
        array_shift($argv);

        if (isset($argv[2]) && $argv[2] == 'daemon') {
            $argv[2] = '-d';
        }

        Worker::runAll();
    }

例子

/**
     * 新建任務
     * @param $projectID
     * @param $categoryID
     * @return string
     * @throws ForbiddenHttpException
     * @since 2018-01-26
     */
    public function actionCreate($projectID, $categoryID)
    {
        $userID = Yii::$app->user->id;

        if (!Yii::$app->user->can('createTask')) {
            throw new ForbiddenHttpException(ResponseUtil::$msg[1]);
        }

        if ($data = Yii::$app->request->post()) {
            if (empty($data['name'])) {
                ResponseUtil::jsonCORS(['status' => Conf::FAILED, 'msg' => '任務標題不能爲空']);
            }

            // 保存任務
            $taskID = TaskService::factory()->save([
                'name'       => $data['name'],
                'creatorID'  => $userID,
                'companyID'  => $this->companyID,
                'level'      => $data['level'],
                'categoryID' => $categoryID,
                'projectID'  => $projectID,
                'content'    => $data['content']
            ]);

            if ($taskID) {
                $url = Url::to(['task/view', 'taskID' => $taskID]);
                $portrait = UserService::factory()->getUserPortrait($userID);
                $username = UserService::factory()->getUserName($userID);
                $title = '建立了新任務';
                $content = $data['name'];

                // 保存操做日誌
                LogService::factory()->saveHandleLog([
                    'objectID'   => $taskID,
                    'companyID'  => $this->companyID,
                    'operatorID' => $userID,
                    'objectType' => Conf::OBJECT_TASK,
                    'content'    => $content,
                    'url'        => $url,
                    'title'      => $title,
                    'portrait'   => $portrait,
                    'operator'   => $username
                ]);

                // 動態推送
                MsgService::factory()->push('dynamic', [
                    'companyID'  => $this->companyID,
                    'operatorID' => $userID,
                    'operator'   => $username,
                    'portrait'   => $portrait,
                    'title'      => $title,
                    'content'    => $content,
                    'url'        => $url,
                    'date'       => date('Y-m-d H:i:s'),
                ]);

                ResponseUtil::jsonCORS(null, Conf::SUCCESS, '建立成功');
            } else {
                ResponseUtil::jsonCORS(null, Conf::FAILED, '建立失敗');
            }
        } else {
            return $this->render('create', [
                'projectID' => $projectID,
                'category'  => TaskCategory::findOne(['id' => $categoryID]),
            ]);
        }
    }

調用保存操做日誌底層接口

LogService::factory()->saveHandleLog($args)

參數說明

參數 說明
operatorID 操做者ID,用於用戶跳轉連接
companyID 公司ID,用於該公司下的動態
operator 操做者姓名
portrait 操做者頭像
receiver 接受者,可選
title 動態標題,例如建立任務
content 動態內容,例如建立任務的名稱
date 動態時間
url 動態跳轉連接
token 驗證
objectType 對象類型,例如任務,文檔等等
objectID 對象ID,例如任務ID,文檔ID等等
  • companyID 用於查詢該公司下的動態
  • objectType和objectID組合能夠查詢某個對象的操做日誌,例如一個任務被操做的日誌

保存消息

參數 說明
senderID 發送者ID,用於用戶跳轉連接
companyID 公司ID,用於該公司下的動態
sender 發送者姓名
portrait 發送者頭像
receiverID 接受者ID
title 動態標題,例如建立任務
content 動態內容,例如建立任務的名稱
date 動態時間
url 動態跳轉連接
typeID 消息類型,1系統通知,2管理員通知,3操做通知
  • receiverID和typeID和isRead組合能夠查詢用戶未讀消息

請求接口

MsgService::factory()->push($action, $args)

即時動態

參數 說明
operatorID 操做者ID,用於用戶跳轉連接
companyID 公司ID,用於給該公司的全部socket推送
operator 操做者姓名
portrait 操做者頭像
receiver 接受者,可選
title 動態標題,例如建立任務
content 動態內容,例如建立任務的名稱
date 動態時間
url 動態跳轉連接

即時消息

參數 說明
senderID 發送者ID,用於用戶跳轉連接
sender 發送者姓名
receiverID 接受者ID,用於給接受者的全部socket推送消息
typeID 消息類型,1系統通知,2管理員通知,3操做通知
portrait 發送者頭像
title 消息標題
content 消息內容
url 消息跳轉連接

完整代碼參考:https://github.com/wuzhc/team

相關文章
相關標籤/搜索