【轉】swoole4實現數據庫鏈接池

碼雲代碼倉庫:https://gitee.com/tanjiajun/MysqlPoolphp

代碼倉庫:https://github.com/asbectJ/swoole4.githtml

前言

在寫這篇文章以前,看了好幾篇實現鏈接池的文章,都是寫的很很差的。擺明忽略了鏈接池的不少特性,不少都不具備抗高併發和鏈接複用。因此本身以爲有必須把最近幾天,實現一個比較完整的php數據庫鏈接池的點滴記錄下來,望能幫助各位,感激者望多點贊和打賞。mysql

1、數據庫鏈接池基本概念

所謂的數據庫鏈接池,通常指的就是程序和數據庫保持必定數量的數據庫鏈接不斷開,而且各請求的鏈接能夠相互複用,減小重複新建數據庫鏈接的消耗和避免在高併發的狀況下出現數據庫max connections等錯誤。本身總結了一下,若是要實現一個數據庫鏈接池,通常有幾個特色:git

  • 鏈接複用,不一樣的請求鏈接,能夠放回池中,等待下個請求發分配和調用
  • 鏈接數量通常維持min-max的最大最少值之間
  • 對於空閒鏈接的回收
  • 能夠抗必定程度的高併發,也就是說當一次併發請求完池中全部的鏈接時,獲取不到鏈接的請求可等待其餘鏈接的釋放

總結幾個特性後,一個基本鏈接池,大體要實現下圖功能:github

 

  1. 建立鏈接:鏈接池啓動後,初始化必定的空閒鏈接,指定爲最少的鏈接min。當鏈接池爲空,不夠用時,建立新的鏈接放到池裏,但不能超過指定的最大鏈接max數量。
  2. 鏈接釋放:每次使用完鏈接,必定要調用釋放方法,把鏈接放回池中,給其餘程序或請求使用。
  3. 鏈接分配:鏈接池中用pop和push的方式對等入隊和出隊分配與回收。能實現阻塞分配,也就是在池空而且已建立數量大於max,阻塞必定時間等待其餘請求的鏈接釋放,超時則返回null。
  4. 鏈接管理:對鏈接池中的鏈接,定時檢活和釋放空閒鏈接等

2、Fpm+數據庫長鏈接的實現

  1. 利用fpm實現:例如你要實例一個100鏈接數的池,開啓100個空閒fpm,而後每一個fpm的鏈接都是數據庫長鏈接。通常pm.max_spare_servers = 8這個配置項就是維持鏈接池的空閒數量,而後pm.max_children = 50就是最大的鏈接數量。和fpm的進程數量一致。

3、基於swoole的實現

  • swoole簡單介紹(更多參閱swoole官網)

      swoole是一個PHP實現異步網絡通訊的引擎或者擴展,其中實現了不少傳統PHP-fpm沒有的東西,例如異步的客戶端,異步Io,常駐內存,協程等等,一個個優秀的擴展,其中異步和協程等概念能應用於高併發場景。缺點是文檔和入門的門檻都比較高,須要排坑。附上swoole的運行流程和進程結構圖:sql

運行流程圖數據庫

進程/線程架構圖json

  • 基於swoole現實時的注意事項

首先,爲了減小你們對以後運行示例代碼產生沒必要要的天坑,先把注意事項和場景問題放前面:數組

一、程序中使用了協程的通訊管道channel(與go的chan差很少的),其中swoole2是不支持chan->pop($timeout)中timeout超時等待的,因此必須用swoole4版本swoole

二、使用swoole協程擴展的時候,必定不能裝xdebug之類的擴展,不然報錯。官方說明爲:https://wiki.swoole.com/wiki/page/674.html,同時參考以下了解更多關於swoole協程的使用和注意:https://wiki.swoole.com/wiki/page/749.html

三、筆者使用的環境爲:PHP 7.1.18和swoole4做爲這次開發的環境

  • 基於swoole現實鏈接池的方法

首先,這次利用swoole實現鏈接池,運用到swoole如下技術或者概念

一、鏈接變量池,這裏能夠看作一個數組或者隊列,利用swoole全局變量的常駐內存特性,只要變量沒主動unset掉,數組或隊列中的鏈接對象能夠一直保持,不釋放。主要參考:https://wiki.swoole.com/wiki/page/p-zend_mm.html

二、協程。協程是純用戶狀態的線程,經過協做的方式而不是搶佔的方式來切換。首先這次的鏈接池兩處用到協程:

  • 一個是mysql的協程客戶端,爲何要用協程客戶端,由於若是是用同步客戶端PDO,在一個進程處理內,就算有幾百個鏈接池,swoole worker進程中用普通的PDO方式,隨便併發多少個請求,每個請求都只能等上一個請求執行完畢,woker才處理下一個請求,這裏就算阻塞了。爲了讓一個worker支持阻塞切換出cpu去處理其餘請求,因此要用到協程的協助切換,或者異步客戶端也能夠,可是異步客戶端使用起來嵌套太多,很不方便。swoole協程能夠無感知的用同步的代碼編寫方式達到異步IO的效果和性能。
  • 第二個是底層實現了協程切換和調度的channel,如下詳述什麼是channel

三、Coroutine/channel通道,相似於go語言的chan,支持多生產者協程和多消費者協程。底層自動實現了協程的切換和調度。高併發時,容易出鏈接池爲空時,若是用通常的array或者splqueue()做爲介質存儲鏈接對象變量,不能產生阻塞等待其餘請求釋放的效果,也就是說只能直接返回null.。因此這裏用了一個swoole4協程中很牛逼的channel經過管道做爲存儲介質,它的出隊方法pop($timeout)能夠指定阻塞等待指定時間後返回。注意,是swoole2是沒有超時timeout的參數,不適用此場景。在go語言中,若是chan等待或者push了沒有消費或者生產一對一的狀況,是會發生死鎖。因此swoole4的timeout應該是爲了不無限等待爲空channel狀況而產生。主要參考:

https://wiki.swoole.com/wiki/page/p-coroutine_channel.html

channel切換的例子:

<?php
use \Swoole\Coroutine\Channel;
$chan = new Channel();
go(function () use ($chan) {
    echo "我是第一個協程,等待3秒內有push就執行返回" . PHP_EOL;
    $p = $chan->pop(2);#1
    echo "pop返回結果" . PHP_EOL;
    var_dump($p);
});
go(function () use ($chan) {
    co::sleep(1);#2
    $chan->push(1);
});
echo "main" . PHP_EOL;

#1處代碼會首先執行,而後遇到pop(),由於channel仍是空,會等待2s。此時協程會讓出cpu,跳到第二個協程執行,而後#2出睡眠1秒,push變量1進去channel後返回#1處繼續執行,成功取車經過中剛push的值1.運行結果爲:

若是把#2處的睡眠時間換成大於pop()的等待時間,結果是:

  • 根據這些特性最終實現鏈接池的抽象封裝類爲:
<?php
/**
 * 鏈接池封裝.
 * User: user
 * Date: 2018/9/1
 * Time: 13:36
 */

use Swoole\Coroutine\Channel;

abstract class AbstractPool
{
    private $min;//最少鏈接數
    private $max;//最大鏈接數
    private $count;//當前鏈接數
    private $connections;//鏈接池組
    protected $spareTime;//用於空閒鏈接回收判斷
    //數據庫配置
    protected $dbConfig = array(
        'host' => '10.0.2.2',
        'port' => 3306,
        'user' => 'root',
        'password' => 'root',
        'database' => 'test',
        'charset' => 'utf8',
        'timeout' => 2,
    );

    private $inited = false;

    protected abstract function createDb();

    public function __construct()
    {
        $this->min = 10;
        $this->max = 100;
        $this->spareTime = 10 * 3600;
        $this->connections = new Channel($this->max + 1);
    }

    protected function createObject()
    {
        $obj = null;
        $db = $this->createDb();
        if ($db) {
            $obj = [
                'last_used_time' => time(),
                'db' => $db,
            ];
        }
        return $obj;
    }

    /**
     * 初始換最小數量鏈接池
     * @return $this|null
     */
    public function init()
    {
        if ($this->inited) {
            return null;
        }
        for ($i = 0; $i < $this->min; $i++) {
            $obj = $this->createObject();
            $this->count++;
            $this->connections->push($obj);
        }
        return $this;
    }

    public function getConnection($timeOut = 3)
    {
        $obj = null;
        if ($this->connections->isEmpty()) {
            if ($this->count < $this->max) {//鏈接數沒達到最大,新建鏈接入池
                $this->count++;
                $obj = $this->createObject();
            } else {
                $obj = $this->connections->pop($timeOut);//timeout爲出隊的最大的等待時間
            }
        } else {
            $obj = $this->connections->pop($timeOut);
        }
        return $obj;
    }

    public function free($obj)
    {
        if ($obj) {
            $this->connections->push($obj);
        }
    }

    /**
     * 處理空閒鏈接
     */
    public function gcSpareObject()
    {
        //大約2分鐘檢測一次鏈接
        swoole_timer_tick(120000, function () {
            $list = [];
            /*echo "開始檢測回收空閒連接" . $this->connections->length() . PHP_EOL;*/
            if ($this->connections->length() < intval($this->max * 0.5)) {
                echo "請求鏈接數還比較多,暫不回收空閒鏈接\n";
            }#1
            while (true) {
                if (!$this->connections->isEmpty()) {
                    $obj = $this->connections->pop(0.001);
                    $last_used_time = $obj['last_used_time'];
                    if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {//回收
                        $this->count--;
                    } else {
                        array_push($list, $obj);
                    }
                } else {
                    break;
                }
            }
            foreach ($list as $item) {
                $this->connections->push($item);
            }
            unset($list);
        });
    }
}
  • 同步PDO客戶端下實現

<?php
/**
 * 數據庫鏈接池PDO方式
 * User: user
 * Date: 2018/9/8
 * Time: 11:30
 */
require "AbstractPool.php";

class MysqlPoolPdo extends AbstractPool
{
    protected $dbConfig = array(
        'host' => 'mysql:host=10.0.2.2:3306;dbname=test',
        'port' => 3306,
        'user' => 'root',
        'password' => 'root',
        'database' => 'test',
        'charset' => 'utf8',
        'timeout' => 2,
    );
    public static $instance;

    public static function getInstance()
    {
        if (is_null(self::$instance)) {
            self::$instance = new MysqlPoolPdo();
        }
        return self::$instance;
    }

    protected function createDb()
    {
        return new PDO($this->dbConfig['host'], $this->dbConfig['user'], $this->dbConfig['password']);
    }
}

$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set(
    ['worker_num' => 1]
);
$httpServer->on("WorkerStart", function () {
    MysqlPoolPdo::getInstance()->init();
});
$httpServer->on("request", function ($request, $response) {
    $db = null;
    $obj = MysqlPoolPdo::getInstance()->getConnection();
    if (!empty($obj)) {
        $db = $obj ? $obj['db'] : null;
    }
    if ($db) {
        $db->query("select sleep(2)");
        $ret = $db->query("select * from guestbook limit 1");
        MysqlPoolPdo::getInstance()->free($obj);
        $response->end(json_encode($ret));
    }
});
$httpServer->start();

代碼調用過程詳解:
一、server啓動時,調用init()方法初始化最少數量(min指定)的鏈接對象,放進類型爲channelle的connections對象中。在init中循環調用中,依賴了createObject()返回鏈接對象,而createObject()
中是調用了原本實現的抽象方法,初始化返回一個PDO db鏈接。因此此時,鏈接池connections中有min個對象。

二、server監聽用戶請求,當接收發請求時,調用鏈接數的getConnection()方法從connections通道中pop()一個對象。此時若是併發了10個請求,server由於配置了1個worker,因此再pop到一個對象返回時,遇到sleep()的查詢,由於用的鏈接對象是pdo的查詢,此時的woker進程只能等待,完成後才能進入下一個請求。所以,池中的其他鏈接實際上是多餘的,同步客戶端的請求速度只能和woker的數量有關。
三、查詢結束後,調用free()方法把鏈接對象放回connections池中。

ab -c 10 -n 10運行的結果,單個worker處理,select sleep(2) 查詢睡眠2s,同步客戶端方式總共運行時間爲20s以上,並且mysql的鏈接始終維持在一條。結果以下:

  • 協程客戶端Coroutine\MySQL方式的調用
<?php
/**
 * 數據庫鏈接池協程方式
 * User: user
 * Date: 2018/9/8
 * Time: 11:30
 */
require "AbstractPool.php";

class MysqlPoolCoroutine extends AbstractPool
{
    protected $dbConfig = array(
        'host' => '10.0.2.2',
        'port' => 3306,
        'user' => 'root',
        'password' => 'root',
        'database' => 'test',
        'charset' => 'utf8',
        'timeout' => 10,
    );
    public static $instance;

    public static function getInstance()
    {
        if (is_null(self::$instance)) {
            self::$instance = new MysqlPoolCoroutine();
        }
        return self::$instance;
    }

    protected function createDb()
    {
        $db = new Swoole\Coroutine\Mysql();
        $db->connect(
            $this->dbConfig
        );
        return $db;
    }
}

$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set(
    ['worker_num' => 1]
);
$httpServer->on("WorkerStart", function () {
    //MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();
    MysqlPoolCoroutine::getInstance()->init();
});

$httpServer->on("request", function ($request, $response) {
    $db = null;
    $obj = MysqlPoolCoroutine::getInstance()->getConnection();
    if (!empty($obj)) {
        $db = $obj ? $obj['db'] : null;
    }
    if ($db) {
        $db->query("select sleep(2)");
        $ret = $db->query("select * from guestbook limit 1");
        MysqlPoolCoroutine::getInstance()->free($obj);
        $response->end(json_encode($ret));
    }
});
$httpServer->start();

代碼調用過程詳解
一、一樣的,協程客戶端方式下的調用,也是實現了以前封裝好的鏈接池類AbstractPool.php。只是createDb()的抽象方法用了swoole內置的協程客戶端去實現。
二、server啓動後,初始化都和同步同樣。不同的在獲取鏈接對象的時候,此時若是併發了10個請求,一樣是配置了1個worker進程在處理,可是在第一請求到達,pop出池中的一個鏈接對象,執行到query()方法,趕上sleep阻塞時,此時,woker進程不是在等待select的完成,而是切換到另外的協程去處理下一個請求。完成後一樣釋放對象到池中。當中有重點解釋的代碼段中getConnection()中。

public function getConnection($timeOut = 3)
    {
        $obj = null;
        if ($this->connections->isEmpty()) {
            if ($this->count < $this->max) {//鏈接數沒達到最大,新建鏈接入池
                $this->count++;
                $obj = $this->createObject();#1
            } else {
                $obj = $this->connections->pop($timeOut);#2 
            }
        } else {
            $obj = $this->connections->pop($timeOut);#3
        }
        return $obj;
    }

當調用到getConnection()時,若是此時因爲大量併發請求過多,鏈接池connections爲空,而沒達到最大鏈接max數量時時,代碼運行到#1處,調用了createObject(),新建鏈接返回;但若是鏈接池connections爲空,而到達了最大鏈接數max時,代碼運行到了#2處,也就是$this->connections->pop($timeOut),此時會阻塞$timeOut的時間,若是期間有連接釋放了,會成功獲取到,而後協程返回。超時沒獲取到,則返回false。

三、最後說一下協程Mysql客戶端一項重要配置,那就是代碼裏$dbConfig中timeout值的配置。這個配置是意思是最長的查詢等待時間。能夠看一個例子說明下:

go(function () {
    $start = microtime(true);
    $db = new Swoole\Coroutine\MySQL();
    $db->connect([
        'host' => '10.0.2.2',
        'port' => 3306,
        'user' => 'root',
        'password' => 'root',
        'database' => 'test',
        'timeout' => 4#1
    ]);
    $db->query("select sleep(5)");
    echo "我是第一個sleep五秒以後\n";
    $ret = $db->query("select user from guestbook limit 1");#2
    var_dump($ret);
    $use = microtime(true) - $start;
    echo "協程mysql輸出用時:" . $use . PHP_EOL;
});

#1處代碼,若是timeout配了4s查詢超時,而第一條查詢select sleep(5)阻塞後,協程切換到下一條sql的執行,其實$db並不能執行成功,由於用一個鏈接,同一個協程中,其實執行是同步的,因此此時第二條查詢在等待4s超時後,沒獲取到db的鏈接執行,就會執行失敗。而若是第一條查詢執行的時間少於這個timeout,那麼會執行查詢成功。猜猜上面執行用時多少?結果以下:

若是把timeout換成6s呢,結果以下:

因此要注意的是,協程的客戶端內執行實際上是同步的,不要理解爲異步,它只是遇到IO阻塞時能讓出執行權,切換到其餘協程而已,不能和異步混淆。

ab -c 10 -n 10運行的結果,單個worker處理,select sleep(2) 查詢睡眠2s,協程客戶端方式總共運行時間爲2s多。結果以下:

數據庫此時的鏈接數爲10條(show full PROCESSLIST):

再嘗試 ab -c 200 -n 1000 http://127.0.0.1:9501/,200多個併發的處理,時間是20多秒,mysql鏈接數達到指定的最大值100個。結果以下:

 

4、後言

如今鏈接池基本實現了高併發時的鏈接分配和控制,可是還有一些細節要處理,例如:

  • 併發時,創建了max個池對象,不能一直在池中維護這麼多,要在請求空閒時,把鏈接池的數量維持在一個空閒值內。這裏是簡單作了gcSpareObject()的方法實現空閒處理。直接在初始化woker的時候調用:MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();就會定時檢測回收。問題是如何判斷程序比較空閒,值得再去優化。
  • 定時檢測鏈接時候是活的,剔除死鏈
  • 假如程序忘記調用free()釋放對象到池,是否有更好方法避免這種狀況?

對於以上,但願各大神看到後,能提供不錯的意見!

相關文章
相關標籤/搜索