日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

swoole mysql 并发_Swoole4 如何打造高并发的PHP7协程Mysql连接池?

發(fā)布時(shí)間:2025/3/19 数据库 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 swoole mysql 并发_Swoole4 如何打造高并发的PHP7协程Mysql连接池? 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、數(shù)據(jù)庫連接池基本概念

所謂的數(shù)據(jù)庫連接池,一般指的就是程序和數(shù)據(jù)庫保持一定數(shù)量的數(shù)據(jù)庫連接不斷開,并且各請(qǐng)求的連接可以相互復(fù)用,減少重復(fù)新建數(shù)據(jù)庫連接的消耗和避免在高并發(fā)的情況下出現(xiàn)數(shù)據(jù)庫max connections等錯(cuò)誤。自己總結(jié)一下,如果要實(shí)現(xiàn)一個(gè)數(shù)據(jù)庫連接池,一般有幾個(gè)特點(diǎn):連接復(fù)用,不同的請(qǐng)求連接,可以放回池中,等待下個(gè)請(qǐng)求發(fā)分配和調(diào)用

連接數(shù)量一般維持min-max的最大最少值之間

對(duì)于空閑連接的回收

可以抗一定程度的高并發(fā),也就是說當(dāng)一次并發(fā)請(qǐng)求完池中所有的連接時(shí),獲取不到連接的請(qǐng)求可等待其他連接的釋放

總結(jié)幾個(gè)特性后,一個(gè)基本連接池,大致要實(shí)現(xiàn)下圖功能:

創(chuàng)建連接:連接池啟動(dòng)后,初始化一定的空閑連接,指定為最少的連接min。當(dāng)連接池為空,不夠用時(shí),創(chuàng)建新的連接放到池里,但不能超過指定的最大連接max數(shù)量。

連接釋放:每次使用完連接,一定要調(diào)用釋放方法,把連接放回池中,給其他程序或請(qǐng)求使用。

連接分配:連接池中用pop和push的方式對(duì)等入隊(duì)和出隊(duì)分配與回收。能實(shí)現(xiàn)阻塞分配,也就是在池空并且已創(chuàng)建數(shù)量大于max,阻塞一定時(shí)間等待其他請(qǐng)求的連接釋放,超時(shí)則返回null。

連接管理:對(duì)連接池中的連接,定時(shí)檢活和釋放空閑連接等

二、Fpm+數(shù)據(jù)庫長連接的實(shí)現(xiàn)利用fpm實(shí)現(xiàn):例如你要實(shí)例一個(gè)100連接數(shù)的池,開啟100個(gè)空閑fpm,然后每個(gè)fpm的連接都是數(shù)據(jù)庫長連接。一般pm.max_spare_servers = 8這個(gè)配置項(xiàng)就是維持連接池的空閑數(shù)量,然后pm.max_children = 50就是最大的連接數(shù)量。和fpm的進(jìn)程數(shù)量一致。

三、基于swoole的實(shí)現(xiàn)swoole簡單介紹(更多參閱swoole官網(wǎng))

swoole是一個(gè)PHP實(shí)現(xiàn)異步網(wǎng)絡(luò)通信的引擎或者擴(kuò)展,其中實(shí)現(xiàn)了很多傳統(tǒng)PHP-fpm沒有的東西,例如異步的客戶端,異步Io,常駐內(nèi)存,協(xié)程等等,一個(gè)個(gè)優(yōu)秀的擴(kuò)展,其中異步和協(xié)程等概念能應(yīng)用于高并發(fā)場景。缺點(diǎn)是文檔和入門的門檻都比較高,需要排坑。附上swoole的運(yùn)行流程和進(jìn)程結(jié)構(gòu)圖:

運(yùn)行流程圖

進(jìn)程/線程架構(gòu)圖

基于swoole現(xiàn)實(shí)時(shí)的注意事項(xiàng)

首先,為了減少大家對(duì)之后運(yùn)行示例代碼產(chǎn)生不必要的天坑,先把注意事項(xiàng)和場景問題放前面:

1、程序中使用了協(xié)程的通信管道channel(與go的chan差不多的),其中swoole2是不支持chan->pop($timeout)中timeout超時(shí)等待的,所以必須用swoole4版本

3、筆者使用的環(huán)境為:PHP 7.1.18和swoole4作為此次開發(fā)的環(huán)境基于swoole現(xiàn)實(shí)連接池的方法

首先,此次利用swoole實(shí)現(xiàn)連接池,運(yùn)用到swoole以下技術(shù)或者概念

1、連接變量池,這里可以看做一個(gè)數(shù)組或者隊(duì)列,利用swoole全局變量的常駐內(nèi)存特性,只要變量沒主動(dòng)unset掉,數(shù)組或隊(duì)列中的連接對(duì)象可以一直保持,不釋放。

2、協(xié)程。協(xié)程是純用戶狀態(tài)的線程,通過協(xié)作的方式而不是搶占的方式來切換。首先此次的連接池兩處用到協(xié)程:一個(gè)是mysql的協(xié)程客戶端,為什么要用協(xié)程客戶端,因?yàn)槿绻怯猛娇蛻舳薖DO,在一個(gè)進(jìn)程處理內(nèi),就算有幾百個(gè)連接池,swoole worker進(jìn)程中用普通的PDO方式,隨便并發(fā)多少個(gè)請(qǐng)求,每一個(gè)請(qǐng)求都只能等上一個(gè)請(qǐng)求執(zhí)行完畢,woker才處理下一個(gè)請(qǐng)求,這里就算阻塞了。為了讓一個(gè)worker支持阻塞切換出cpu去處理其他請(qǐng)求,所以要用到協(xié)程的協(xié)助切換,或者異步客戶端也可以,但是異步客戶端使用起來嵌套太多,很不方便。swoole協(xié)程可以無感知的用同步的代碼編寫方式達(dá)到異步IO的效果和性能。

第二個(gè)是底層實(shí)現(xiàn)了協(xié)程切換和調(diào)度的channel,以下詳述什么是channel

3、Coroutine/channel通道,類似于go語言的chan,支持多生產(chǎn)者協(xié)程和多消費(fèi)者協(xié)程。底層自動(dòng)實(shí)現(xiàn)了協(xié)程的切換和調(diào)度。高并發(fā)時(shí),容易出連接池為空時(shí),如果用一般的array或者splqueue()作為介質(zhì)存儲(chǔ)連接對(duì)象變量,不能產(chǎn)生阻塞等待其他請(qǐng)求釋放的效果,也就是說只能直接返回null.。所以這里用了一個(gè)swoole4協(xié)程中很牛逼的channel通過管道作為存儲(chǔ)介質(zhì),它的出隊(duì)方法pop($timeout)可以指定阻塞等待指定時(shí)間后返回。注意,是swoole2是沒有超時(shí)timeout的參數(shù),不適用此場景。在go語言中,如果chan等待或者push了沒有消費(fèi)或者生產(chǎn)一對(duì)一的情況,是會(huì)發(fā)生死鎖。所以swoole4的timeout應(yīng)該是為了避免無限等待為空channel情況而產(chǎn)生。

channel切換的例子:

$chan = new Channel();

go(function()use($chan)

{ echo"我是第一個(gè)協(xié)程,等待3秒內(nèi)有push就執(zhí)行返回" . PHP_EOL;

$p = $chan->pop(2);#1

echo"pop返回結(jié)果" . PHP_EOL;

var_dump($p);

});

go(function()use($chan){

co::sleep(1);#2

$chan->push(1);

});

echo"main" . PHP_EOL;

#1處代碼會(huì)首先執(zhí)行,然后遇到pop(),因?yàn)閏hannel還是空,會(huì)等待2s。此時(shí)協(xié)程會(huì)讓出cpu,跳到第二個(gè)協(xié)程執(zhí)行,然后#2出睡眠1秒,push變量1進(jìn)去channel后返回#1處繼續(xù)執(zhí)行,成功取車通過中剛push的值1.運(yùn)行結(jié)果為:

如果把#2處的睡眠時(shí)間換成大于pop()的等待時(shí)間,結(jié)果是:

根據(jù)這些特性最終實(shí)現(xiàn)連接池的抽象封裝類為:

/**

* 連接池封裝.

* User: user

* Date: 2018/9/1

* Time: 13:36

*/

use Swoole\Coroutine\Channel;

abstract class AbstractPool

{

private $min;//最少連接數(shù)

private $max;//最大連接數(shù)

private $count;//當(dāng)前連接數(shù)

private $connections;//連接池組

protected $spareTime;//用于空閑連接回收判斷

//數(shù)據(jù)庫配置

protected $dbConfig = array(

'host' => '10.0.2.2',

'port' => 3306,

'user' => 'root',

'password' => 'root',

'database' => 'test',

'charset' => 'utf8',

'timeout' => 2,

);

private $inited = false;

protected abstract function createDb();

public function __construct()

{

$this->min = 10;

$this->max = 100;

$this->spareTime = 10 * 3600;

$this->connections = new Channel($this->max + 1);

}

protected function createObject()

{

$obj = null;

$db = $this->createDb();

if ($db) {

$obj = [

'last_used_time' => time(),

'db' => $db,

];

}

return $obj;

}

/**

* 初始換最小數(shù)量連接池

* @return $this|null

*/

public function init()

{

if ($this->inited) {

return null;

}

for ($i = 0; $i < $this->min; $i++) {

$obj = $this->createObject();

$this->count++;

$this->connections->push($obj);

}

return $this;

}

public function getConnection($timeOut = 3)

{

$obj = null;

if ($this->connections->isEmpty()) {

if ($this->count < $this->max) {//連接數(shù)沒達(dá)到最大,新建連接入池

$this->count++;

$obj = $this->createObject();

} else {

$obj = $this->connections->pop($timeOut);//timeout為出隊(duì)的最大的等待時(shí)間

}

} else {

$obj = $this->connections->pop($timeOut);

}

return $obj;

}

public function free($obj)

{

if ($obj) {

$this->connections->push($obj);

}

}

/**

* 處理空閑連接

*/

public function gcSpareObject()

{

//大約2分鐘檢測一次連接

swoole_timer_tick(120000, function () {

$list = [];

/*echo "開始檢測回收空閑鏈接" . $this->connections->length() . PHP_EOL;*/

if ($this->connections->length() < intval($this->max * 0.5)) {

echo "請(qǐng)求連接數(shù)還比較多,暫不回收空閑連接\n";

}#1

while (true) {

if (!$this->connections->isEmpty()) {

$obj = $this->connections->pop(0.001);

$last_used_time = $obj['last_used_time'];

if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {//回收

$this->count--;

} else {

array_push($list, $obj);

}

} else {

break;

}

}

foreach ($list as $item) {

$this->connections->push($item);

}

unset($list);

});

}

}同步PDO客戶端下實(shí)現(xiàn)

<<?php

/**

* 數(shù)據(jù)庫連接池PDO方式

* User: user

* Date: 2018/9/8

* Time: 11:30

*/

require "AbstractPool.php";

class MysqlPoolPdo extends AbstractPool

{

protected $dbConfig = array(

'host' => 'mysql:host=10.0.2.2:3306;dbname=test',

'port' => 3306,

'user' => 'root',

'password' => 'root',

'database' => 'test',

'charset' => 'utf8',

'timeout' => 2,

);

public static $instance;

public static function getInstance()

{

if (is_null(self::$instance)) {

self::$instance = new MysqlPoolPdo();

}

return self::$instance;

}

protected function createDb()

{

return new PDO($this->dbConfig['host'], $this->dbConfig['user'], $this->dbConfig['password']);

}

}

$httpServer = new swoole_http_server('0.0.0.0', 9501);

$httpServer->set(

['worker_num' => 1]

);

$httpServer->on("WorkerStart", function () {

MysqlPoolPdo::getInstance()->init();

});

$httpServer->on("request", function ($request, $response) {

$db = null;

$obj = MysqlPoolPdo::getInstance()->getConnection();

if (!empty($obj)) {

$db = $obj ? $obj['db'] : null;

}

if ($db) {

$db->query("select sleep(2)");

$ret = $db->query("select * from guestbook limit 1");

MysqlPoolPdo::getInstance()->free($obj);

$response->end(json_encode($ret));

}

});

$httpServer->start();

代碼調(diào)用過程詳解:

1、server啟動(dòng)時(shí),調(diào)用init()方法初始化最少數(shù)量(min指定)的連接對(duì)象,放進(jìn)類型為channelle的connections對(duì)象中。在init中循環(huán)調(diào)用中,依賴了createObject()返回連接對(duì)象,而createObject()

中是調(diào)用了本來實(shí)現(xiàn)的抽象方法,初始化返回一個(gè)PDO db連接。所以此時(shí),連接池connections中有min個(gè)對(duì)象。

2、server監(jiān)聽用戶請(qǐng)求,當(dāng)接收發(fā)請(qǐng)求時(shí),調(diào)用連接數(shù)的getConnection()方法從connections通道中pop()一個(gè)對(duì)象。此時(shí)如果并發(fā)了10個(gè)請(qǐng)求,server因?yàn)榕渲昧?個(gè)worker,所以再pop到一個(gè)對(duì)象返回時(shí),遇到sleep()的查詢,因?yàn)橛玫倪B接對(duì)象是pdo的查詢,此時(shí)的woker進(jìn)程只能等待,完成后才能進(jìn)入下一個(gè)請(qǐng)求。因此,池中的其余連接其實(shí)是多余的,同步客戶端的請(qǐng)求速度只能和woker的數(shù)量有關(guān)。

3、查詢結(jié)束后,調(diào)用free()方法把連接對(duì)象放回connections池中。

ab -c 10 -n 10運(yùn)行的結(jié)果,單個(gè)worker處理,select sleep(2) 查詢睡眠2s,同步客戶端方式總共運(yùn)行時(shí)間為20s以上,而且mysql的連接始終維持在一條。結(jié)果如下:

協(xié)程客戶端Coroutine\MySQL方式的調(diào)用

/**

* 數(shù)據(jù)庫連接池協(xié)程方式

* User: user

* Date: 2018/9/8

* Time: 11:30

*/

require "AbstractPool.php";

class MysqlPoolCoroutine extends AbstractPool

{

protected $dbConfig = array(

'host' => '10.0.2.2',

'port' => 3306,

'user' => 'root',

'password' => 'root',

'database' => 'test',

'charset' => 'utf8',

'timeout' => 10,

);

public static $instance;

public static function getInstance()

{

if (is_null(self::$instance)) {

self::$instance = new MysqlPoolCoroutine();

}

return self::$instance;

}

protected function createDb()

{

$db = new Swoole\Coroutine\Mysql();

$db->connect(

$this->dbConfig

);

return $db;

}

}

$httpServer = new swoole_http_server('0.0.0.0', 9501);

$httpServer->set(

['worker_num' => 1]

);

$httpServer->on("WorkerStart", function () {

//MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();

MysqlPoolCoroutine::getInstance()->init();

});

$httpServer->on("request", function ($request, $response) {

$db = null;

$obj = MysqlPoolCoroutine::getInstance()->getConnection();

if (!empty($obj)) {

$db = $obj ? $obj['db'] : null;

}

if ($db) {

$db->query("select sleep(2)");

$ret = $db->query("select * from guestbook limit 1");

MysqlPoolCoroutine::getInstance()->free($obj);

$response->end(json_encode($ret));

}

});

$httpServer->start();

代碼調(diào)用過程詳解

1、同樣的,協(xié)程客戶端方式下的調(diào)用,也是實(shí)現(xiàn)了之前封裝好的連接池類AbstractPool.php。只是createDb()的抽象方法用了swoole內(nèi)置的協(xié)程客戶端去實(shí)現(xiàn)。

2、server啟動(dòng)后,初始化都和同步一樣。不一樣的在獲取連接對(duì)象的時(shí)候,此時(shí)如果并發(fā)了10個(gè)請(qǐng)求,同樣是配置了1個(gè)worker進(jìn)程在處理,但是在第一請(qǐng)求到達(dá),pop出池中的一個(gè)連接對(duì)象,執(zhí)行到query()方法,遇上sleep阻塞時(shí),此時(shí),woker進(jìn)程不是在等待select的完成,而是切換到另外的協(xié)程去處理下一個(gè)請(qǐng)求。完成后同樣釋放對(duì)象到池中。當(dāng)中有重點(diǎn)解釋的代碼段中g(shù)etConnection()中。

public function getConnection($timeOut = 3)

{

$obj = null;

if ($this->connections->isEmpty()) {

if ($this->count < $this->max) {//連接數(shù)沒達(dá)到最大,新建連接入池

$this->count++;

$obj = $this->createObject();#1

} else {

$obj = $this->connections->pop($timeOut);#2

}

} else {

$obj = $this->connections->pop($timeOut);#3

}

return $obj;

}

當(dāng)調(diào)用到getConnection()時(shí),如果此時(shí)由于大量并發(fā)請(qǐng)求過多,連接池connections為空,而沒達(dá)到最大連接max數(shù)量時(shí)時(shí),代碼運(yùn)行到#1處,調(diào)用了createObject(),新建連接返回;但如果連接池connections為空,而到達(dá)了最大連接數(shù)max時(shí),代碼運(yùn)行到了#2處,也就是$this->connections->pop($timeOut),此時(shí)會(huì)阻塞$timeOut的時(shí)間,如果期間有鏈接釋放了,會(huì)成功獲取到,然后協(xié)程返回。超時(shí)沒獲取到,則返回false。

3、最后說一下協(xié)程Mysql客戶端一項(xiàng)重要配置,那就是代碼里$dbConfig中timeout值的配置。這個(gè)配置是意思是最長的查詢等待時(shí)間。可以看一個(gè)例子說明下:

go(function () {

$start = microtime(true);

$db = new Swoole\Coroutine\MySQL();

$db->connect([

'host' => '10.0.2.2',

'port' => 3306,

'user' => 'root',

'password' => 'root',

'database' => 'test',

'timeout' => 4#1

]);

$db->query("select sleep(5)");

echo "我是第一個(gè)sleep五秒之后\n";

$ret = $db->query("select user from guestbook limit 1");#2

var_dump($ret);

$use = microtime(true) - $start;

echo "協(xié)程mysql輸出用時(shí):" . $use . PHP_EOL;

});

#1處代碼,如果timeout配了4s查詢超時(shí),而第一條查詢select sleep(5)阻塞后,協(xié)程切換到下一條sql的執(zhí)行,其實(shí)$db并不能執(zhí)行成功,因?yàn)橛靡粋€(gè)連接,同一個(gè)協(xié)程中,其實(shí)執(zhí)行是同步的,所以此時(shí)第二條查詢?cè)诘却?s超時(shí)后,沒獲取到db的連接執(zhí)行,就會(huì)執(zhí)行失敗。而如果第一條查詢執(zhí)行的時(shí)間少于這個(gè)timeout,那么會(huì)執(zhí)行查詢成功。猜猜上面執(zhí)行用時(shí)多少?結(jié)果如下:

如果把timeout換成6s呢,結(jié)果如下:

所以要注意的是,協(xié)程的客戶端內(nèi)執(zhí)行其實(shí)是同步的,不要理解為異步,它只是遇到IO阻塞時(shí)能讓出執(zhí)行權(quán),切換到其他協(xié)程而已,不能和異步混淆。

ab -c 10 -n 10運(yùn)行的結(jié)果,單個(gè)worker處理,select sleep(2) 查詢睡眠2s,協(xié)程客戶端方式總共運(yùn)行時(shí)間為2s多。結(jié)果如下:

數(shù)據(jù)庫此時(shí)的連接數(shù)為10條(show full PROCESSLIST):

再嘗試 ab -c 200 -n 1000 http://127.0.0.1:9501/,200多個(gè)并發(fā)的處理,時(shí)間是20多秒,mysql連接數(shù)達(dá)到指定的最大值100個(gè)。結(jié)果如下:

四、后言

現(xiàn)在連接池基本實(shí)現(xiàn)了高并發(fā)時(shí)的連接分配和控制,但是還有一些細(xì)節(jié)要處理,例如:并發(fā)時(shí),建立了max個(gè)池對(duì)象,不能一直在池中維護(hù)這么多,要在請(qǐng)求空閑時(shí),把連接池的數(shù)量維持在一個(gè)空閑值內(nèi)。這里是簡單做了gcSpareObject()的方法實(shí)現(xiàn)空閑處理。直接在初始化woker的時(shí)候調(diào)用:MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();就會(huì)定時(shí)檢測回收。問題是如何判斷程序比較空閑,值得再去優(yōu)化。

定時(shí)檢測連接時(shí)候是活的,剔除死鏈

假如程序忘記調(diào)用free()釋放對(duì)象到池,是否有更好方法避免這種情況?

本書做了一個(gè)很好的歸納。同時(shí)也歡迎點(diǎn)擊:正在跳轉(zhuǎn)加入,任何問題可以一起討論,也歡迎大家一起踴躍發(fā)言!

總結(jié)

以上是生活随笔為你收集整理的swoole mysql 并发_Swoole4 如何打造高并发的PHP7协程Mysql连接池?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。