日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

php协程实现mysql异步_swoole与php协程实现异步非阻塞IO开发

發布時間:2024/10/8 数据库 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 php协程实现mysql异步_swoole与php协程实现异步非阻塞IO开发 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

“協程可以在遇到阻塞的時候中斷主動讓渡資源,調度程序選擇其他的協程運行。從而實現非阻塞IO”

然而php是不支持原生協程的,遇到阻塞時如不交由異步進程來執行是沒有任何意義的,代碼還是同步執行的,如下所示:

function foo()

{

$db=new Db();

$result=(yield $db->query());

yield $result;

}

上面的數據庫查詢操作是阻塞的,當調度器調度該協程到這一步時發現執行了阻塞操作,此時調度器該怎么辦?選擇其余協程執行?那該協程的阻塞操作又該何時執行,交由誰執行呢?所以說在php協程中拋開異步調用談非阻塞IO屬于耍流氓。

而swoole的異步task提供了一個實現異步的解決方案,關于swoole_task可以參考官方文檔

核心功能實現

將一次請求形成一個協程

首先創建一個swoole_server并設置回調

class HttpServer implements Server

{

private $swooleHttpServer;

public function __construct(\swoole_http_server $swooleHttpServer)

{

$this->swooleHttpServer = $swooleHttpServer;

}

public function start()

{

$this->swooleHttpServer->on('start', [$this, 'onStart']);

$this->swooleHttpServer->on('shutdown', [$this, 'onShutdown']);

$this->swooleHttpServer->on('workerStart', [$this, 'onWorkerStart']);

$this->swooleHttpServer->on('workerStop', [$this, 'onWorkerStop']);

$this->swooleHttpServer->on('workerError', [$this, 'onWorkerError']);

$this->swooleHttpServer->on('task', [$this, 'onTask']);

$this->swooleHttpServer->on('finish', [$this, 'onFinish']);

$this->swooleHttpServer->on('request', [$this, 'onRequest']);

$this->swooleHttpServer->start();

}

onRequest方法:

public function onRequest(\swoole_http_request $request, \swoole_http_response $response)

{

$requestHandler = new RequestHandler($request, $response);

$requestHandler->handle();

}

在ReqeustHandler中執行handle方法,來解析請求的路由,并創建控制器,調用相應的方法,相

public function handle()

{

$this->context = new Context($this->request, $this->response, $this->getFd());

$this->router = new Router($this->request);

try {

if (false === $this->router->parse()) {

$this->response->output('');

return;

}

$coroutine = $this->doRun();

$task = new Task($coroutine, $this->context);

$task->run();

} catch (\Exception $e) {

PcsExceptionHandler::handle($e, $this->response);

}

}

private function doRun()

{

$ret = (yield $this->dispatch());

yield $this->response->send($ret);

}

上面代碼中的ret是action()的調用結果,yield $this->response->send($ret);是向對客戶端請求的應答。

$coroutine是這一次請求形成的一個協程(Genetator對象),包含了整個請求的流程,接下來就要對這個協程進行調度來獲取真正的執行結果。

協程調度

namespace Pcs\Coroutine;

use Pcs\Network\Context\Context;

class Task

{

private $coroutine;

private $context;

private $status;

private $scheduler;

private $sendValue;

public function __construct(\Generator $coroutine, Context $context)

{

$this->coroutine = $coroutine;

$this->context = $context;

$this->scheduler = new Scheduler($this);

}

public function run()

{

while (true) {

try {

$this->status = $this->scheduler->schedule();

switch ($this->status) {

case TaskStatus::TASK_WAIT:

echo "task status: TASK_WAIT\n";

return null;

case TaskStatus::TASK_DONE:

echo "task status: TASK_DONE\n";

return null;

case TaskStatus::TASK_CONTINUE;

echo "task status: TASK_CONTINUE\n";

break;

}

} catch (\Exception $e) {

$this->scheduler->throwException($e);

}

}

}

public function setCoroutine($coroutine)

{

$this->coroutine = $coroutine;

}

public function getCoroutine()

{

return $this->coroutine;

}

public function valid()

{

if ($this->coroutine->valid()) {

return true;

} else {

return false;

}

}

public function send($value)

{

$this->sendValue = $value;

$ret = $this->coroutine->send($value);

return $ret;

}

public function getSendVal()

{

return $this->sendValue;

}

}

Task依賴于Generator對象$coroutine,在Task類中定義了一些get/set方法,以及一些Generator的方法,Task::run()方法用來執行對協程的調度,調度行為由Schedule來執行,每次調度都會返回當前這次調度的狀態。多個協程共用一個調度器,而這里run方法會為每個協程創建一個調度器,原因是每個協程都是一個客戶端的請求,使用一個單獨的調度器能減少相互間的影響,而且多個協程之間的調度順序是swoole來處理的,這里的調度器不用關心。下面給出調度的代碼:

namespace Pcs\Coroutine;

class Scheduler

{

private $task;

private $stack;

const SCHEDULE_CONTINUE = 10;

public function __construct(Task $task)

{

$this->task = $task;

$this->stack = new \SplStack();

}

public function schedule()

{

$coroutine = $this->task->getCoroutine();

$value = $coroutine->current();

$status = $this->handleSystemCall($value);

if ($status !== self::SCHEDULE_CONTINUE) return $status;

$status = $this->handleStackPush($value);

if ($status !== self::SCHEDULE_CONTINUE) return $status;

$status = $this->handleAsyncJob($value);

if ($status !== self::SCHEDULE_CONTINUE) return $status;

$status = $this->handelYieldValue($value);

if ($status !== self::SCHEDULE_CONTINUE) return $status;

$status = $this->handelStackPop();

if ($status !== self::SCHEDULE_CONTINUE) return $status;

return TaskStatus::TASK_DONE;

}

public function isStackEmpty()

{

return $this->stack->isEmpty();

}

private function handleSystemCall($value)

{

if (!$value instanceof SystemCall) {

return self::SCHEDULE_CONTINUE;

}

}

private function handleStackPush($value)

{

if (!$value instanceof \Generator) {

return self::SCHEDULE_CONTINUE;

}

$coroutine = $this->task->getCoroutine();

$this->stack->push($coroutine);

$this->task->setCoroutine($value);

return TaskStatus::TASK_CONTINUE;

}

private function handleAsyncJob($value)

{

if (!is_subclass_of($value, Async::class)) {

return self::SCHEDULE_CONTINUE;

}

$value->execute([$this, 'asyncCallback']);

return TaskStatus::TASK_WAIT;

}

public function asyncCallback($response, $exception = null)

{

if ($exception !== null

&& $exception instanceof \Exception

) {

$this->throwException($exception, true);

} else {

$this->task->send($response);

$this->task->run();

}

}

private function handelYieldValue($value)

{

if (!$this->task->valid()) {

return self::SCHEDULE_CONTINUE;

}

$ret = $this->task->send($value);

return TaskStatus::TASK_CONTINUE;

}

private function handelStackPop()

{

if ($this->isStackEmpty()) {

return self::SCHEDULE_CONTINUE;

}

$coroutine = $this->stack->pop();

$this->task->setCoroutine($coroutine);

$value = $this->task->getSendVal();

$this->task->send($value);

return TaskStatus::TASK_CONTINUE;

}

public function throwException($e, $isFirstCall = false)

{

if ($this->isStackEmpty()) {

$this->task->getCoroutine()->throw($e);

return;

}

try {

if ($isFirstCall) {

$coroutine = $this->task->getCoroutine();

} else {

$coroutine = $this->stack->pop();

}

$this->task->setCoroutine($coroutine);

$coroutine->throw($e);

$this->task->run();

} catch (\Exception $e) {

$this->throwException($e);

}

}

}

Scheduler中的schedule方法會獲取當前Task的協程,并通過current()方法獲取當前中斷點的返回值,接著依次調用5個方法來對返回值進行處理。

1:handleSystemCall

如果返回的值是SystemCall類型的對象,則執行系統調用,如killTask之類的操作,systemCall是第一優先級。

2:handleStackPush

在A函數中調用B函數,則B函數稱為A函數的子例程(子函數),然而在協程中卻不能像普通函數那樣調用。

function funcA()

{

return funcB();

}

function genA()

{

yield genB();

}

在funcA中funcB();會返回funcB的執行結果,但是在genA中,yield genB();會返回一個Generator對象,而不是genB的最終執行結果。想得到genB的執行結果需要對genB進行調度,而genB中又可能有genC()genD()的協程嵌套,所以為了讓協程像函數一眼正常調用,這里使用協程棧來實現。

如上圖,當調度器獲取到GenA(父協程)的返回值is instance of Generator時,調度器會把父協程push到stack中,然后把子協程分配給Task,繼續調度子協程。如此反復直到最后一個子協程返回,然后開始pop,將stack中的協程依次取出

3:handleAsyncJob

handleAsyncJob是整個協程調度的核心

private function handleAsyncJob($value)

{

if (!is_subclass_of($value, Async::class)) {

return self::SCHEDULE_CONTINUE;

}

$value->execute([$this, 'asyncCallback']);

return TaskStatus::TASK_WAIT;

}

public function asyncCallback($response, $exception = null)

{

if ($exception !== null

&& $exception instanceof \Exception

) {

$this->throwException($exception, true);

} else {

$this->task->send($response);

$this->task->run();

}

}

當協程調度的返回值是繼承了Async的子類或者是實現了Asycn接口的實例的時候,會執行Async的execute方法。這里用mysqli數據庫查詢類舉例。

public function execute(callable $callback)

{

$this->callback = $callback;

$serv = ServerHolder::getServer();

$serv->task($this->sql, -1, [$this, 'queryReady']);

}

public function queryReady(\swoole_http_server $serv, $task_id, $data)

{

$queryResult = unserialize($data);

$exception = null;

if ($queryResult->errno != 0) {

$exception = new \Exception($queryResult->error);

}

call_user_func_array($this->callback, [$queryResult, $exception]);

}

execute方法接收一個函數作為該異步操作完成之后的回調函數,在Mysqli類中的execute方法中,啟動了一個異步swoole_task,將sql操作交給swoole_task異步執行,在執行結束后會執行queryReady方法,該方法在解析異步返回數據之后執行$this->callback()也就是之前在調度器中傳入的 asyncCallback方法,該方法在檢測異常之后會執行send()方法將異步執行的結果發送到中斷處,繼續執行。

handleAsyncJob不會等待異步操作的返回結果,而是直接返回TASK_WAIT信號,回到上面的Task->run()方法可以看到TASK_WAIT信號會導致run()方法返回null,釋放當前worker,調度流程圖如下圖所示,

4:handleYieldValue

private function handelYieldValue($value)

{

if (!$this->task->valid()) {

return self::SCHEDULE_CONTINUE;

}

$ret = $this->task->send($value);

return TaskStatus::TASK_CONTINUE;

}

如果某次yield的返回值既不是異步調用也不是Generator,那么判斷當前的generator是否是valid(是否執行完)如果執行完畢,繼續調度,執行下面的handleStackPush方法,否則的話返回Task_Continue繼續調度,也就是說在一個generator中多次yield,最后只會取最后一次yield的返回值。

5:handleStackPush

當上一步中判斷!$this->task->valid()也就是當前生成器執行完畢的時候,會執行本方法來控制之前的協程stack進行pop操作,首先檢查Stac是否是非空,非空的話pop出一個父協程,并將當前協程的返回值send()到父協程中斷出繼續執行。

協程優勢在哪里

當一次請求遇到IO的時候,同步操作會導致當前請求阻塞在IO處等待IO返回,體現在swoole上就是一個請求一直占用一個worker。

但是當使用了協程調度之后,用戶可以在阻塞的地方通過yield手動中斷,交由swoole_task去異步操作,同時釋放worker占用來處理其他請求。

當異步處理執行結束后再繼續調度。

注意 php的協程只負責中斷,異步操作是Swoole_task做的

總結

以上是生活随笔為你收集整理的php协程实现mysql异步_swoole与php协程实现异步非阻塞IO开发的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。