碼雲代碼倉庫:https://gitee.com/tanjiajun/MysqlPoolphp
代碼倉庫:https://github.com/asbectJ/swoole4.githtml
在寫這篇文章以前,看了好幾篇實現鏈接池的文章,都是寫的很很差的。擺明忽略了鏈接池的不少特性,不少都不具備抗高併發和鏈接複用。因此本身以爲有必須把最近幾天,實現一個比較完整的php數據庫鏈接池的點滴記錄下來,望能幫助各位,感激者望多點贊和打賞。mysql
所謂的數據庫鏈接池,通常指的就是程序和數據庫保持必定數量的數據庫鏈接不斷開,而且各請求的鏈接能夠相互複用,減小重複新建數據庫鏈接的消耗和避免在高併發的狀況下出現數據庫max connections等錯誤。本身總結了一下,若是要實現一個數據庫鏈接池,通常有幾個特色:git
總結幾個特性後,一個基本鏈接池,大體要實現下圖功能:github
swoole是一個PHP實現異步網絡通訊的引擎或者擴展,其中實現了不少傳統PHP-fpm沒有的東西,例如異步的客戶端,異步Io,常駐內存,協程等等,一個個優秀的擴展,其中異步和協程等概念能應用於高併發場景。缺點是文檔和入門的門檻都比較高,須要排坑。附上swoole的運行流程和進程結構圖:sql
運行流程圖數據庫
進程/線程架構圖json
首先,爲了減小你們對以後運行示例代碼產生沒必要要的天坑,先把注意事項和場景問題放前面:數組
一、程序中使用了協程的通訊管道channel(與go的chan差很少的),其中swoole2是不支持chan->pop($timeout)中timeout超時等待的,因此必須用swoole4版本swoole
二、使用swoole協程擴展的時候,必定不能裝xdebug之類的擴展,不然報錯。官方說明爲:https://wiki.swoole.com/wiki/page/674.html,同時參考以下了解更多關於swoole協程的使用和注意:https://wiki.swoole.com/wiki/page/749.html
三、筆者使用的環境爲:PHP 7.1.18和swoole4做爲這次開發的環境
首先,這次利用swoole實現鏈接池,運用到swoole如下技術或者概念
一、鏈接變量池,這裏能夠看作一個數組或者隊列,利用swoole全局變量的常駐內存特性,只要變量沒主動unset掉,數組或隊列中的鏈接對象能夠一直保持,不釋放。主要參考:https://wiki.swoole.com/wiki/page/p-zend_mm.html
二、協程。協程是純用戶狀態的線程,經過協做的方式而不是搶佔的方式來切換。首先這次的鏈接池兩處用到協程:
三、Coroutine/channel通道,相似於go
語言的chan
,支持多生產者協程和多消費者協程。底層自動實現了協程的切換和調度。高併發時,容易出鏈接池爲空時,若是用通常的array或者splqueue()做爲介質存儲鏈接對象變量,不能產生阻塞等待其餘請求釋放的效果,也就是說只能直接返回null.。因此這裏用了一個swoole4協程中很牛逼的channel經過管道做爲存儲介質,它的出隊方法pop($timeout)能夠指定阻塞等待指定時間後返回。注意,是swoole2是沒有超時timeout的參數,不適用此場景。在go語言中,若是chan等待或者push了沒有消費或者生產一對一的狀況,是會發生死鎖。因此swoole4的timeout應該是爲了不無限等待爲空channel狀況而產生。主要參考:
https://wiki.swoole.com/wiki/page/p-coroutine_channel.html
channel切換的例子:
<?php use \Swoole\Coroutine\Channel; $chan = new Channel(); go(function () use ($chan) { echo "我是第一個協程,等待3秒內有push就執行返回" . PHP_EOL; $p = $chan->pop(2);#1 echo "pop返回結果" . PHP_EOL; var_dump($p); }); go(function () use ($chan) { co::sleep(1);#2 $chan->push(1); }); echo "main" . PHP_EOL;
#1處代碼會首先執行,而後遇到pop(),由於channel仍是空,會等待2s。此時協程會讓出cpu,跳到第二個協程執行,而後#2出睡眠1秒,push變量1進去channel後返回#1處繼續執行,成功取車經過中剛push的值1.運行結果爲:
若是把#2處的睡眠時間換成大於pop()的等待時間,結果是:
<?php /** * 鏈接池封裝. * User: user * Date: 2018/9/1 * Time: 13:36 */ use Swoole\Coroutine\Channel; abstract class AbstractPool { private $min;//最少鏈接數 private $max;//最大鏈接數 private $count;//當前鏈接數 private $connections;//鏈接池組 protected $spareTime;//用於空閒鏈接回收判斷 //數據庫配置 protected $dbConfig = array( 'host' => '10.0.2.2', 'port' => 3306, 'user' => 'root', 'password' => 'root', 'database' => 'test', 'charset' => 'utf8', 'timeout' => 2, ); private $inited = false; protected abstract function createDb(); public function __construct() { $this->min = 10; $this->max = 100; $this->spareTime = 10 * 3600; $this->connections = new Channel($this->max + 1); } protected function createObject() { $obj = null; $db = $this->createDb(); if ($db) { $obj = [ 'last_used_time' => time(), 'db' => $db, ]; } return $obj; } /** * 初始換最小數量鏈接池 * @return $this|null */ public function init() { if ($this->inited) { return null; } for ($i = 0; $i < $this->min; $i++) { $obj = $this->createObject(); $this->count++; $this->connections->push($obj); } return $this; } public function getConnection($timeOut = 3) { $obj = null; if ($this->connections->isEmpty()) { if ($this->count < $this->max) {//鏈接數沒達到最大,新建鏈接入池 $this->count++; $obj = $this->createObject(); } else { $obj = $this->connections->pop($timeOut);//timeout爲出隊的最大的等待時間 } } else { $obj = $this->connections->pop($timeOut); } return $obj; } public function free($obj) { if ($obj) { $this->connections->push($obj); } } /** * 處理空閒鏈接 */ public function gcSpareObject() { //大約2分鐘檢測一次鏈接 swoole_timer_tick(120000, function () { $list = []; /*echo "開始檢測回收空閒連接" . $this->connections->length() . PHP_EOL;*/ if ($this->connections->length() < intval($this->max * 0.5)) { echo "請求鏈接數還比較多,暫不回收空閒鏈接\n"; }#1 while (true) { if (!$this->connections->isEmpty()) { $obj = $this->connections->pop(0.001); $last_used_time = $obj['last_used_time']; if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {//回收 $this->count--; } else { array_push($list, $obj); } } else { break; } } foreach ($list as $item) { $this->connections->push($item); } unset($list); }); } }
同步PDO客戶端下實現
<?php /** * 數據庫鏈接池PDO方式 * User: user * Date: 2018/9/8 * Time: 11:30 */ require "AbstractPool.php"; class MysqlPoolPdo extends AbstractPool { protected $dbConfig = array( 'host' => 'mysql:host=10.0.2.2:3306;dbname=test', 'port' => 3306, 'user' => 'root', 'password' => 'root', 'database' => 'test', 'charset' => 'utf8', 'timeout' => 2, ); public static $instance; public static function getInstance() { if (is_null(self::$instance)) { self::$instance = new MysqlPoolPdo(); } return self::$instance; } protected function createDb() { return new PDO($this->dbConfig['host'], $this->dbConfig['user'], $this->dbConfig['password']); } } $httpServer = new swoole_http_server('0.0.0.0', 9501); $httpServer->set( ['worker_num' => 1] ); $httpServer->on("WorkerStart", function () { MysqlPoolPdo::getInstance()->init(); }); $httpServer->on("request", function ($request, $response) { $db = null; $obj = MysqlPoolPdo::getInstance()->getConnection(); if (!empty($obj)) { $db = $obj ? $obj['db'] : null; } if ($db) { $db->query("select sleep(2)"); $ret = $db->query("select * from guestbook limit 1"); MysqlPoolPdo::getInstance()->free($obj); $response->end(json_encode($ret)); } }); $httpServer->start();
代碼調用過程詳解:
一、server啓動時,調用init()方法初始化最少數量(min指定)的鏈接對象,放進類型爲channelle的connections對象中。在init中循環調用中,依賴了createObject()返回鏈接對象,而createObject()
中是調用了原本實現的抽象方法,初始化返回一個PDO db鏈接。因此此時,鏈接池connections中有min個對象。
二、server監聽用戶請求,當接收發請求時,調用鏈接數的getConnection()方法從connections通道中pop()一個對象。此時若是併發了10個請求,server由於配置了1個worker,因此再pop到一個對象返回時,遇到sleep()的查詢,由於用的鏈接對象是pdo的查詢,此時的woker進程只能等待,完成後才能進入下一個請求。所以,池中的其他鏈接實際上是多餘的,同步客戶端的請求速度只能和woker的數量有關。
三、查詢結束後,調用free()方法把鏈接對象放回connections池中。
ab -c 10 -n 10運行的結果,單個worker處理,select sleep(2) 查詢睡眠2s,同步客戶端方式總共運行時間爲20s以上,並且mysql的鏈接始終維持在一條。結果以下:
<?php /** * 數據庫鏈接池協程方式 * User: user * Date: 2018/9/8 * Time: 11:30 */ require "AbstractPool.php"; class MysqlPoolCoroutine extends AbstractPool { protected $dbConfig = array( 'host' => '10.0.2.2', 'port' => 3306, 'user' => 'root', 'password' => 'root', 'database' => 'test', 'charset' => 'utf8', 'timeout' => 10, ); public static $instance; public static function getInstance() { if (is_null(self::$instance)) { self::$instance = new MysqlPoolCoroutine(); } return self::$instance; } protected function createDb() { $db = new Swoole\Coroutine\Mysql(); $db->connect( $this->dbConfig ); return $db; } } $httpServer = new swoole_http_server('0.0.0.0', 9501); $httpServer->set( ['worker_num' => 1] ); $httpServer->on("WorkerStart", function () { //MysqlPoolCoroutine::getInstance()->init()->gcSpareObject(); MysqlPoolCoroutine::getInstance()->init(); }); $httpServer->on("request", function ($request, $response) { $db = null; $obj = MysqlPoolCoroutine::getInstance()->getConnection(); if (!empty($obj)) { $db = $obj ? $obj['db'] : null; } if ($db) { $db->query("select sleep(2)"); $ret = $db->query("select * from guestbook limit 1"); MysqlPoolCoroutine::getInstance()->free($obj); $response->end(json_encode($ret)); } }); $httpServer->start();
代碼調用過程詳解
一、一樣的,協程客戶端方式下的調用,也是實現了以前封裝好的鏈接池類AbstractPool.php。只是createDb()的抽象方法用了swoole內置的協程客戶端去實現。
二、server啓動後,初始化都和同步同樣。不同的在獲取鏈接對象的時候,此時若是併發了10個請求,一樣是配置了1個worker進程在處理,可是在第一請求到達,pop出池中的一個鏈接對象,執行到query()方法,趕上sleep阻塞時,此時,woker進程不是在等待select的完成,而是切換到另外的協程去處理下一個請求。完成後一樣釋放對象到池中。當中有重點解釋的代碼段中getConnection()中。
public function getConnection($timeOut = 3) { $obj = null; if ($this->connections->isEmpty()) { if ($this->count < $this->max) {//鏈接數沒達到最大,新建鏈接入池 $this->count++; $obj = $this->createObject();#1 } else { $obj = $this->connections->pop($timeOut);#2 } } else { $obj = $this->connections->pop($timeOut);#3 } return $obj; }
當調用到getConnection()時,若是此時因爲大量併發請求過多,鏈接池connections爲空,而沒達到最大鏈接max數量時時,代碼運行到#1處,調用了createObject(),新建鏈接返回;但若是鏈接池connections爲空,而到達了最大鏈接數max時,代碼運行到了#2處,也就是$this->connections->pop($timeOut),此時會阻塞$timeOut的時間,若是期間有連接釋放了,會成功獲取到,而後協程返回。超時沒獲取到,則返回false。
三、最後說一下協程Mysql客戶端一項重要配置,那就是代碼裏$dbConfig中timeout值的配置。這個配置是意思是最長的查詢等待時間。能夠看一個例子說明下:
go(function () { $start = microtime(true); $db = new Swoole\Coroutine\MySQL(); $db->connect([ 'host' => '10.0.2.2', 'port' => 3306, 'user' => 'root', 'password' => 'root', 'database' => 'test', 'timeout' => 4#1 ]); $db->query("select sleep(5)"); echo "我是第一個sleep五秒以後\n"; $ret = $db->query("select user from guestbook limit 1");#2 var_dump($ret); $use = microtime(true) - $start; echo "協程mysql輸出用時:" . $use . PHP_EOL; });
#1處代碼,若是timeout配了4s查詢超時,而第一條查詢select sleep(5)阻塞後,協程切換到下一條sql的執行,其實$db並不能執行成功,由於用一個鏈接,同一個協程中,其實執行是同步的,因此此時第二條查詢在等待4s超時後,沒獲取到db的鏈接執行,就會執行失敗。而若是第一條查詢執行的時間少於這個timeout,那麼會執行查詢成功。猜猜上面執行用時多少?結果以下:
若是把timeout換成6s呢,結果以下:
因此要注意的是,協程的客戶端內執行實際上是同步的,不要理解爲異步,它只是遇到IO阻塞時能讓出執行權,切換到其餘協程而已,不能和異步混淆。
ab -c 10 -n 10運行的結果,單個worker處理,select sleep(2) 查詢睡眠2s,協程客戶端方式總共運行時間爲2s多。結果以下:
數據庫此時的鏈接數爲10條(show full PROCESSLIST):
再嘗試 ab -c 200 -n 1000 http://127.0.0.1:9501/,200多個併發的處理,時間是20多秒,mysql鏈接數達到指定的最大值100個。結果以下:
如今鏈接池基本實現了高併發時的鏈接分配和控制,可是還有一些細節要處理,例如:
對於以上,但願各大神看到後,能提供不錯的意見!