Swoole 協程與 Go 協程的區別

Swoole 協程與 Go 協程的區別

進程、線程、協程的概念

  • 進程是什麼?

進程就是應用程序的啓動實例。
例如:打開一個軟件,就是開啓了一個進程。
進程擁有代碼和打開的文件資源,數據資源,獨立的內存空間。php

  • 線程是什麼?

線程屬於進程,是程序的執行者。
一個進程至少包含一個主線程,也能夠有更多的子線程。
線程有兩種調度策略,一是:分時調度,二是:搶佔式調度。mysql

  • 協程是什麼?

協程是輕量級線程, 協程的建立、切換、掛起、銷燬所有爲內存操做,消耗是很是低的。
協程是屬於線程,協程是在線程裏執行的。
協程的調度是用戶手動切換的,因此又叫用戶空間線程。
協程的調度策略是:協做式調度。git

Swoole 協程

  • Swoole 的協程客戶端必須在協程的上下文環境中使用。
// 第一種狀況:Request 回調自己是協程環境
$server->on('Request', function($request, $response) {
    // 建立 Mysql 協程客戶端
    $mysql = new Swoole\Coroutine\MySQL();
    $mysql->connect([]);
    $mysql->query();
});

// 第二種狀況:WorkerStart 回調不是協程環境
$server->on('WorkerStart', function() {
    // 須要先聲明一個協程環境,才能使用協程客戶端
    go(function(){
        // 建立 Mysql 協程客戶端
        $mysql = new Swoole\Coroutine\MySQL();
        $mysql->connect([]);
        $mysql->query();
    });
});
  • Swoole 的協程是基於單線程的, 沒法利用多核CPU,同一時間只有一個在調度。
// 啓動 4 個協程
$n = 4;
for ($i = 0; $i < $n; $i++) {
    go(function () use ($i) {
        // 模擬 IO 等待
        Co::sleep(1);
        echo microtime(true) . ": hello $i " . PHP_EOL;
    });
};
echo "hello main \n";

// 每次輸出的結果都是同樣
$ php test.php 
hello main 
1558749158.0913: hello 0 
1558749158.0915: hello 3 
1558749158.0915: hello 2 
1558749158.0915: hello 1
  • Swoole 協程使用示例及詳解
// 建立一個 Http 服務
$server = new Swoole\Http\Server('127.0.0.1', 9501, SWOOLE_BASE);

// 調用 onRequest 事件回調函數時,底層會調用 C 函數 coro_create 建立一個協程,
// 同時保存這個時間點的 CPU 寄存器狀態和 ZendVM stack 信息。
$server->on('Request', function($request, $response) {
    // 建立一個 Mysql 的協程客戶端
    $mysql = new Swoole\Coroutine\MySQL();
    
    // 調用 mysql->connect 時發生 IO 操做,底層會調用 C 函數 coro_save 保存當前協程的狀態,
    // 包括 Zend VM 上下文以及協程描述的信息,並調用 coro_yield 讓出程序控制權,當前的請求會掛起。
    // 當協程讓出控制權以後,會繼續進入 EventLoop 處理其餘事件,這時 Swoole 會繼續去處理其餘客戶端發來的 Request。
    $res = $mysql->connect([
        'host'     => '127.0.0.1',
        'user'     => 'root',
        'password' => 'root',
        'database' => 'test'
    ]);
    
    // IO 事件完成後,MySQL 鏈接成功或失敗,底層調用 C 函數 coro_resume 恢復對應的協程,恢復 ZendVM 上下文,繼續向下執行 PHP 代碼。
    if ($res == false) {
        $response->end("MySQL connect fail");
        return;
    }
    
    // mysql->query 的執行過程和 mysql->connect 一致,也會進行一次協程切換調度
    $ret = $mysql->query('show tables', 2);
    
    // 全部操做完成後,調用 end 方法返回結果,並銷燬此協程。
    $response->end('swoole response is ok, result='.var_export($ret, true));
});

// 啓動服務
$server->start();

Go 的協程 goroutine

goroutine 是輕量級的線程,Go 語言從語言層面就支持原生協程。
Go 協程與線程相比,開銷很是小。
Go 協程的堆棧開銷只用2KB,它能夠根據程序的須要增大和縮小,
而線程必須指定堆棧的大小,而且堆棧的大小都是固定的。github

goroutine 是經過 GPM 調度模型實現的。
M: 表示內核級線程,一個 M 就是一個線程,goroutine 跑在 M 之上的。
G: 表示一個 goroutine,它有本身的棧。
P: 全稱是 Processor,處理器。它主要用來執行 goroutine 的,同時它也維護了一個 goroutine 隊列。sql

Go 在 runtime、系統調用等多個方面對 goroutine 調度進行了封裝和處理,當遇到長時間執行或進行系統調用時,
會主動把當前協程的 CPU 轉讓出去,讓其餘協程調度執行。mongodb

  • Go 語言原生層面就支持協層,不須要聲明協程環境。
package main

import "fmt"

func main() {
    // 直接經過 Go 關鍵字,就能夠啓動一個協程。
    go func() {
        fmt.Println("Hello Go!")
    }()
}
  • Go 協程是基於多線程的,能夠利用多核 CPU,同一時間可能會有多個協程在執行。
package main

import (
    "fmt"
    "time"
)

func main() {
    // 設置這個參數,能夠模擬單線程與 Swoole 的協程作比較
    // 若是這個參數設置成 1,則每次輸出的結果都同樣。
    // runtime.GOMAXPROCS(1)

    // 啓動 4 個協程
    var i int64
    for i = 0; i < 4; i++ {
        go func(i int64) {
            // 模擬 IO 等待
            time.Sleep(1 * time.Second)
            fmt.Printf("hello %d \n", i)
        }(i)
    }

    fmt.Println("hello main")

    // 等待其餘的協程執行完,若是不等待的話,
    // main 執行完退出後,其餘的協程也會相繼退出。
    time.Sleep(10 * time.Second)
}

// 第一次輸出的結果
$ go run test.go
hello main
hello 2 
hello 1 
hello 0 
hello 3 

// 第二次輸出的結果
$ go run test.go
hello main
hello 2 
hello 0 
hello 3 
hello 1 

// 依次類推,每次輸出的結果都不同
  • go 協程使用示例及詳解
package main

import (
    "fmt"
    "github.com/jinzhu/gorm"
    "net/http"
    "time"
)
import _ "github.com/go-sql-driver/mysql"

func main() {
    dsn := fmt.Sprintf("%v:%v@(%v:%v)/%v?charset=utf8&parseTime=True&loc=Local",
        "root",
        "root",
        "127.0.0.1",
        "3306",
        "fastadmin",
    )
    db, err := gorm.Open("mysql", dsn)
    if err != nil {
        fmt.Printf("mysql connection failure, error: (%v)", err.Error())
        return
    }
    db.DB().SetMaxIdleConns(10)  // 設置鏈接池
    db.DB().SetMaxOpenConns(100) // 設置與數據庫創建鏈接的最大數目
    db.DB().SetConnMaxLifetime(time.Second * 7)

    http.HandleFunc("/test", func(writer http.ResponseWriter, request *http.Request) {
        // http Request 是在協程中處理的
        // 在 Go 源碼 src/net/http/server.go:2851 行處 `go c.serve(ctx)` 給每一個請求啓動了一個協程
        var name string
        row := db.Table("fa_auth_rule").Where("id = ?", 1).Select("name").Row()
        err = row.Scan(&name)
        if err != nil {
            fmt.Printf("error: %v", err)
            return
        }
        fmt.Printf("name: %v \n", name)
    })
    http.ListenAndServe("0.0.0.0:8001", nil)
}

案例分析

背景:數據庫

在咱們的積分策略服務系統中,使用到了 mongodb 存儲,可是 swoole 沒有提供 mongodb 協程客戶端。 那麼這種場景下,在鏈接及操做 Mongodb 時會發生同步阻塞,沒法發生協程切換,致使整個進程都會阻塞。在這段時間內,進程將沒法再處理新的請求,這使得系統的併發性大大下降。swoole

使用同步的 mongodb 客戶端網絡

$server->on('Request', function($request, $response) {
    // swoole 沒有提供協程客戶端,那麼只能使用同步客戶端
    // 這種狀況下,進程阻塞,沒法切換協程
    $m = new MongoClient();    // 鏈接到mongodb
    $db = $m->test;            // 選擇一個數據庫
    $collection = $db->runoob; // 選擇集合
    // 更新文檔
    $collection->update(array("title"=>"MongoDB"), array('$set'=>array("title"=>"Swoole")));
    $cursor = $collection->find();
    foreach ($cursor as $document) {
        echo $document["title"] . "\n";
    }
}}

經過使用 Server->taskCo 來異步化對 mongodb 的操做session

$server->on('Task', function (swoole_server $serv, $task_id, $worker_id, $data) {
    $m = new MongoClient();    // 鏈接到mongodb
    $db = $m->test;            // 選擇一個數據庫
    $collection = $db->runoob; // 選擇集合
    // 更新文檔
    $collection->update(array("title"=>"MongoDB"), array('$set'=>array("title"=>"Swoole")));
    $cursor = $collection->find();
    foreach ($cursor as $document) {
        $data = $document["title"];
    }
    return $data;
});

$server->on('Request', function ($request, $response) use ($server) {
    // 經過 $server->taskCo() 把對 mongodb 的操做,投遞到異步 task 中。
    // 投遞到異步 task 後,將發生協程切換,能夠繼續處理其餘的請求,提供併發能力。
    $tasks[] = "hello world";
    $result = $server->taskCo($tasks, 0.5);
    $response->end('Test End, Result: '.var_export($result, true));
});

上面兩種使用方式就是 Swoole 中經常使用的方法了。
那麼咱們在 Go 中怎麼處理這種同步的問題呢 ?

實際上在 Go 語言中就不用擔憂這個問題了,如咱們以前所說到的,
Go 在語言層面就已經支持協程了,只要是發生 IO 操做,網絡請求都會發生協程切換。
這也就是 Go 語言天生以來就支持高併發的緣由了。

package main

import (
    "fmt"
    "gopkg.in/mgo.v2"
    "net/http"
)

func main() {
    http.HandleFunc("/test", func(writer http.ResponseWriter, request *http.Request) {
        session, err := mgo.Dial("127.0.0.1:27017")
        if err != nil {
            fmt.Printf("Error: %v \n", err)
            return
        }
        session.SetMode(mgo.Monotonic, true)
        c := session.DB("test").C("runoob")
        fmt.Printf("Connect %v \n", c)
    })
    http.ListenAndServe("0.0.0.0:8001", nil)
}

==並行:同一時刻,同一個 CPU 只能執行同一個任務,要同時執行多個任務,就須要有多個 CPU。==
==併發:CPU 切換時間任務很是快,就會感受到有不少任務在同時執行。==

協程 CPU 密集場景調度

咱們上面說到都是基於 IO 密集場景的調度。
那麼若是是 CPU 密集型的場景,應該怎麼處理呢?

在 Swoole v4.3.2 版本中,已經支持了協程 CPU 密集場景的調度。
想要支持 CPU 密集調度,須要在編譯時增長編譯選項 --enable-scheduler-tick 開啓 tick 調度器。
其次還須要咱們手動聲明 declare(tick=N) 語法功能來實現協程調度。

<?php
declare(ticks=1000);

$max_msec = 10;
Swoole\Coroutine::set([
    'max_exec_msec' => $max_msec,
]);

$s = microtime(1);
echo "start\n";
$flag = 1;
go(function () use (&$flag, $max_msec){
    echo "coro 1 start to loop for $max_msec msec\n";
    $i = 0;
    while($flag) {
        $i ++;
    }
    echo "coro 1 can exit\n";
});

$t = microtime(1);
$u = $t-$s;
echo "shedule use time ".round($u * 1000, 5)." ms\n";
go(function () use (&$flag){
    echo "coro 2 set flag = false\n";
    $flag = false;
});
echo "end\n";

// 輸出結果
start
coro 1 start to loop for 10 msec
shedule use time 10.2849 ms
coro 2 set flag = false
end
coro 1 can exit

Go 在 CPU 密集運算時,有可能致使協程沒法搶佔 CPU 會一直掛起。
這時候就須要顯示的調用代碼 runtime.Gosched() 掛起當前協程,讓出 CPU 給其餘的協程。

package main

import (
    "fmt"
    "time"
)

func main() {
    // 若是設置單線程,則第一個協程沒法讓出時間片
    // 第二個協程一直得不到時間片,阻塞等待。
    // runtime.GOMAXPROCS(1)

    flag := true

    go func() {
        fmt.Printf("coroutine one start \n")
        i := 0
        for flag {
            i++
            // 若是加了這行代碼,協程可讓時間片
            // 這個由於 fmt.Printf 是內聯函數,這是種特殊狀況
            // fmt.Printf("i: %d \n", i)
        }
        fmt.Printf("coroutine one exit \n")
    }()

    go func() {
        fmt.Printf("coroutine two start \n")
        flag = false
        fmt.Printf("coroutine two exit \n")
    }()

    time.Sleep(5 * time.Second)
    fmt.Printf("end \n")
}

// 輸出結果
coroutine one start 
coroutine two start 
coroutine two exit 
coroutine one exit 
end

注:==time.sleep() 模擬 IO 操做,for i++ 模擬 CPU 密集運算。==

總結

  • 協程是輕量級的線程,開銷很小。
  • Swoole 的協程客戶端須要在協程的上下文環境中使用。
  • 在 Swoole v4.3.2 版本以後,已經支持協程 CPU 密集場景調度。
  • Go 語言層面就已經徹底支持協程了。
相關文章
相關標籤/搜索