分佈式場景Swoole+redis使用消息隊列

前言:網站性能優化的場景需求

對於已有的mysql主從項目,應對數據量大時每每採起分庫分表的作法,爲了縮短頁面響應採用一主多從的 主寫+從讀 的讀寫分離架構。php

使用redis等級:一使用redis做爲php的緩存層,存儲經常使用、相對固定的公共數據;二:添加redis長用緩存,組成 mysql寫+ redis讀 的架構;三:甚至直接採用 redis讀+寫 的架構。mysql遷移redis須要後臺程序的緊密配合。「讀寫分離」容易出現數據不一致的問題。css

*面試*優化相關

傳統先後端優化途徑:
 1. 前端 
    減小請求次數:css精靈(小圖合併到大圖);
    data-image(data-icon:src=data:image/jpg;base64;xx,小圖合併到js文件);
 2. 網關
    web資源防盜鏈refer監測;
    nginx限流,nginx負載均衡、nginx緩存靜態資源、圖片服務器、gzip等;
    http2.0;
 3. 後端、數據庫
    升級到php7,使用服務模式啓動網站;
    使用redis、memcache緩存;
    mysql優化;

進階:
 1. 網關
    mysql遷移redis,分佈式集羣部署;
 2. 後端
    添加針對高併發的消息隊列,多線程、協程化,使用鏈接池;
    mysql慢查詢分析

分佈式部署常見問題:
    登陸session共享問題;讀寫分離的同步數據問題。
另外還各類諸如圖片數據庫、對象存儲等。複製代碼

要同時在多臺服務器上處理好比:庫存超賣、訂單支付問題須要頻繁的過程校驗,因此對並行任務串行化、使用一臺機器、單一線程處理這種一致性問題最爲穩妥。應對大數據量的狀況採用消息隊列,平衡服務器壓力。html

1.swoole的消息隊列相關

利用swoole_table+協程可使用隊列,把這些駐內存的數據放到mysql、redis可持久化。前端

a.swoole_table共享內存表

服務器端:node

//參考官方 https://wiki.swoole.com/wiki/page/292.html
$table = new Swoole\Table(1024); //$size參數指定表格的最大行數,必須爲2的指數,如1024,8192,65536等
$table->column('fd', swoole_table::TYPE_INT);
$table->column('from_id', swoole_table::TYPE_INT);
$table->column('data', swoole_table::TYPE_STRING, 1024);
$table->create();

$serv = new Swoole\Server('127.0.0.1', 9501, SWOOLE_BASE, SWOOLE_SOCK_TCP);
//將table保存在serv對象上
$serv->table = $table;

$serv->on('receive', function ($serv, $fd, $from_id, $data) {
    $pre = substr($data, 0, 1);
    $data = substr($data, 1);
    if($pre == \Swoole\Table::TYPE_STRING){
        /*
        foreach ($serv->table as $row) var_dump($row);
        print_r($serv->table);
        */
        $table = [];
        foreach ($serv->table as $k => $row) {
            $row['key'] = $k;
            $table[] = $row;
        }
        $count = count($serv->table);
        print_r("當前錶行數". $count);
        $serv->send($fd, json_encode($table, JSON_UNESCAPED_UNICODE));
    }elseif($pre == \Swoole\Table::TYPE_FLOAT){
        $key = $data;
        $exist = $serv->table->exist($key);
        if($exist){
            $row = $serv->table->get($key);
            $exist = $serv->table->del($key);
            $data = json_decode($row['data'], true);
            $serv->send($fd, "消費:" .($exist===true?'true':'false'). ' '. $data['order'].PHP_EOL);
        }
    }elseif($pre == \Swoole\Table::TYPE_INT){
        $ret = $serv->table->set($fd, array('from_id' => $from_id, 'data' => $data, 'fd' => $fd));
        $data = json_decode($data, true);
        $serv->send($fd, "服務器:". ($ret===true?'true':'false'). " from ".$fd.' order:'.$data['order']);
    }else{
        print_r($data);
        $serv->send($fd, 'others');
    }
});

$serv->start();複製代碼

客戶端:mysql

for($i=1; $i<=50; $i++){
    go(function () use($i) {
        $client = new \Swoole\Client(SWOOLE_SOCK_TCP);
        usleep(mt_rand(10,1000));
        $res = $client->connect('127.0.0.1', 9501, 0.5);
        if (!$res) {
            exit("connect failed. Error: {$client->errCode}\n");
        }

        echo "客戶端:". $i. PHP_EOL;
        $data = ['order'=>$i, 'data'=>md5(mt_rand(10,1000).time())];
        echo \Swoole\Table::TYPE_INT. json_encode($data, JSON_UNESCAPED_UNICODE). PHP_EOL;
        $client->send(\Swoole\Table::TYPE_INT. json_encode($data, JSON_UNESCAPED_UNICODE));
        //echo $client->recv(). PHP_EOL;
        $client->close();
    });
}

$client = new \Swoole\Client(SWOOLE_SOCK_TCP);
$res = $client->connect('127.0.0.1', 9501, 0.5);
if (!$res) {
    exit("connect failed. Error: {$client->errCode}\n");
}
$client->send(\Swoole\Table::TYPE_STRING. "GET_TABLE_INFO");
$table = $client->recv();
$client->close();

$table = json_decode($table, true);
while($row = array_pop($table)){
    go(function () use($row) {
        $client = new \Swoole\Client(SWOOLE_SOCK_TCP);
        usleep(mt_rand(10,1000));
        $res = $client->connect('127.0.0.1', 9501, 0.5);
        if (!$res) {
            exit("connect failed. Error: {$client->errCode}\n");
        }
        
        echo "處理{$row['fd']}發來的{$row['data']}". PHP_EOL;
        $client->send(\Swoole\Table::TYPE_FLOAT. $row['key']);
        $client->close();
    });
}複製代碼

swoole的協程對於普通處理效果不明顯,而當任務中包含有mysql、redis、curl等的遠程鏈接時才體現異步非阻塞的高效率。異步編程要始終考慮執行順序的問題,上面例子的客戶端中,sleep是真阻塞而不是異步的,下半部分讀取swoole_table表回調報告完成、能夠單獨放到文件請求。nginx

b.swoole+redis/mysql的消息處理

同上理,好處是多了持久化。laravel

2.rabbitmq消息隊列

特色:被動接受任務,不拒絕。選擇持久化配置,若未完成,所有退出/php中斷退出 再啓動還會繼續執行。web

a.安裝RabbitMQ消息隊列

rabbitmq官方docker說明,參考《Rabbitmq集羣高可用部署詳細》這裏部署普通模式。網絡加入之間創建的mybridge,使用固定ip。面試

[]:~/tmp/dk/rabbitmq# docker pull rabbitmq
#docker run --name rabbit --network mybridge -e RABBITMQ_ERLANG_COOKIE='123456' -d rabbitmq 
#docker cp rabbit:/etc/rabbitmq/rabbitmq.conf ./複製出配置文件
[]:~/tmp/dk/rabbitmq# docker run --name rbt1 -p 15672:15672 \
    --hostname rbt1 \
    --network mybridge --ip=172.1.12.13 \
    -v /root/tmp/dk/rabbitmq/rabbitmq.conf \
    -v /root/tmp/dk/rabbitmq/data13:/var/lib/rabbitmq/mnesia \
    -e RABBITMQ_ERLANG_COOKIE='123456' \
    -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 \
    -d rabbitmq
docker run --name rbt2 --hostname rbt2 \
    --network mybridge --ip=172.1.12.14 \
    -v /root/tmp/dk/rabbitmq/rabbitmq.conf \
    -v /root/tmp/dk/rabbitmq/data14:/var/lib/rabbitmq/mnesia \
    -e RABBITMQ_ERLANG_COOKIE='123456' \
    -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 \
    -d rabbitmq複製代碼

普通集羣模式

[]:~/tmp/dk/rabbitmq# docker exec -it rbt1 bash
root@rbt1:/# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rbt1 ...
root@rbt1:/# rabbitmqctl join_cluster --ram rabbit@rbt2
Clustering node rabbit@rbt1 with rabbit@rbt2
root@rbt1:/# rabbitmqctl start_app
Starting node rabbit@rbt1 ...
 completed with 0 plugins.
root@rbt1:/# rabbitmqctl cluster_status
Cluster status of node rabbit@rbt1 ...
[{nodes,[{disc,[rabbit@rbt2]},{ram,[rabbit@rbt1]}]},
 {running_nodes,[rabbit@rbt2,rabbit@rbt1]},
 {cluster_name,<<"rabbit@rbt2">>},
 {partitions,[]},
 {alarms,[{rabbit@rbt2,[]},{rabbit@rbt1,[]}]}]
root@rbt1:/# rabbitmq-plugins enable rabbitmq_management複製代碼

而後能夠登陸訪問 http://remote_host:15672/ 管理頁面。
php添加amqp擴展,Dockerfile添加(cffycls/php7:1.8):

apk add rabbitmq-c-dev
pecl install amqp複製代碼

b.消息添加和消費測試

用法參考一,參考二:phpstrom中輸入new AMQPConnection(),按Ctrl和鼠標確認鍵進入amqp.php方法用法註釋(php.net官方暫時找不到)。
生產者(publish.php):

date_default_timezone_set("Asia/Shanghai");
//配置信息
$conn_args = array(
    'host' => '172.1.12.13',
    'port' => '5672',
    'login' => 'root',
    'password' => '123456',
    'vhost'=>'/'
);
$k_route = 'k_route_1'; //路由key,用來綁定交換機和隊列
$e_name = 'e_switches'; //交換機名稱

//建立鏈接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!PHP_EOL");
}

$channel = new AMQPChannel($conn);
echo "<font color='red'>生產者</font>PHP_EOL已鏈接成功!準備發佈信息...".PHP_EOL;

//建立交換機對象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); // 設置交換機類型
$ex->setFlags(AMQP_DURABLE); // 設置交換機是否持久化消息

//發送消息
$channel->startTransaction(); //開始事務
for($i=1; $i<=50000; ++$i){
    usleep(100);//休眠1秒
    $message = "消息數據".$i. ' '.date("Y-m-d H:i:s A");
    echo "消息發送返回:".$ex->publish($message, $k_route).PHP_EOL;
}
$channel->commitTransaction(); //提交事務

$conn->disconnect();複製代碼

消費者(consumer.php):

date_default_timezone_set("Asia/Shanghai");
//配置信息
$conn_args = array(
    'host' => '172.1.12.13',
    'port' => '5672',
    'login' => 'root',
    'password' => '123456',
    'vhost'=>'/'
);
$k_route = 'k_route_1'; //路由key,用來綁定交換機和隊列
$e_name = 'e_switches'; //交換機名稱
$q_name = 'q_queue'; //隊列名

//建立鏈接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!".PHP_EOL);
}
$channel = new AMQPChannel($conn);
echo "<font color='red'>消費者</font>:".PHP_EOL ."已鏈接成功!準備接收信息...".PHP_EOL;

//建立交換機
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
//direct類型:[AMQP_EX_TYPE_DIRECT,AMQP_EX_TYPE_FANOUT, AMQP_EX_TYPE_HEADERS or AMQP_EX_TYPE_TOPIC]
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE); //持久化

//建立隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declareQueue();

//綁定交換機與隊列,並指定路由鍵
$q->bind($e_name, $k_route);

//阻塞模式接收消息
echo "阻塞模式接收消息:".PHP_EOL;
while(True){
    $q->consume(function ($envelope, $queue) {
        $msg = $envelope->getBody();
        echo '收到:'. $msg.PHP_EOL; //處理消息
        sleep(1);//接收模擬處理休眠1秒
    }, AMQP_AUTOACK); //ACK應答
}
$conn->disconnect();複製代碼

在頁面能夠執行,只在cli交互模式下的才顯示收到的結果,(發佈者再web訪問、消費者在php-cli處理)。上面開啓事務:全部的消息 被接收纔開始處理。
只要保持有一個以上的消費者在工做,任務就會繼續進行,這是中間加入新的消費者(2個同時執行):

[]:~/tmp/dk/html# php rabbitmq/consumer.php 
<font color='red'>消費者</font>:
已鏈接成功!準備接收信息...
阻塞模式接收消息:
收到:消息數據7201 2019-07-08 16:08:05 PM
收到:消息數據7202 2019-07-08 16:08:05 PM
... ...複製代碼

像redis、mysql同樣,使用swoole協程併發會有明顯提速。

3.秒殺/紅包場景

可使用redis消息隊列專門處理秒殺問題,核心是解決庫存超賣問題,測試。

a.服務端sale.php:添加促銷信息,初始化消息隊列

require "../vendor/autoload.php";

//從集羣當中讀取
$servers = ['172.1.50.11:6379', '172.1.50.12:6379', '172.1.50.13:6379', '172.1.50.21:6379'];
define('PROMOTION_KEY_PREFIX', 'promotionList_');
define('TIME_LIMIT_PREFIX', 'timeLimit_');

//1、初始化:查出全部節點分佈,lPush、lRange函數準備,緩存基本信息
$redisServers = [];
$slotNodes = [];
foreach ($servers as $addr){
    //隨機
    $r = new Redis();
    $server=explode(':',$addr);
    $r->connect($server[0], (int) $server[1]);
    $r->auth('123456');
    $redisServers[$addr] = $r;

    if(empty($slotInfo)){
        //單一節點能夠看到全部存在槽的節點
        $slotInfo = $r->rawCommand('cluster','slots');
        foreach ($slotInfo as $ix => $value){
            $slotNodes[$value[2][0].':'.$value[2][1].' '.($ix+1)]=[$value[0], $value[1]];
        }
    }
}
$crc = new \Predis\Cluster\Hash\CRC16();
//注意method=lRange時args傳參是($key, $start, $end$opt = function ($method, $key, ...$args) use (&$redisServers, &$slotNodes, &$crc) {
    $code = $crc->hash($key) % 16384;
    foreach ($slotNodes as $addr => $boundry){
        if( $code>=$boundry[0] && $code<=$boundry[1] ){
            $host =explode(' ', $addr)[0];
            if(empty($args)){
                return $redisServers[$host]->$method($key);
            }elseif(count($args)==1){
                return $redisServers[$host]->$method($key, $args[0]);
            }else{ //...
                return $redisServers[$host]->$method($key, $args);
            }
        }
    }
};

//2、添加數據:集中添加商品,初始化消息隊列
/**
 * @param String $key 商品key
 * @param int $stock 庫存
 */
$createQueue = function (String $key, int $stock) use (&$opt) {
    for ($i=0; $i<$stock; $i++){
        $opt('lPush', $key, $i);
    }
};
//第一個任務
go(function () use (&$opt, &$limit1, &$createQueue){
    $limit1 = [
        'goods_id'=>'9505900000',
    ];
    $createQueue(PROMOTION_KEY_PREFIX. $limit1['goods_id'], 20);
    $has = $opt('lLen', PROMOTION_KEY_PREFIX. $limit1['goods_id']);
    print_r('建立goods1 '. $has. ' 個'. PROMOTION_KEY_PREFIX. $limit1['goods_id'].PHP_EOL);
    $has = 0;
    swoole_timer_tick(10, function ($timer_id) use (&$opt, &$limit1, &$has){
        $now = $opt('lLen', PROMOTION_KEY_PREFIX. $limit1['goods_id']);
        if($has != $now) {
            $has = $now;
            if ($now) {
                print_r('goods1 還有個' . $has . '剩餘' . PHP_EOL);
            } else {
                print_r('OK DONE1!'. PHP_EOL . PROMOTION_KEY_PREFIX . $limit1['goods_id'] . PHP_EOL);
                Swoole\Timer::clear($timer_id);
            }
        }
    });
});

//第二個任務:增長時段限制
go(function () use (&$opt, &$limit1, &$has, &$createQueue){
    $limit2 = [
        'goods_id'=>'2302500000',
        'start'=>date("Y-m-d H:i:s", strtotime('+ 1 minutes')),
        'end'=>date("Y-m-d H:i:s", strtotime('+ 5 minutes')),
    ];
    $createQueue(PROMOTION_KEY_PREFIX. $limit2['goods_id'], 500);
    //保存時間條件
    $opt('lPush', TIME_LIMIT_PREFIX. $limit2['goods_id'], json_encode($limit2, JSON_UNESCAPED_UNICODE));

    $has = $opt('lLen', PROMOTION_KEY_PREFIX. $limit2['goods_id']);
    print_r('建立goods2 '. $has. ' 個'. PROMOTION_KEY_PREFIX. $limit2['goods_id'].PHP_EOL);
    $has = 0;
    swoole_timer_tick(2000, function ($timer_id2) use (&$opt, &$limit2, &$has){
        $now = $opt('lLen', PROMOTION_KEY_PREFIX. $limit2['goods_id']);
        if($has != $now) {
            $has = $now;
            if ($now) {
                print_r('goods2 還有個' . $has . '剩餘' . PHP_EOL);
            } else {
                print_r('OK DONE2!'. PHP_EOL . PROMOTION_KEY_PREFIX . $limit2['goods_id'] . PHP_EOL);
                Swoole\Timer::clear($timer_id2);
            }
        }
    });
});複製代碼

b.客戶端buy.php:消費數據

//每次購買請求的客戶端工做--swoole常駐內存,緩存運行第一步
//1、初始化:查出全部節點分佈,lPush、lRange函數準備,同服務端(略)

//2、消費數據:模擬分散請求
$goods1 = '9505900000'; //商品已知
$goods2 = '2302500000';

//無限制:100人搶 -- goods1=20個
go(function () use (&$opt, &$limit1, &$has, &$createQueue, $goods1){
    $cart = [];
    $cart['key'] = PROMOTION_KEY_PREFIX. $goods1;
    for ($i=0; $i<100; $i++) {
        $cart['user'] = 'tom'.$i;
        go(function () use($cart, &$opt){
            co::sleep(mt_rand(1,500)*0.001); //增長redis網絡鏈接耗時
            $state = $opt('lPop', $cart['key']);
            if($state){
                print_r($cart['user'] .' 購買成功!' .PHP_EOL);
                return true;
            }else{
                //print_r('--已被搶光 ' .$cart['user'] .PHP_EOL);
                print_r('*');
                return false;
            }
        });
    }
});
//時段限制:1000人搶 -- goods2=500個
go(function () use (&$opt, &$limit1, &$has, &$createQueue, $goods2){
    $cart = [];
    $cart['key'] = PROMOTION_KEY_PREFIX. $goods2;
    //獲取時間條件
    $limit2 = json_decode($opt('lIndex', TIME_LIMIT_PREFIX. $goods2, 0), true);
    for ($i=0; $i<10000; $i++) {
        $cart['user'] = 'jack'.$i;
        go(function () use($cart, &$opt, &$limit2){
            $min = strtotime($limit2['start']);
            $max = strtotime($limit2['end']);
            co::sleep(mt_rand(2,6*60)); //請求分散 2s-6min/(1-5min)

            $cur = time();
            if($cur<$min){
                print_r('--還沒有開始 ' .$cart['user'] .PHP_EOL);
                return true;
            }elseif($min<=$cur && $cur<=$max){
                co::sleep(mt_rand(100,500)*0.001); //增長redis網絡鏈接耗時
                $state = $opt('lPop', $cart['key']);
                if($state){
                    echo $cart['user'] .' 購買成功!' .PHP_EOL;
                    return true;
                }else{
                    echo '.';
                    //echo '++已被搶光 ' .$cart['user'] .json_encode([$limit2,date('Y-m-d H:i:s',$cur)], JSON_UNESCAPED_UNICODE) .PHP_EOL;
                    return false;
                }
            }else{
                echo '++已被搶光 ' .$cart['user'] .json_encode([$limit2,date('Y-m-d H:i:s',$cur)], JSON_UNESCAPED_UNICODE) .PHP_EOL;
                return false;
            }
        });
    }
});複製代碼

c.運行結果

[]:~/tmp/dk/html/php-swoole# php sale.php
建立goods1 20 個promotionList_9505900000
建立goods2 500 個promotionList_2302500000
goods1 還有個20剩餘
goods2 還有個500剩餘
goods1 還有個20剩餘
goods2 還有個500剩餘
goods1 還有個10剩餘
OK DONE!
promotionList_9505900000
goods2 還有個500剩餘
goods2 還有個496剩餘
goods2 還有個441剩餘
goods2 還有個383剩餘
goods2 還有個314剩餘
goods2 還有個263剩餘
goods2 還有個208剩餘
goods2 還有個144剩餘
goods2 還有個88剩餘
goods2 還有個39剩餘
OK DONE!
promotionList_2302500000

[]:~/tmp/dk/html/php-swoole# php buy.php 
#達到預期效果,篇幅過長略複製代碼

小結

這裏Swoole+redis異步對秒殺執行速度很快,大量協程時注意內存佔用。

以上內容但願幫助到你們, 不少PHPer在進階的時候總會遇到一些問題和瓶頸,業務代碼寫多了沒有方向感,不知道該從那裏入手去提高,對此我整理了一些資料,包括但不限於:分佈式架構、高可擴展、高性能、高併發、服務器性能調優、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql優化、shell腳本、Docker、微服務、Nginx等多個知識點高級進階乾貨須要的能夠免費分享給你們 ,須要戳這裏 PHP進階架構師>>>視頻、面試文檔免費獲取

相關文章
相關標籤/搜索