websocket & swoole & swoft

1.Websocket

1.OSI七層與TCP/IP五層模型

image.png

2.socket

Socket其實是對TCP/IP協議的封裝,自己並非協議,而是一個調用接口(API).

Socket的出現只是使得程序員更方便地使用TCP/IP協議棧而已,是對TCP/IP協議的抽象,從而造成了咱們知道的一些最基本的函數接口.

好比create、listen、connect、accept、send、read和write.

3.簡介

WebSocket 是一種網絡通訊協議.

WebSocket 協議在2008年誕生,2011年成爲國際標準。全部瀏覽器都已經支持了.

服務器能夠主動向客戶端推送信息,客戶端也能夠主動向服務器發送信息,屬於服務器推送技術的一種。

`長連接`
# 服務器推送技術
1    Webpush
2    HTTP server push
3    Pushlet
4    Long polling
5    Flash XMLSocket relays
6    Reliable Group Data Delivery (RGDD)
7    Push notification

4.與HTTP的對比

image.png

5.特色

(1)創建在 TCP 協議之上,服務器端的實現比較容易。

(2)與 HTTP 協議有着良好的兼容性。默認端口也是`80`和`443`,而且握手階段採用 HTTP 協議,所以握手時不容易屏蔽,能經過各類 HTTP 代理服務器。

(3)數據格式比較輕量,性能開銷小,通訊高效。

(4)能夠發送文本,也能夠發送二進制數據。

(5)沒有`同源限制`,客戶端能夠與任意服務器通訊。

(6)協議標識符是`ws`(若是加密,則爲wss),服務器網址就是 URL。   (scheme)
ws://example.com:80/uri
wss://example.com:80/uri

6.示例

# 客戶端
let ws = new WebSocket("wss://echo.websocket.org");

ws.onopen = function(evt) { 
  console.log("Connection open ..."); 
  ws.send("Hello WebSockets!");
};

ws.onmessage = function(evt) {
  console.log( "Received Message: " + evt.data);
  ws.close();
};

ws.onclose = function(evt) {
  console.log("Connection closed.");
};

# 服務端
php -> socket_create(),  new Socket
python -> socket,
go -> gorilla/websocket
node -> socket.io / socket.io -client
# 調試
https://jsbin.com/?js,console,output
# webSocket.readyState
readyState屬性返回實例對象的當前狀態,共有四種。
CONNECTING:值爲0,表示正在鏈接。
OPEN:值爲1,表示鏈接成功,能夠通訊了。
CLOSING:值爲2,表示鏈接正在關閉。
CLOSED:值爲3,表示鏈接已經關閉,或者打開鏈接失敗。

2.swoole

1.簡介

做者 韓天峯 
pecl開發組成員

php擴展

Swoole 是一個 PHP 的 `協程` `高性能` 網絡通訊引擎,使用 C/C++ 語言編寫,提供了多種通訊協議的網絡服務器和客戶端模塊。能夠方便快速的實現 TCP/UDP服務、高性能Web、WebSocket服務、物聯網、實時通信、遊戲、微服務等,使 PHP 再也不侷限於傳統的 Web 領域。

4.4+以後, 全面協程化, PHP 協程框架
# 文檔
https://www.swoole.com/

php-fpm` 處理請求php

sapi---> 初始化http的環境變量node

phpcore ---> 初始化php拓展, 初始化上下文環境python

image.png

執行php腳本mysql

image.png

2.示例

1.http server
$http = new Swoole\Http\Server("127.0.0.1", 9501);

    $http->on("start", function ($server) {
        echo "Swoole http server is started at http://127.0.0.1:9501\n";
    });

    $http->on("request", function ($request, $response) {
        $response->header("Content-Type", "text/plain");
        $response->end("Hello World\n");
    });

    $http->start();
2.websocket server
$server = new Swoole\Websocket\Server("127.0.0.1", 9502);

    $server->on('open', function($server, $req) {
        echo "connection open: {$req->fd}\n";
    });

    $server->on('message', function($server, $frame) {
        echo "received message: {$frame->data}\n";
        $server->push($frame->fd, json_encode(["hello", "world"]));
    });

    $server->on('close', function($server, $fd) {
        echo "connection close: {$fd}\n";
    });

    $server->start();
3.tcp server
$server = new Swoole\Server("127.0.0.1", 9503);
    $server->on('connect', function ($server, $fd){
        echo "connection open: {$fd}\n";
    });
    $server->on('receive', function ($server, $fd, $reactor_id, $data) {
        $server->send($fd, "Swoole: {$data}");
        $server->close($fd);
    });
    $server->on('close', function ($server, $fd) {
        echo "connection close: {$fd}\n";
    });
    $server->start();
4.udp server
$serv = new Swoole\Server("127.0.0.1", 9502, SWOOLE_PROCESS, SWOOLE_SOCK_UDP);

    //監聽數據接收事件
    $serv->on('Packet', function ($serv, $data, $clientInfo) {
        $serv->sendto($clientInfo['address'], $clientInfo['port'], "Server ".$data);
        var_dump($clientInfo);
    });

    //啓動服務器
    $serv->start();
5.task
$server = new Swoole\Server("127.0.0.1", 9502);
    $server->set(array('task_worker_num' => 4));
    $server->on('receive', function($server, $fd, $reactor_id, $data) {
        $task_id = $server->task("Async");
        echo "Dispatch AsyncTask: [id=$task_id]\n";
    });
    $server->on('task', function ($server, $task_id, $reactor_id, $data) {
        echo "New AsyncTask[id=$task_id]\n";
        $server->finish("$data -> OK");
    });
    $server->on('finish', function ($server, $task_id, $data) {
        echo "AsyncTask[$task_id] finished: {$data}\n";
    });
    $server->start();
6.coroutine
//睡眠 1 萬次,讀取,寫入,檢查和刪除文件 1 萬次,使用 PDO 和 MySQLi 與數據庫通訊 1 萬次,建立 TCP 服務器和多個客戶端相互通訊 1 萬次,
//建立 UDP 服務器和多個客戶端到相互通訊 1 萬次...... 一切都在一個進程一秒內完美完成!

   Swoole\Runtime::enableCoroutine();//此行代碼後,文件操做,sleep,Mysqli,PDO,streams等都變成異步IO,見文檔'一鍵協程化'章節
   $s = microtime(true);
    //Co/run()見文檔'協程容器'章節
   Co\run(function() {
    // i just want to sleep...
    for ($c = 100; $c--;) {
        go(function () {
            for ($n = 100; $n--;) {
                usleep(1000);
            }
        });
    }

    // 10k file read and write
    for ($c = 100; $c--;) {
        go(function () use ($c) {
            $tmp_filename = "/tmp/test-{$c}.php";
            for ($n = 100; $n--;) {
                $self = file_get_contents(__FILE__);
                file_put_contents($tmp_filename, $self);
                assert(file_get_contents($tmp_filename) === $self);
            }
            unlink($tmp_filename);
        });
    }

    // 10k pdo and mysqli read
    for ($c = 50; $c--;) {
        go(function () {
            $pdo = new PDO('mysql:host=127.0.0.1;dbname=test;charset=utf8', 'root', 'root');
            $statement = $pdo->prepare('SELECT * FROM `user`');
            for ($n = 100; $n--;) {
                $statement->execute();
                assert(count($statement->fetchAll()) > 0);
            }
        });
    }
    for ($c = 50; $c--;) {
        go(function () {
            $mysqli = new Mysqli('127.0.0.1', 'root', 'root', 'test');
            $statement = $mysqli->prepare('SELECT `id` FROM `user`');
            for ($n = 100; $n--;) {
                $statement->bind_result($id);
                $statement->execute();
                $statement->fetch();
                assert($id > 0);
            }
        });
    }

    // php_stream tcp server & client with 12.8k requests in single process
    function tcp_pack(string $data): string
    {
        return pack('n', strlen($data)) . $data;
    }

    function tcp_length(string $head): int
    {
        return unpack('n', $head)[1];
    }

    go(function () {
        $ctx = stream_context_create(['socket' => ['so_reuseaddr' => true, 'backlog' => 128]]);
        $socket = stream_socket_server(
            'tcp://0.0.0.0:9502',
            $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $ctx
        );
        if (!$socket) {
            echo "$errstr ($errno)\n";
        } else {
            $i = 0;
            while ($conn = stream_socket_accept($socket, 1)) {
                stream_set_timeout($conn, 5);
                for ($n = 100; $n--;) {
                    $data = fread($conn, tcp_length(fread($conn, 2)));
                    assert($data === "Hello Swoole Server #{$n}!");
                    fwrite($conn, tcp_pack("Hello Swoole Client #{$n}!"));
                }
                if (++$i === 128) {
                    fclose($socket);
                    break;
                }
            }
        }
    });
    for ($c = 128; $c--;) {
        go(function () {
            $fp = stream_socket_client("tcp://127.0.0.1:9502", $errno, $errstr, 1);
            if (!$fp) {
                echo "$errstr ($errno)\n";
            } else {
                stream_set_timeout($fp, 5);
                for ($n = 100; $n--;) {
                    fwrite($fp, tcp_pack("Hello Swoole Server #{$n}!"));
                    $data = fread($fp, tcp_length(fread($fp, 2)));
                    assert($data === "Hello Swoole Client #{$n}!");
                }
                fclose($fp);
            }
        });
    }

    // udp server & client with 12.8k requests in single process
    go(function () {
        $socket = new Swoole\Coroutine\Socket(AF_INET, SOCK_DGRAM, 0);
        $socket->bind('127.0.0.1', 9503);
        $client_map = [];
        for ($c = 128; $c--;) {
            for ($n = 0; $n < 100; $n++) {
                $recv = $socket->recvfrom($peer);
                $client_uid = "{$peer['address']}:{$peer['port']}";
                $id = $client_map[$client_uid] = ($client_map[$client_uid] ?? -1) + 1;
                assert($recv === "Client: Hello #{$id}!");
                $socket->sendto($peer['address'], $peer['port'], "Server: Hello #{$id}!");
            }
        }
        $socket->close();
    });
    for ($c = 128; $c--;) {
        go(function () {
            $fp = stream_socket_client("udp://127.0.0.1:9503", $errno, $errstr, 1);
            if (!$fp) {
                echo "$errstr ($errno)\n";
            } else {
                for ($n = 0; $n < 100; $n++) {
                    fwrite($fp, "Client: Hello #{$n}!");
                    $recv = fread($fp, 1024);
                    list($address, $port) = explode(':', (stream_socket_get_name($fp, true)));
                    assert($address === '127.0.0.1' && (int)$port === 9503);
                    assert($recv === "Server: Hello #{$n}!");
                }
                fclose($fp);
            }
        });
    }
  });
  echo 'use ' . (microtime(true) - $s) . ' s';
7.Channel
Co\run(function(){
        //使用Channel進行協程間通信
        $chan = new Swoole\Coroutine\Channel(1);
        Swoole\Coroutine::create(function () use ($chan) {
            for($i = 0; $i < 100000; $i++) {
                co::sleep(1.0);
                $chan->push(['rand' => rand(1000, 9999), 'index' => $i]);
                echo "$i\n";
            }
        });
        Swoole\Coroutine::create(function () use ($chan) {
            while(1) {
                $data = $chan->pop();
                var_dump($data);
            }
        });
  });

3.風格

服務端 + 客戶端react

1.異步風格
2.協程風格

進程 ---> 線程 ---> 協程 git

鏈接池 (server的manager模塊)程序員

# 建立
`Coroutine::create` 或 `go` 方法建立協程

支持 `waitgroup`
# 通訊問題
     進程
             高性能共享內存 `Table`
       `Process`模塊 (代替php自帶的 `pcntl` )
   協程
       `Coroutine\Channel`
        併發編程: Coroutine::create() + setdefer()  ->   go() + channel


退出協程`exit`禁用
`異常捕獲`不能跨協程
在多個協程間不能`共用`一個鏈接
禁止使用`靜態類`或者`全局變量`保存上下文對象
`sleep`不能用

4.基本須知

1.四種設置回調函數的方式
# 匿名函數
$server->on('Request', function ($req, $resp) use ($a, $b, $c) {
    echo "hello world";
});
Copy to clipboardErrorCopied
可以使用 use 向匿名函數傳遞參數
# 類靜態方法
class A
{
    static function test($req, $resp)
    {
        echo "hello world";
    }
}
$server->on('Request', 'A::Test');
$server->on('Request', array('A', 'Test'));
Copy to clipboardErrorCopied
對應的靜態方法必須爲 public
# 函數
function my_onRequest($req, $resp)
{
    echo "hello world";
}
$server->on('Request', 'my_onRequest');
# 對象方法
class A
{
    function test($req, $resp)
    {
        echo "hello world";
    }
}
$object = new A();
$server->on('Request', array($object, 'test'));
對應的方法必須爲 public
2.同步 IO / 異步 IO
# 網絡io模型:
同步模型(synchronous IO)
阻塞IO(bloking IO)
非阻塞IO(non-blocking IO)
多路複用IO(multiplexing IO) `poll/select`  -> `epoll`
信號驅動式IO(signal-driven IO)
異步IO(asynchronous IO)
3.EventLoop
所謂 EventLoop,即事件循環,能夠簡單的理解爲 `epoll_wait`,咱們會把全部要發生事件的句柄(fd)加入到 epoll_wait 中,這些事件包括可讀,可寫,出錯等。 咱們的進程就阻塞在 epoll_wait 這個內核函數上,當發生了事件 (或超時) 後 epoll_wait 這個函數就會結束阻塞返回結果,就能夠回調相應的 PHP 函數,例如,收到客戶端發來的數據,回調 OnRecieve 回調函數。

當有大量的 fd 放入到了 epoll_wait 中,而且同時產生了大量的事件,epoll_wait 函數返回的時候咱們就會挨個調用相應的回調函數,叫作一輪事件循環,即 IO 多路複用,而後再次阻塞調用 epoll_wait 進行下一輪事件循環。
4.TCP粘包問題
tcp 封包 解包 粘包 
數據封包協議規定:整個數據包包含2字節長度信息+數據包體。2字節長度信息包含自己着2字節。
如:數據體是(abcdefg)7個字節,總體封包就是09abcdefg,總共是9個字節的協議
EOF 結束符協議
固定包頭 + 包體協議
5.IPC
同一臺主機上兩個進程間通訊 (`Inter-Process Communication`)
在 Swoole 下使用了 2 種方式 :

# Unix Socket :
全名 UNIX Domain Socket, 簡稱 UDS
SOCK_STREAM : 數據大用 (有粘包問題)
SOCK_DGRAM : 數據小用 (64k)

# sysvmsg :
Linux 提供的消息隊列,這種 IPC 方式經過一個文件名來做爲 key 進行通信,這種方式很是的不靈活,實際項目使用的並很少

5.安裝

擴展衝突github

xdebug
phptrace
aop
molten
xhprof
phalcon(Swoole 協程沒法運行在 phalcon 框架中)

必須web

php-7.1 或更高版本
gcc-4.8 或更高版本
make
autoconf
1.源碼安裝
#1. 下載 swoole 源碼
https://github.com/swoole/swoole-src/releases
http://pecl.php.net/package/swoole
http://git.oschina.net/swoole/swoole

#2. 從源碼編譯安裝
下載源代碼包後,在終端進入源碼目錄,執行下面的命令進行編譯和安裝
cd swoole-src && \
phpize && \
./configure && \
--enable-openssl  \
--enable-http2 && \
make && sudo make install

#3. 啓用擴展
編譯安裝到系統成功後,須要在 php.ini 中加入一行 extension=swoole.so 來啓用 Swoole 擴展
2.pecl安裝
pecl install swoole

3.swoft

version:2.xsql

1.環境準備

#必要部分
PHP,版本 >=7.1
PHP 包管理器 Composer
PCRE 庫
PHP 擴展 Swoole,版本 >=4.3
額外擴展:PDO、Redis

#衝突部分
Xdebug
Xhprof
Blackfire
Zend
trace
Uopz

2.安裝方式

1.docker
docker run -p 18306:18306 --name swoft swoft/swoft
2.docker-compose
git clone https://github.com/swoft-cloud/swoft
cd swoft
docker-compose up

# sserver下面有如下文件配置
docker-compose.yml
docker-sync -> rsync , native_osx
3.composer
composer create-project swoft/swoft Swoft
4.手動安裝
git clone https://github.com/swoft-cloud/swoft
cd swoft
composer install
cp .env.example .env
5.swoftcli
# 支持從不一樣模板項目中快速建立一個乾淨的 Swoft 應用
php swoftcli.phar create:app --type full Swoft-Full
php swoftcli.phar create:app --type ws Swoft-WebSocket
php swoftcli.phar create:app --type tcp Swoft-TCP

# 使用
cp swoftcli.phar /usr/local/bin/swoftcli && chmod a+x swoftcli

3.目錄結構

├── app/    ----- 應用代碼目錄
│   ├── Annotation/        ----- 定義註解相關   (`ReflectionCalss`)
│   ├── Aspect/            ----- AOP 切面      (`Aspect-oriented programming`)
│   ├── Common/            ----- 一些具備獨立功能的 class bean
│   ├── Console/           ----- 命令行代碼目錄
│   ├── Exception/         ----- 定義異常類目錄
│   │   └── Handler/           ----- 定義異常處理類目錄
│   ├── Http/              ----- HTTP 服務代碼目錄
│   │   ├── Controller/
│   │   └── Middleware/
│   ├── Helper/            ----- 助手函數
│   ├── Listener/          ----- 事件監聽器目錄
│   ├── Model/             ----- 模型、邏輯等代碼目錄(這些層並不限定,根據須要使用)
│   │   ├── Dao/
│   │   ├── Data/
│   │   ├── Logic/
│   │   └── Entity/
│   ├── Rpc/               ----- RPC 服務代碼目錄
│   │   └── Service/
│   │   └── Middleware/
│   ├── WebSocket/         ----- WebSocket 服務代碼目錄
│   │   ├── Chat/
│   │   ├── Middleware/
│   │   └── ChatModule.php
│   ├── Tcp/               ----- TCP 服務代碼目錄
│   │   └── Controller/        ----- TCP 服務處理控制器目錄
│   ├── Application.php    ----- 應用類文件繼承自swoft核心
│   ├── AutoLoader.php     ----- 項目掃描等信息(應用自己也算是一個組件)
│   └── bean.php
├── bin/
│   ├── bootstrap.php
│   └── swoft              ----- Swoft 入口文件
├── config/                ----- 應用配置目錄
│   ├── base.php               ----- 基礎配置
│   └── db.php                 ----- 數據庫配置
├── public/                ----- 公共目錄
├── resource/              ----- 應用資源目錄
│   ├── language/              ----- 語言資源目錄  
│   └── view/                  ----- 視圖資源目錄  
├── runtime/               ----- 臨時文件目錄(日誌、上傳文件、文件緩存等)
├── test/                  ----- 單元測試目錄
│   └── bootstrap.php
├── composer.json
├── phar.build.inc
└── phpunit.xml.dist

4.運行服務

若是在 .env 文件中開啓了調試 SWOFT_DEBUG=1 將會在控制檯中顯示更多詳細的信息。
1.http server
# 啓動 HTTP 服務
$ php ./bin/swoft http:start

# 以守護進程模式啓動
$ php ./bin/swoft http:start -d

# 重啓 HTTP 服務
$ php ./bin/swoft http:restart

# 從新加載 HTTP 服務
$ php ./bin/swoft http:reload

# 中止 HTTP 服務
$ php ./bin/swoft http:stop

# swoftcli
swoftcli -h
swoftcli run -c ws:start
swoftcli run -c http:start
2.websocket server
# 啓動 WS 服務
$ php ./bin/swoft ws:start

# 以守護進程模式啓動
$ php ./bin/swoft ws:start -d

# 重啓 WS 服務
$ php ./bin/swoft ws:restart

# 從新加載 WS 服務
$ php ./bin/swoft ws:reload

# 關閉 WS 服務
$ php ./bin/swoft ws:stop
3.rpc server
# 啓動 RPC 服務
$ php ./bin/swoft rpc:start

# 以守護進程模式啓動
$ php ./bin/swoft rpc:start -d

# 重啓 RPC 服務
$ php ./bin/swoft rpc:restart

# 從新加載 RPC 服務
$ php ./bin/swoft rpc:reload

# 關閉 RPC 服務
$ php ./bin/swoft rpc:stop

5.註解

use Swoft\Http\Message\Request;
use Swoft\Http\Server\Annotation\Mapping\Controller;
use Swoft\Http\Server\Annotation\Mapping\RequestMapping;

/**
 * Class Home
 *
 * @Controller(prefix="home")
 */
class Home
{
    /**
     * 該方法路由地址爲 /home/index
     *
     * @RequestMapping(route="/index", method="post")
     *
     * @param Request $request
     */
    public function index(Request $request)
    {
        // TODO:
    }
}

6.IoC - DI

IoC: Inversion of Control
DI: Dependency Injection

Bean容器

# 定義
@Bean
命名空間:\Swoft\Bean\Annotation\Bean

name 定義 Bean 別名,缺省默認類名
scope 注入 Bean 類型,默認單例,Scope::SINGLETON/Scope::PROTOTYPE(每次建立)
ref 指定引用 Bean ,用於定義在接口上面,指定使用哪一個接口實現。
# 注入
@Inject
命名空間:\Swoft\Bean\Annotation\Inject

name 定義屬性注入的bean名稱,缺省屬性自動類型名稱
# 操做
App::getBean("name");
ApplicationContext::getBean('name');
BeanFactory::getBean('name');
BeanFactory::hasBean("name");
相關文章
相關標籤/搜索