swoole 连接池php fpm,【转】swoole4实现数据库连接池
前言
在寫這篇文章之前,看了好幾篇實現連接池的文章,都是寫的很不好的。擺明忽略了連接池的很多特性,很多都不具有抗高并發和連接復用。所以自己覺得有必須把最近幾天,實現一個比較完整的php數據庫連接池的點滴記錄下來,望能幫助各位,感激者望多點贊和打賞。
一、數據庫連接池基本概念
所謂的數據庫連接池,一般指的就是程序和數據庫保持一定數量的數據庫連接不斷開,并且各請求的連接可以相互復用,減少重復新建數據庫連接的消耗和避免在高并發的情況下出現數據庫max connections等錯誤。自己總結了一下,如果要實現一個數據庫連接池,一般有幾個特點:
連接復用,不同的請求連接,可以放回池中,等待下個請求發分配和調用
連接數量一般維持min-max的最大最少值之間
對于空閑連接的回收
可以抗一定程度的高并發,也就是說當一次并發請求完池中所有的連接時,獲取不到連接的請求可等待其他連接的釋放
總結幾個特性后,一個基本連接池,大致要實現下圖功能:
創建連接:連接池啟動后,初始化一定的空閑連接,指定為最少的連接min。當連接池為空,不夠用時,創建新的連接放到池里,但不能超過指定的最大連接max數量。
連接釋放:每次使用完連接,一定要調用釋放方法,把連接放回池中,給其他程序或請求使用。
連接分配:連接池中用pop和push的方式對等入隊和出隊分配與回收。能實現阻塞分配,也就是在池空并且已創建數量大于max,阻塞一定時間等待其他請求的連接釋放,超時則返回null。
連接管理:對連接池中的連接,定時檢活和釋放空閑連接等
二、Fpm+數據庫長連接的實現
利用fpm實現:例如你要實例一個100連接數的池,開啟100個空閑fpm,然后每個fpm的連接都是數據庫長連接。一般pm.max_spare_servers = 8這個配置項就是維持連接池的空閑數量,然后pm.max_children = 50就是最大的連接數量。和fpm的進程數量一致。
三、基于swoole的實現
swoole簡單介紹(更多參閱swoole官網)
swoole是一個PHP實現異步網絡通信的引擎或者擴展,其中實現了很多傳統PHP-fpm沒有的東西,例如異步的客戶端,異步Io,常駐內存,協程等等,一個個優秀的擴展,其中異步和協程等概念能應用于高并發場景。缺點是文檔和入門的門檻都比較高,需要排坑。附上swoole的運行流程和進程結構圖:
運行流程圖
進程/線程架構圖
基于swoole現實時的注意事項
首先,為了減少大家對之后運行示例代碼產生不必要的天坑,先把注意事項和場景問題放前面:
1、程序中使用了協程的通信管道channel(與go的chan差不多的),其中swoole2是不支持chan->pop($timeout)中timeout超時等待的,所以必須用swoole4版本
3、筆者使用的環境為:PHP 7.1.18和swoole4作為此次開發的環境
基于swoole現實連接池的方法
首先,此次利用swoole實現連接池,運用到swoole以下技術或者概念
1、連接變量池,這里可以看做一個數組或者隊列,利用swoole全局變量的常駐內存特性,只要變量沒主動unset掉,數組或隊列中的連接對象可以一直保持,不釋放。主要參考:https://wiki.swoole.com/wiki/page/p-zend_mm.html
2、協程。協程是純用戶狀態的線程,通過協作的方式而不是搶占的方式來切換。首先此次的連接池兩處用到協程:
一個是mysql的協程客戶端,為什么要用協程客戶端,因為如果是用同步客戶端PDO,在一個進程處理內,就算有幾百個連接池,swoole worker進程中用普通的PDO方式,隨便并發多少個請求,每一個請求都只能等上一個請求執行完畢,woker才處理下一個請求,這里就算阻塞了。為了讓一個worker支持阻塞切換出cpu去處理其他請求,所以要用到協程的協助切換,或者異步客戶端也可以,但是異步客戶端使用起來嵌套太多,很不方便。swoole協程可以無感知的用同步的代碼編寫方式達到異步IO的效果和性能。
第二個是底層實現了協程切換和調度的channel,以下詳述什么是channel
3、Coroutine/channel通道,類似于go語言的chan,支持多生產者協程和多消費者協程。底層自動實現了協程的切換和調度。高并發時,容易出連接池為空時,如果用一般的array或者splqueue()作為介質存儲連接對象變量,不能產生阻塞等待其他請求釋放的效果,也就是說只能直接返回null.。所以這里用了一個swoole4協程中很牛逼的channel通過管道作為存儲介質,它的出隊方法pop($timeout)可以指定阻塞等待指定時間后返回。注意,是swoole2是沒有超時timeout的參數,不適用此場景。在go語言中,如果chan等待或者push了沒有消費或者生產一對一的情況,是會發生死鎖。所以swoole4的timeout應該是為了避免無限等待為空channel情況而產生。主要參考:
channel切換的例子:
use \Swoole\Coroutine\Channel;
$chan = new Channel();
go(function () use ($chan) {
echo "我是第一個協程,等待3秒內有push就執行返回" . PHP_EOL;
$p = $chan->pop(2);#1
echo "pop返回結果" . PHP_EOL;
var_dump($p);
});
go(function () use ($chan) {
co::sleep(1);#2
$chan->push(1);
});
echo "main" . PHP_EOL;
#1處代碼會首先執行,然后遇到pop(),因為channel還是空,會等待2s。此時協程會讓出cpu,跳到第二個協程執行,然后#2出睡眠1秒,push變量1進去channel后返回#1處繼續執行,成功取車通過中剛push的值1.運行結果為:
如果把#2處的睡眠時間換成大于pop()的等待時間,結果是:
根據這些特性最終實現連接池的抽象封裝類為:
/**
* 連接池封裝.
* User: user
* Date: 2018/9/1
* Time: 13:36
*/
use Swoole\Coroutine\Channel;
abstract class AbstractPool
{
private $min;//最少連接數
private $max;//最大連接數
private $count;//當前連接數
private $connections;//連接池組
protected $spareTime;//用于空閑連接回收判斷
//數據庫配置
protected $dbConfig = array(
'host' => '10.0.2.2',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 2,
);
private $inited = false;
protected abstract function createDb();
public function __construct()
{
$this->min = 10;
$this->max = 100;
$this->spareTime = 10 * 3600;
$this->connections = new Channel($this->max + 1);
}
protected function createObject()
{
$obj = null;
$db = $this->createDb();
if ($db) {
$obj = [
'last_used_time' => time(),
'db' => $db,
];
}
return $obj;
}
/**
* 初始換最小數量連接池
* @return $this|null
*/
public function init()
{
if ($this->inited) {
return null;
}
for ($i = 0; $i < $this->min; $i++) {
$obj = $this->createObject();
$this->count++;
$this->connections->push($obj);
}
return $this;
}
public function getConnection($timeOut = 3)
{
$obj = null;
if ($this->connections->isEmpty()) {
if ($this->count < $this->max) {//連接數沒達到最大,新建連接入池
$this->count++;
$obj = $this->createObject();
} else {
$obj = $this->connections->pop($timeOut);//timeout為出隊的最大的等待時間
}
} else {
$obj = $this->connections->pop($timeOut);
}
return $obj;
}
public function free($obj)
{
if ($obj) {
$this->connections->push($obj);
}
}
/**
* 處理空閑連接
*/
public function gcSpareObject()
{
//大約2分鐘檢測一次連接
swoole_timer_tick(120000, function () {
$list = [];
/*echo "開始檢測回收空閑鏈接" . $this->connections->length() . PHP_EOL;*/
if ($this->connections->length() < intval($this->max * 0.5)) {
echo "請求連接數還比較多,暫不回收空閑連接\n";
}#1
while (true) {
if (!$this->connections->isEmpty()) {
$obj = $this->connections->pop(0.001);
$last_used_time = $obj['last_used_time'];
if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {//回收
$this->count--;
} else {
array_push($list, $obj);
}
} else {
break;
}
}
foreach ($list as $item) {
$this->connections->push($item);
}
unset($list);
});
}
}
同步PDO客戶端下實現
/**
* 數據庫連接池PDO方式
* User: user
* Date: 2018/9/8
* Time: 11:30
*/
require "AbstractPool.php";
class MysqlPoolPdo extends AbstractPool
{
protected $dbConfig = array(
'host' => 'mysql:host=10.0.2.2:3306;dbname=test',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 2,
);
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolPdo();
}
return self::$instance;
}
protected function createDb()
{
return new PDO($this->dbConfig['host'], $this->dbConfig['user'], $this->dbConfig['password']);
}
}
$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on("WorkerStart", function () {
MysqlPoolPdo::getInstance()->init();
});
$httpServer->on("request", function ($request, $response) {
$db = null;
$obj = MysqlPoolPdo::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj ? $obj['db'] : null;
}
if ($db) {
$db->query("select sleep(2)");
$ret = $db->query("select * from guestbook limit 1");
MysqlPoolPdo::getInstance()->free($obj);
$response->end(json_encode($ret));
}
});
$httpServer->start();
代碼調用過程詳解:
1、server啟動時,調用init()方法初始化最少數量(min指定)的連接對象,放進類型為channelle的connections對象中。在init中循環調用中,依賴了createObject()返回連接對象,而createObject()
中是調用了本來實現的抽象方法,初始化返回一個PDO db連接。所以此時,連接池connections中有min個對象。
2、server監聽用戶請求,當接收發請求時,調用連接數的getConnection()方法從connections通道中pop()一個對象。此時如果并發了10個請求,server因為配置了1個worker,所以再pop到一個對象返回時,遇到sleep()的查詢,因為用的連接對象是pdo的查詢,此時的woker進程只能等待,完成后才能進入下一個請求。因此,池中的其余連接其實是多余的,同步客戶端的請求速度只能和woker的數量有關。
3、查詢結束后,調用free()方法把連接對象放回connections池中。
ab -c 10 -n 10運行的結果,單個worker處理,select sleep(2) 查詢睡眠2s,同步客戶端方式總共運行時間為20s以上,而且mysql的連接始終維持在一條。結果如下:
協程客戶端Coroutine\MySQL方式的調用
/**
* 數據庫連接池協程方式
* User: user
* Date: 2018/9/8
* Time: 11:30
*/
require "AbstractPool.php";
class MysqlPoolCoroutine extends AbstractPool
{
protected $dbConfig = array(
'host' => '10.0.2.2',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 10,
);
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolCoroutine();
}
return self::$instance;
}
protected function createDb()
{
$db = new Swoole\Coroutine\Mysql();
$db->connect(
$this->dbConfig
);
return $db;
}
}
$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on("WorkerStart", function () {
//MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();
MysqlPoolCoroutine::getInstance()->init();
});
$httpServer->on("request", function ($request, $response) {
$db = null;
$obj = MysqlPoolCoroutine::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj ? $obj['db'] : null;
}
if ($db) {
$db->query("select sleep(2)");
$ret = $db->query("select * from guestbook limit 1");
MysqlPoolCoroutine::getInstance()->free($obj);
$response->end(json_encode($ret));
}
});
$httpServer->start();
代碼調用過程詳解
1、同樣的,協程客戶端方式下的調用,也是實現了之前封裝好的連接池類AbstractPool.php。只是createDb()的抽象方法用了swoole內置的協程客戶端去實現。
2、server啟動后,初始化都和同步一樣。不一樣的在獲取連接對象的時候,此時如果并發了10個請求,同樣是配置了1個worker進程在處理,但是在第一請求到達,pop出池中的一個連接對象,執行到query()方法,遇上sleep阻塞時,此時,woker進程不是在等待select的完成,而是切換到另外的協程去處理下一個請求。完成后同樣釋放對象到池中。當中有重點解釋的代碼段中getConnection()中。
public function getConnection($timeOut = 3)
{
$obj = null;
if ($this->connections->isEmpty()) {
if ($this->count < $this->max) {//連接數沒達到最大,新建連接入池
$this->count++;
$obj = $this->createObject();#1
} else {
$obj = $this->connections->pop($timeOut);#2
}
} else {
$obj = $this->connections->pop($timeOut);#3
}
return $obj;
}
當調用到getConnection()時,如果此時由于大量并發請求過多,連接池connections為空,而沒達到最大連接max數量時時,代碼運行到#1處,調用了createObject(),新建連接返回;但如果連接池connections為空,而到達了最大連接數max時,代碼運行到了#2處,也就是$this->connections->pop($timeOut),此時會阻塞$timeOut的時間,如果期間有鏈接釋放了,會成功獲取到,然后協程返回。超時沒獲取到,則返回false。
3、最后說一下協程Mysql客戶端一項重要配置,那就是代碼里$dbConfig中timeout值的配置。這個配置是意思是最長的查詢等待時間。可以看一個例子說明下:
go(function () {
$start = microtime(true);
$db = new Swoole\Coroutine\MySQL();
$db->connect([
'host' => '10.0.2.2',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'timeout' => 4#1
]);
$db->query("select sleep(5)");
echo "我是第一個sleep五秒之后\n";
$ret = $db->query("select user from guestbook limit 1");#2
var_dump($ret);
$use = microtime(true) - $start;
echo "協程mysql輸出用時:" . $use . PHP_EOL;
});
#1處代碼,如果timeout配了4s查詢超時,而第一條查詢select sleep(5)阻塞后,協程切換到下一條sql的執行,其實$db并不能執行成功,因為用一個連接,同一個協程中,其實執行是同步的,所以此時第二條查詢在等待4s超時后,沒獲取到db的連接執行,就會執行失敗。而如果第一條查詢執行的時間少于這個timeout,那么會執行查詢成功。猜猜上面執行用時多少?結果如下:
如果把timeout換成6s呢,結果如下:
所以要注意的是,協程的客戶端內執行其實是同步的,不要理解為異步,它只是遇到IO阻塞時能讓出執行權,切換到其他協程而已,不能和異步混淆。
ab -c 10 -n 10運行的結果,單個worker處理,select sleep(2) 查詢睡眠2s,協程客戶端方式總共運行時間為2s多。結果如下:
數據庫此時的連接數為10條(show full PROCESSLIST):
再嘗試 ab -c 200 -n 1000 http://127.0.0.1:9501/,200多個并發的處理,時間是20多秒,mysql連接數達到指定的最大值100個。結果如下:
四、后言
現在連接池基本實現了高并發時的連接分配和控制,但是還有一些細節要處理,例如:
并發時,建立了max個池對象,不能一直在池中維護這么多,要在請求空閑時,把連接池的數量維持在一個空閑值內。這里是簡單做了gcSpareObject()的方法實現空閑處理。直接在初始化woker的時候調用:MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();就會定時檢測回收。問題是如何判斷程序比較空閑,值得再去優化。
定時檢測連接時候是活的,剔除死鏈
假如程序忘記調用free()釋放對象到池,是否有更好方法避免這種情況?
對于以上,希望各大神看到后,能提供不錯的意見!
總結
以上是生活随笔為你收集整理的swoole 连接池php fpm,【转】swoole4实现数据库连接池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python求反余弦_python 反余
- 下一篇: php-ftm,FTM/MTF的激素种类