(網絡學習)一、消息隊列總結

應用場景

https://www.imooc.com/video/15163php

冗餘:延遲處理、保障完成
解耦:出入解耦
流量消峯:redis緩存
異步通訊:異步場景
方便擴展:解耦的結果
排序保證:順序處理

一、mysql加鎖隊列

-- 訂單系統【生產方】 --
接收訂單,核心代碼:mysql

//把訂單信息保存到訂單表中
    $db = DB::getIntance();
    $res = $db->insert("order_queue", $insert_data);

-- 配送系統【消費方】 --
定時發貨,核心代碼:git

//1. 中間態:鎖定處理中數據
    $waiting = ['status'=>0];
    $lock = ['status'=>1];
    $lock_res = $db->update('order_queue',$lock,$waiting,2); //每次刷新新處理2條

    if ($lock_res){
        //2. 對這些處理中數據進行配貨
        $res = $db->selectAll('order_queue',$lock);
        //配貨代碼...

        //3. 修改訂單狀態爲已完成
        $success = [
            'update_time' => date('Y-m-d H:i:s'),
            'status'=>2
        ];
        $res_last = $db->update("order_queue", $success,$lock);
        if($res_last){
            echo 'Success: '.$res_last;
        }else{
            echo 'Failed';
        }
    }

二、redis秒殺隊列

秒殺場景下的流量消峯:
【生產方】github

if ($redis->lLen($redis_name) < $num) {
        $redis->rPush($redis_name, $uid . '%' . microtime());
        echo '秒殺成功' . $uid;
    } else {
        echo '秒殺已結束.';
    }

【消費方】redis

sleep(2);
    $user = $redis->lPop($redis_name);
    if(!$user || $user=='nil'){ //爲空判斷
        continue;
    }else{
        $user_arr = explode('%', $user);
        $insert_data = [
            'uid' => $user_arr[0],
            'rtime' => date('Y-m-d H:i:s'),
            'req_time' => $user_arr[1]
        ];
        echo '-';
        $res = $db->insert('redis_queue',$insert_data); //數據庫寫入成功判斷
        if(!$res){
            $redis->rPush($redis_name, $user);
        }
    }

三、rabbitmq應用解耦

可用於解耦、方便擴展
php-amqplib/php-amqplib: demo//publish.php、consumer.php
用法直接參考demo:sql

$exchange = 'router';
$queue = 'msgs';

$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);

$channel->queue_bind($queue, $exchange);

$messageBody = implode(' ', array_slice($argv, 1));
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($message, $exchange);

$channel->close();
$connection->close();

結尾:代碼同步

# 本機搭建
git clone https://github.com/cffycls/cluster.git /home/wwwroot
docker run --name rbt -p 5672:5672 \
    --network mybridge --ip=172.1.12.15 \
    -v /home/wwwroot/cluster/rabbitmq/rabbitmq.conf \
    -v /home/wwwroot/cluster/rabbitmq/data:/var/lib/rabbitmq/mnesia \
    -e RABBITMQ_ERLANG_COOKIE='123456' \
    -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 \
    -d rabbitmq

實例及數據庫代碼上傳:
https://github.com/cffycls/msg_quedocker

相關文章
相關標籤/搜索