gateway-workman
最外層start.php,設置全局啟動模式,加載Application里的個子服務目錄下應用的啟動文件(start開頭,這些文件都是workmanwork類的子類,在載入文件的同時,這些子服務會生成對象,work類的構(gòu)造方法會把生成的work對象都存入work類的靜態(tài)變量$_workers,方便主文件后續(xù)設置以及啟動子服務,全局啟動模式子服務是不會啟動的),設置時區(qū),注冊自動加載,調(diào)用workmanWorker:runall啟動所有服務。
主文件:
// 標記是全局啟動
define('GLOBAL_START', 1);
require_once __DIR__ . '/Workerman/Autoloader.php';
// 加載所有Applications/*/start.php,以便啟動所有服務
foreach(glob(__DIR__.'/Applications/*/start*.php') as $start_file)
{
require_once $start_file;
}
// 運行所有服務
Worker::runAll();
子服務文件start_gateway,提供接入請求的服務(class Gateway extends Worker):
// gateway 進程,這里使用Text協(xié)議,可以用telnet測試
$gateway = new Gateway("Text://0.0.0.0:8282");
// gateway名稱,status方便查看
$gateway->name = 'YourAppGateway';
// gateway進程數(shù)
$gateway->count = 4;
// 本機ip,分布式部署時使用內(nèi)網(wǎng)ip
$gateway->lanIp = '127.0.0.1';
// 內(nèi)部通訊起始端口,假如$gateway->count=4,起始端口為4000
// 則一般會使用4001 4002 4003 4004 4個端口作為內(nèi)部通訊端口
$gateway->startPort = 2300;
// 心跳間隔
//$gateway->pingInterval = 10;
// 心跳數(shù)據(jù)
//$gateway->pingData = '{"type":"ping"}';
/*
// 當客戶端連接上來時,設置連接的onWebSocketConnect,即在websocket握手時的回調(diào)
$gateway->onConnect = function($connection)
{
$connection->onWebSocketConnect = function($connection , $http_header)
{
// 可以在這里判斷連接來源是否合法,不合法就關(guān)掉連接
// $_SERVER['HTTP_ORIGIN']標識來自哪個站點的頁面發(fā)起的websocket鏈接
if($_SERVER['HTTP_ORIGIN'] != 'http://kedou.workerman.net')
{
$connection->close();
}
// onWebSocketConnect 里面$_GET $_SERVER是可用的
// var_dump($_GET, $_SERVER);
};
};
*/
// 如果不是在根目錄啟動,則運行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
接入服務除了注冊到全局靜態(tài)$_workers變量,還設置了路由:
public function __construct($socket_name, $context_option = array())
{
parent::__construct($socket_name, $context_option);
//隨機返回一個bussness的連接
$this->router = array("\GatewayWorker\Gateway", 'routerRand');
$backrace = debug_backtrace();
$this->_appInitPath = dirname($backrace[0]['file']);
}
子服務文件start_bussinessworker,提供實際的業(yè)務處理,和gateway服務內(nèi)部通訊類似nginx與php(class BusinessWorker extends Worker):
// bussinessWorker 進程
$worker = new BusinessWorker();
// worker名稱
$worker->name = 'YourAppBusinessWorker';
// bussinessWorker進程數(shù)量
$worker->count = 4;
// 如果不是在根目錄啟動,則運行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
附帶基類work的構(gòu)造函數(shù):
/**
* worker構(gòu)造函數(shù)
*
* @param string $socket_name
* @param array $context_option
*/
public function __construct($socket_name = '', $context_option = array())
{
// 保存worker實例
$this->workerId = spl_object_hash($this);
self::$_workers[$this->workerId] = $this;
self::$_pidMap[$this->workerId] = array();
// 獲得實例化文件路徑,用于自動加載設置根目錄
$backrace = debug_backtrace();
$this->_appInitPath = dirname($backrace[0]['file']);
// 設置socket上下文
if($socket_name)
{
$this->_socketName = $socket_name;
if(!isset($context_option['socket']['backlog']))
{
$context_option['socket']['backlog'] = self::DEFAUL_BACKLOG;
}
$this->_context = stream_context_create($context_option);
}
}
這里可以看到self::$_workers[$this->workerId] = $this;記錄全局worker實例,
self::$_pidMap這個用來記錄各個子服務開始fork后的所有子進程id
$context_option['socket']['backlog'] = self::DEFAUL_BACKLOG;設置嵌套字上下文里的未accept隊列長度
在后面運行實例的listen方法監(jiān)聽的時候會傳遞到stream_socket_server方法里。
runAll的流程如下:
/**
* 運行所有worker實例
* @return void
*/
public static function runAll()
{
// 初始化環(huán)境變量(pid文件,log文件,status文件,定時器信號回調(diào))
self::init();
// 解析命令(運行,重啟,停止,重載,狀態(tài),從命令判斷主進程是否以守護進程啟動)
// 啟動之后通過php start.php XXX命令會到這里!因為第一步設置了這個文件的pid(這里可以看到pid對應到文件位置的重要性),所以后面的命令會對應為發(fā)送信號
self::parseCommand();
// 嘗試以守護進程模式運行(fork兩次進程,重置進程sid)
self::daemonize();
// 初始化所有worker實例,主要是監(jiān)聽端口(記錄所有子服務worker實例的最長名稱name長度,最長嵌套字名socket_name長度,最長運行用戶名user長度;
// 所有定義了協(xié)議的子服務(Gate)開始監(jiān)聽也就是啟動服務,子服務實例監(jiān)聽嵌套字對象mainSoctke,還沒有注冊accept回調(diào))
// 到這里所有Gate子服務啟動監(jiān)聽,但是沒有accept
self::initWorkers();
// 初始化所有信號處理函數(shù)(為主進程注冊stop,stats,reload的信號回調(diào)signalHandler)
self::installSignal();
// 保存主進程pid(將獲取daemonize方法后的新的主進程sid,存入init方法后的pid文件)
self::saveMasterPid();
// 創(chuàng)建子進程(worker進程)并運行(主進程通過self::$_pidMap用來記錄子服務進程創(chuàng)建的各自進程號,方便后面發(fā)送信號,
// 生成的子進程置空主進程的全局變量,self::$_pidMap,self::$_workers,如果在子服務文件里定義了self::$stdoutFile文件地址,
// 會重定向子服務子進程的stdout和stderr,直接運行work實例的run方法)
// 到這里子服務已經(jīng)在子進程中運行,后面的代碼就只有主服務執(zhí)行
// 子進程的run方法會通過libevent綁定子服務mainSocket的accept回調(diào),在accept回調(diào)方法里才有定義后面怎么處理請求socket
// 子進程的run方法會通過libevent綁定重新綁定信號量,以及用libevent來注入定時器
// 子進程的run方法會回調(diào)用戶在子服務文件里的onWorkStar方法
// 子進程進入事件監(jiān)聽輪詢
// 上面是基類中的run方法,基于gateway的子服務,會實現(xiàn)自己的onworkStar方法,然后在調(diào)用基類的run,這樣可以在onWorkStar里實現(xiàn)gate與worker的連接
// 這里不知道怎么處理子進程對accpet時候的驚群
self::forkWorkers();
// 展示啟動界面(打印所有啟動的子服務的信息,由于initWorkers獲取了各個子服務實例的名稱等信息長度可以很好的格式化展示)
self::displayUI();
// 嘗試重定向標準輸入輸出(重定向主服務進程)
self::resetStd();
// 監(jiān)控所有子進程(worker進程)(處理主進程的信號量;通過pcntl_wait循環(huán)監(jiān)聽子進程狀態(tài),保持子進程的運行)
/*
什么是平滑重啟?
平滑重啟不同于普通的重啟,平滑重啟可以做到在不影響用戶的情況下重啟服務,以便重新載入PHP程序,完成業(yè)務代碼更新。
平滑重啟一般應用于業(yè)務更新或者版本發(fā)布過程中,能夠避免因為代碼發(fā)布重啟服務導致的暫時性服務不可用的影響。
注意:只有在on{...}回調(diào)中載入的文件平滑重啟后才會自動更新,啟動腳本中直接載入的文件或者寫死的代碼運行reload不會自動更新。
平滑重啟原理
WorkerMan分為主進程和子進程,主進程負責監(jiān)控子進程,子進程負責接收客戶端的連接和連接上發(fā)來的請求數(shù)據(jù),
做相應的處理并返回數(shù)據(jù)給客戶端。當業(yè)務代碼更新時,其實我們只要更新子進程,便可以達到更新代碼的目的。
當WorkerMan主進程收到平滑重啟信號時,主進程會向其中一個子進程發(fā)送安全退出(讓對應進程處理完畢當前請求后才退出)信號,
當這個進程退出后,主進程會重新創(chuàng)建一個新的子進程(這個子進程載入了新的PHP代碼),然后主進程再次向另外一個舊的進程發(fā)送停止
命令,這樣一個進程一個進程的重啟,直到所有舊的進程全部被置換為止。
我們看到平滑重啟實際上是讓舊的業(yè)務進程逐個退出然后并逐個創(chuàng)建新的進程做到的。為了在平滑重啟時不影響客用戶,這就要求進程中不
要保存用戶相關(guān)的狀態(tài)信息,即業(yè)務進程最好是無狀態(tài)的,避免由于進程退出導致信息丟失。
*/
//上面是官網(wǎng)對平滑啟動的說明,設計的代碼就是reload方法的 $one_worker_pid = current(self::$_pidsToRestart );這里是處理主進程的平滑啟動信號的
// 在主進程里獲取所有設置為可以平滑啟動的子進程的pid,然后取一個發(fā)送平滑啟動信號信號,這個信號到子進程,其實子進程會通過stopAll方法停止運行
// exit(0);
// 主進程監(jiān)聽到子進程退出,然后重新生成一個新的子進程,然后把這個子進程的id從self::$_pidsToRestart里刪除,然后再次調(diào)用reload方法去殺掉下一個子進程
//
self::monitorWorkers();
}
主要的過程已經(jīng)描述清楚了,主服務在主進程里,子服務開啟監(jiān)聽之后,主服務開始fork,然后記錄子服務進程的對應的pid,然后通過信號量來處理用戶命令以及管理子服務進程。子服務在子進程里實現(xiàn)accpet監(jiān)聽回調(diào)。
work基類的主要代碼片段:
子服務listen:
// 獲得應用層通訊協(xié)議以及監(jiān)聽的地址,udp會轉(zhuǎn)換為傳輸協(xié)議
list($scheme, $address) = explode(':', $this->_socketName, 2);
// 如果有指定應用層協(xié)議,則檢查對應的協(xié)議類是否存在
if($scheme != 'tcp' && $scheme != 'udp')
{
$scheme = ucfirst($scheme);
$this->_protocol = '\Protocols\'.$scheme;
if(!class_exists($this->_protocol))
{
$this->_protocol = "\Workerman\Protocols\$scheme";
if(!class_exists($this->_protocol))
{
throw new Exception("class \Protocols\$scheme not exist");
}
}
}
elseif($scheme === 'udp')
{
$this->transport = 'udp';
}
// flag
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno = 0;
$errmsg = '';
$this->_mainSocket = stream_socket_server($this->transport.":".$address, $errno, $errmsg, $flags, $this->_context);
if(!$this->_mainSocket)
{
throw new Exception($errmsg);
}
創(chuàng)建子進程:
// 創(chuàng)建子進程
while(count(self::$_pidMap[$worker->workerId]) < $worker->count)
{
static::forkOneWorker($worker);
}
}
}
/**
* 創(chuàng)建一個子進程
* @param Worker $worker
* @throws Exception
*/
protected static function forkOneWorker($worker)
{
$pid = pcntl_fork();
// 主進程記錄子進程pid
if($pid > 0)
{
self::$_pidMap[$worker->workerId][$pid] = $pid;
}
// 子進程運行
elseif(0 === $pid)
{
// 啟動過程中嘗試重定向標準輸出
if(self::$_status === self::STATUS_STARTING)
{
self::resetStd();
}
self::$_pidMap = array();
self::$_workers = array($worker->workerId => $worker);
Timer::delAll();
self::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());
self::setProcessUser($worker->user);
$worker->run();
exit(250);
}
else
{
throw new Exception("forkOneWorker fail");
}
}
子進程執(zhí)行的基類run方法:
/**
* 運行worker實例
*/
public function run()
{
// 注冊進程退出回調(diào),用來檢查是否有錯誤
register_shutdown_function(array("\Workerman\Worker", 'checkErrors'));
// 設置自動加載根目錄
Autoloader::setRootPath($this->_appInitPath);
// 如果沒有全局事件輪詢,則創(chuàng)建一個
if(!self::$globalEvent)
{
if(extension_loaded('libevent'))
{
self::$globalEvent = new Libevent();
}
else
{
self::$globalEvent = new Select();
}
// 監(jiān)聽_mainSocket上的可讀事件(客戶端連接事件)也只有Gate才有這個事件
if($this->_socketName)
{
if($this->transport !== 'udp')
{
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
}
else
{
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
}
}
}
// 重新安裝事件處理函數(shù),使用全局事件輪詢監(jiān)聽信號事件
self::reinstallSignal();
// 用全局事件輪詢初始化定時器
Timer::init(self::$globalEvent);
// 如果有設置進程啟動回調(diào),則執(zhí)行
if($this->onWorkerStart)
{
call_user_func($this->onWorkerStart, $this);
}
// 子進程主循環(huán)
self::$globalEvent->loop();
}
主服務監(jiān)聽子服務進程:
pcntl_signal_dispatch();
// 掛起進程,直到有子進程退出或者被信號打斷
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
// 如果有信號到來,嘗試觸發(fā)信號處理函數(shù)
pcntl_signal_dispatch();
// 有子進程退出
if($pid > 0)
{
// 查找是哪個進程組的,然后再啟動新的進程補上
foreach(self::$_pidMap as $worker_id => $worker_pid_array)
{
if(isset($worker_pid_array[$pid]))
{
$worker = self::$_workers[$worker_id];
// 檢查退出狀態(tài)
if($status !== 0)
{
self::log("worker[".$worker->name.":$pid] exit with status $status");
}
// 統(tǒng)計,運行status命令時使用
if(!isset(self::$_globalStatistics['worker_exit_info'][$worker_id][$status]))
{
self::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
}
self::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
// 清除子進程信息
unset(self::$_pidMap[$worker_id][$pid]);
break;
}
}
// 如果不是關(guān)閉狀態(tài),則補充新的進程
if(self::$_status !== self::STATUS_SHUTDOWN)
{
self::forkWorkers();
// 如果該進程是因為運行reload命令退出,則繼續(xù)執(zhí)行reload流程
if(isset(self::$_pidsToRestart[$pid]))
{
unset(self::$_pidsToRestart[$pid]);
self::reload();
}
}
平滑啟動過程:
/**
* 執(zhí)行平滑重啟流程
* @return void
*/
protected static function reload()
{
// 主進程部分
if(self::$_masterPid === posix_getpid())
{
// 設置為平滑重啟狀態(tài)
if(self::$_status !== self::STATUS_RELOADING && self::$_status !== self::STATUS_SHUTDOWN)
{
self::log("Workerman[".basename(self::$_startFile)."] reloading");
self::$_status = self::STATUS_RELOADING;
}
// 如果有worker設置了reloadable=false,則過濾掉
$reloadable_pid_array = array();
foreach(self::$_pidMap as $worker_id =>$worker_pid_array)
{
$worker = self::$_workers[$worker_id];
if($worker->reloadable)
{
foreach($worker_pid_array as $pid)
{
$reloadable_pid_array[$pid] = $pid;
}
}
}
// 得到所有可以重啟的進程
self::$_pidsToRestart = array_intersect(self::$_pidsToRestart , $reloadable_pid_array);
// 平滑重啟完畢
if(empty(self::$_pidsToRestart))
{
if(self::$_status !== self::STATUS_SHUTDOWN)
{
self::$_status = self::STATUS_RUNNING;
}
return;
}
// 繼續(xù)執(zhí)行平滑重啟流程
$one_worker_pid = current(self::$_pidsToRestart );
// 給子進程發(fā)送平滑重啟信號
posix_kill($one_worker_pid, SIGUSR1);
// 定時器,如果子進程在KILL_WORKER_TIMER_TIME秒后沒有退出,則強行殺死
Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($one_worker_pid, SIGKILL), false);
}
// 子進程部分
else
{
// 如果當前worker的reloadable屬性為真,則執(zhí)行退出
$worker = current(self::$_workers);
if($worker->reloadable)
{
self::stopAll();
}
}
}
到這里主進程已經(jīng)準備好了,子進程(Gate)已經(jīng)開始監(jiān)聽了(還未講gate與worker的連接通信,以及gate怎么接受請求,然后worker怎么處理請求)。上面說了gate與worker的連接是在OnWorkStart里實現(xiàn)的。后面就來看看
gate的run方法里保存了用戶自定義的方法,然后自己的onWorkStart,onConnect,onMessage,onClose,onWorkstop都已定義好
/**
* 運行
* @see Workerman.Worker::run()
*/
public function run()
{
// 保存用戶的回調(diào),當對應的事件發(fā)生時觸發(fā)
$this->_onWorkerStart = $this->onWorkerStart;
$this->onWorkerStart = array($this, 'onWorkerStart');
// 保存用戶的回調(diào),當對應的事件發(fā)生時觸發(fā)
$this->_onConnect = $this->onConnect;
$this->onConnect = array($this, 'onClientConnect');
// onMessage禁止用戶設置回調(diào)
$this->onMessage = array($this, 'onClientMessage');
// 保存用戶的回調(diào),當對應的事件發(fā)生時觸發(fā)
$this->_onClose = $this->onClose;
$this->onClose = array($this, 'onClientClose');
// 保存用戶的回調(diào),當對應的事件發(fā)生時觸發(fā)
$this->_onWorkerStop = $this->onWorkerStop;
$this->onWorkerStop = array($this, 'onWorkerStop');
// 記錄進程啟動的時間
$this->_startTime = time();
// 運行父方法
parent::run();
}
在看看他的OnworkStart方法,也就是子進程運行后執(zhí)行的方法
/**
* 當Gateway啟動的時候觸發(fā)的回調(diào)函數(shù)
* @return void
*/
public function onWorkerStart()
{
// 分配一個內(nèi)部通訊端口
// 主進程pid-子進程pid+startPort保證每個子進程的內(nèi)部端口不同
$this->lanPort = function_exists('posix_getppid') ? $this->startPort - posix_getppid() + posix_getpid() : $this->startPort;
if($this->lanPort<0 || $this->lanPort >=65535)
{
$this->lanPort = rand($this->startPort, 65535);
}
// 如果有設置心跳,則定時執(zhí)行
if($this->pingInterval > 0)
{
$timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval/2 : $this->pingInterval;
Timer::add($timer_interval, array($this, 'ping'));
}
//別名內(nèi)部通訊協(xié)議
if(!class_exists('ProtocolsGatewayProtocol'))
{
class_alias('GatewayWorkerProtocolsGatewayProtocol', 'ProtocolsGatewayProtocol');
}
// 初始化gateway內(nèi)部的監(jiān)聽,用于監(jiān)聽worker的連接已經(jīng)連接上發(fā)來的數(shù)據(jù)
// 這里內(nèi)部鏈接在同一個ip+端口的情況下有兩個服務
// 這個時候listen由于全局的事件self::$globalEvent在子進程的run方法里在回調(diào)OnWorkStart之前已經(jīng)定義,所以不像主進程一樣在listen的不監(jiān)聽accept事件
$this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
$this->_innerTcpWorker->listen();
$this->_innerUdpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
$this->_innerUdpWorker->transport = 'udp';
$this->_innerUdpWorker->listen();
// 重新設置自動加載根目錄
Autoloader::setRootPath($this->_appInitPath);
// 設置內(nèi)部監(jiān)聽的相關(guān)回調(diào)
$this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');
$this->_innerUdpWorker->onMessage = array($this, 'onWorkerMessage');
$this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect');
$this->_innerTcpWorker->onClose = array($this, 'onWorkerClose');
// 注冊gateway的內(nèi)部通訊地址,worker去連這個地址,以便gateway與worker之間建立起TCP長連接
if(!$this->registerAddress())
{
$this->log('registerAddress fail and exit');
Worker::stopAll();
}
if($this->_onWorkerStart)
{
call_user_func($this->_onWorkerStart, $this);
}
}
可以看到他有新建了兩個個work實例,innerTcpWorker,innerUdpWorker,并且在同一個地址{$this->lanIp}:{$this->lanPort}下開啟了兩個服務,并且注冊了與worker通信的回調(diào)事件onWorkerConnect,onWorkerMessage,onWorkerClose。使用的協(xié)議是內(nèi)部通訊協(xié)議GatewayProtocol。registerAddress方法通過文件鎖把這個子進程的內(nèi)部通訊服務地址{$this->lanIp}:{$this->lanPort}記錄到一個公共地方,可能是文件,可能是memcache,可能是redis,后兩種支持分布式部署gate與work,不然就要走同一臺機器上。用后面兩種可以部署多個gate,然后其他機器部署work。這時候定義的內(nèi)部服務通過listen方法已經(jīng)有accept監(jiān)聽事件了,如果work跟與gate連接就會進入到設置的Worker的回調(diào)方法里,客戶端與Gate連接就會進入到Client方法里,因為他們是兩種不同的work實例,監(jiān)聽的不同的端口。
到這里Gate子服務已經(jīng)準備好了,除了自己是work實例提供給客戶端的連接服務,被主進程管理之外;每個子Gate進程都會在新建一個work實例來提供對worker子進程的訪問服務;對客戶端的服務有new Gate的時候指定協(xié)議,對子worker進程的服務是默認協(xié)議,并且tcp與udp都監(jiān)聽了。后面的步奏應該是子worker進程在workstart方法里從子Gate服務建立內(nèi)部服務是注冊全局的內(nèi)部通訊服務地址,連接到Gate,這樣gate的內(nèi)部服務監(jiān)聽就把子worker服務的地址記錄下來。
子worker進程服務通過tcp與gate進程服務通信,通過在連接上的監(jiān)聽,實現(xiàn)消息的傳遞,worker進程在通過與Event文件的回調(diào)來通知客戶端的請求,Event處理完畢之后,通過lib/gateway文件,直接udp到Gate來實現(xiàn)信息的傳遞。
分布式中的每臺機器有主進程管理子進程,子Gate進程處理監(jiān)聽客戶度與內(nèi)部Work進程,Gate記錄全局的客戶端id與Gate的對應關(guān)系到全局儲存器,也記錄自己的內(nèi)部服務地址到全局存儲器。每個子Gate進程記錄連接自己的內(nèi)部worker地址。然后子worker啟動時候從全局內(nèi)部服務地址取地址進行tcp連接,記錄與自己連接的Gate地址,client,gate,work直接的通信就打通了,如果一個客戶端要與另一個客戶端通信,在Event處理時從全局的client與gate的對以關(guān)系里得到要發(fā)送的client連接的gate,然后給這個gate發(fā)送udp信息,再由gate轉(zhuǎn)發(fā)到對的client。
下面看看基類的accept方法:
/**
* 接收一個客戶端連接
* @param resource $socket
* @return void
*/
public function acceptConnection($socket)
{
// 獲得客戶端連接
$new_socket = @stream_socket_accept($socket, 0);
// 驚群現(xiàn)象,忽略
if(false === $new_socket)
{
return;
}
// 初始化連接對象
$connection = new TcpConnection($new_socket);
$this->connections[$connection->id] = $connection;
$connection->worker = $this;
$connection->protocol = $this->_protocol;
$connection->onMessage = $this->onMessage;
$connection->onClose = $this->onClose;
$connection->onError = $this->onError;
$connection->onBufferDrain = $this->onBufferDrain;
$connection->onBufferFull = $this->onBufferFull;
// 如果有設置連接回調(diào),則執(zhí)行
if($this->onConnect)
{
try
{
call_user_func($this->onConnect, $connection);
}
catch(Exception $e)
{
ConnectionInterface::$statistics['throw_exception']++;
self::log($e);
}
}
}
/**
* 處理udp連接(udp其實是無連接的,這里為保證和tcp連接接口一致)
*
* @param resource $socket
* @return bool
*/
public function acceptUdpConnection($socket)
{
$recv_buffer = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $remote_address);
if(false === $recv_buffer || empty($remote_address))
{
return false;
}
// 模擬一個連接對象
$connection = new UdpConnection($socket, $remote_address);
if($this->onMessage)
{
if($this->_protocol)
{
/** @var WorkermanProtocolsProtocolInterface $parser */
$parser = $this->_protocol;
$recv_buffer = $parser::decode($recv_buffer, $connection);
}
ConnectionInterface::$statistics['total_request']++;
try
{
call_user_func($this->onMessage, $connection, $recv_buffer);
}
catch(Exception $e)
{
ConnectionInterface::$statistics['throw_exception']++;
}
}
}
整體上來說就是tcp就是新建一個客戶端連接,然后使用tcpConnecttion類封裝,包括通信協(xié)議,然后回調(diào)onConnect事件;udp直接從連接獲取數(shù)據(jù),然后通過協(xié)議解析數(shù)據(jù),回調(diào)onMessage方法。
gate的內(nèi)部服務建立好了,再看看business子服務的run方法:
/**
* 運行
* @see Workerman.Worker::run()
*/
public function run()
{
$this->_onWorkerStart = $this->onWorkerStart;
$this->onWorkerStart = array($this, 'onWorkerStart');
parent::run();
}
/**
* 當進程啟動時一些初始化工作
* @return void
*/
protected function onWorkerStart()
{
if(!class_exists('ProtocolsGatewayProtocol'))
{
class_alias('GatewayWorkerProtocolsGatewayProtocol', 'ProtocolsGatewayProtocol');
}
Timer::add(1, array($this, 'checkGatewayConnections'));
$this->checkGatewayConnections();
GatewayWorkerLibGateway::setBusinessWorker($this);
if($this->_onWorkerStart)
{
call_user_func($this->_onWorkerStart, $this);
}
}
這里business子服務直接就是循環(huán)連接所有的Gate了。
連接打通之后,怎么通信呢,這就要看事件驅(qū)動管理以及協(xié)議了。
總結(jié)
以上是生活随笔為你收集整理的gateway-workman的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一个在线解压缩的网站:wobzip
- 下一篇: iOS UITableView划动删除的