日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

gateway-workman

發(fā)布時間:2023/12/13 综合教程 32 生活家
生活随笔 收集整理的這篇文章主要介紹了 gateway-workman 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

最外層start.php,設置全局啟動模式,加載Application里的個子服務目錄下應用的啟動文件(start開頭,這些文件都是workmanwork類的子類,在載入文件的同時,這些子服務會生成對象,work類的構(gòu)造方法會把生成的work對象都存入work類的靜態(tài)變量$_workers,方便主文件后續(xù)設置以及啟動子服務,全局啟動模式子服務是不會啟動的),設置時區(qū),注冊自動加載,調(diào)用workmanWorker:runall啟動所有服務。

主文件:

// 標記是全局啟動
define('GLOBAL_START', 1);

require_once __DIR__ . '/Workerman/Autoloader.php';

// 加載所有Applications/*/start.php,以便啟動所有服務
foreach(glob(__DIR__.'/Applications/*/start*.php') as $start_file)
{
    require_once $start_file;
}
// 運行所有服務
Worker::runAll();

子服務文件start_gateway,提供接入請求的服務(class Gateway extends Worker):

// gateway 進程,這里使用Text協(xié)議,可以用telnet測試
$gateway = new Gateway("Text://0.0.0.0:8282");
// gateway名稱,status方便查看
$gateway->name = 'YourAppGateway';
// gateway進程數(shù)
$gateway->count = 4;
// 本機ip,分布式部署時使用內(nèi)網(wǎng)ip
$gateway->lanIp = '127.0.0.1';
// 內(nèi)部通訊起始端口,假如$gateway->count=4,起始端口為4000
// 則一般會使用4001 4002 4003 4004 4個端口作為內(nèi)部通訊端口 
$gateway->startPort = 2300;
// 心跳間隔
//$gateway->pingInterval = 10;
// 心跳數(shù)據(jù)
//$gateway->pingData = '{"type":"ping"}';

/* 
// 當客戶端連接上來時,設置連接的onWebSocketConnect,即在websocket握手時的回調(diào)
$gateway->onConnect = function($connection)
{
    $connection->onWebSocketConnect = function($connection , $http_header)
    {
        // 可以在這里判斷連接來源是否合法,不合法就關(guān)掉連接
        // $_SERVER['HTTP_ORIGIN']標識來自哪個站點的頁面發(fā)起的websocket鏈接
        if($_SERVER['HTTP_ORIGIN'] != 'http://kedou.workerman.net')
        {
            $connection->close();
        }
        // onWebSocketConnect 里面$_GET $_SERVER是可用的
        // var_dump($_GET, $_SERVER);
    };
}; 
*/

// 如果不是在根目錄啟動,則運行runAll方法
if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

接入服務除了注冊到全局靜態(tài)$_workers變量,還設置了路由:

    public function __construct($socket_name, $context_option = array())
    {
        parent::__construct($socket_name, $context_option);
        //隨機返回一個bussness的連接
        $this->router = array("\GatewayWorker\Gateway", 'routerRand');
        
        $backrace = debug_backtrace();
        $this->_appInitPath = dirname($backrace[0]['file']);
    }

子服務文件start_bussinessworker,提供實際的業(yè)務處理,和gateway服務內(nèi)部通訊類似nginx與php(class BusinessWorker extends Worker):

// bussinessWorker 進程
$worker = new BusinessWorker();
// worker名稱
$worker->name = 'YourAppBusinessWorker';
// bussinessWorker進程數(shù)量
$worker->count = 4;


// 如果不是在根目錄啟動,則運行runAll方法
if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

附帶基類work的構(gòu)造函數(shù):

/**
     * worker構(gòu)造函數(shù)
     *
     * @param string $socket_name
     * @param array  $context_option
     */
    public function __construct($socket_name = '', $context_option = array())
    {
        // 保存worker實例
        $this->workerId = spl_object_hash($this);
        self::$_workers[$this->workerId] = $this;
        self::$_pidMap[$this->workerId] = array();
        
        // 獲得實例化文件路徑,用于自動加載設置根目錄
        $backrace = debug_backtrace();
        $this->_appInitPath = dirname($backrace[0]['file']);
        
        // 設置socket上下文
        if($socket_name)
        {
            $this->_socketName = $socket_name;
            if(!isset($context_option['socket']['backlog']))
            {
                $context_option['socket']['backlog'] = self::DEFAUL_BACKLOG;
            }
            $this->_context = stream_context_create($context_option);
        }
    }

這里可以看到self::$_workers[$this->workerId] = $this;記錄全局worker實例,
self::$_pidMap這個用來記錄各個子服務開始fork后的所有子進程id
$context_option['socket']['backlog'] = self::DEFAUL_BACKLOG;設置嵌套字上下文里的未accept隊列長度
在后面運行實例的listen方法監(jiān)聽的時候會傳遞到stream_socket_server方法里。

runAll的流程如下:

/**
     * 運行所有worker實例
     * @return void
     */
    public static function runAll()
    {
        // 初始化環(huán)境變量(pid文件,log文件,status文件,定時器信號回調(diào))
        self::init();
        // 解析命令(運行,重啟,停止,重載,狀態(tài),從命令判斷主進程是否以守護進程啟動)
	// 啟動之后通過php start.php XXX命令會到這里!因為第一步設置了這個文件的pid(這里可以看到pid對應到文件位置的重要性),所以后面的命令會對應為發(fā)送信號
        self::parseCommand();
        // 嘗試以守護進程模式運行(fork兩次進程,重置進程sid)
        self::daemonize();
        // 初始化所有worker實例,主要是監(jiān)聽端口(記錄所有子服務worker實例的最長名稱name長度,最長嵌套字名socket_name長度,最長運行用戶名user長度;
	// 所有定義了協(xié)議的子服務(Gate)開始監(jiān)聽也就是啟動服務,子服務實例監(jiān)聽嵌套字對象mainSoctke,還沒有注冊accept回調(diào))
	// 到這里所有Gate子服務啟動監(jiān)聽,但是沒有accept
	self::initWorkers();
        //  初始化所有信號處理函數(shù)(為主進程注冊stop,stats,reload的信號回調(diào)signalHandler)
        self::installSignal();
        // 保存主進程pid(將獲取daemonize方法后的新的主進程sid,存入init方法后的pid文件)
        self::saveMasterPid();
        // 創(chuàng)建子進程(worker進程)并運行(主進程通過self::$_pidMap用來記錄子服務進程創(chuàng)建的各自進程號,方便后面發(fā)送信號,
// 生成的子進程置空主進程的全局變量,self::$_pidMap,self::$_workers,如果在子服務文件里定義了self::$stdoutFile文件地址,
// 會重定向子服務子進程的stdout和stderr,直接運行work實例的run方法)
// 到這里子服務已經(jīng)在子進程中運行,后面的代碼就只有主服務執(zhí)行

// 子進程的run方法會通過libevent綁定子服務mainSocket的accept回調(diào),在accept回調(diào)方法里才有定義后面怎么處理請求socket
// 子進程的run方法會通過libevent綁定重新綁定信號量,以及用libevent來注入定時器
// 子進程的run方法會回調(diào)用戶在子服務文件里的onWorkStar方法
// 子進程進入事件監(jiān)聽輪詢
// 上面是基類中的run方法,基于gateway的子服務,會實現(xiàn)自己的onworkStar方法,然后在調(diào)用基類的run,這樣可以在onWorkStar里實現(xiàn)gate與worker的連接
// 這里不知道怎么處理子進程對accpet時候的驚群 self::forkWorkers(); // 展示啟動界面(打印所有啟動的子服務的信息,由于initWorkers獲取了各個子服務實例的名稱等信息長度可以很好的格式化展示) self::displayUI(); // 嘗試重定向標準輸入輸出(重定向主服務進程) self::resetStd(); // 監(jiān)控所有子進程(worker進程)(處理主進程的信號量;通過pcntl_wait循環(huán)監(jiān)聽子進程狀態(tài),保持子進程的運行) /* 什么是平滑重啟? 平滑重啟不同于普通的重啟,平滑重啟可以做到在不影響用戶的情況下重啟服務,以便重新載入PHP程序,完成業(yè)務代碼更新。 平滑重啟一般應用于業(yè)務更新或者版本發(fā)布過程中,能夠避免因為代碼發(fā)布重啟服務導致的暫時性服務不可用的影響。 注意:只有在on{...}回調(diào)中載入的文件平滑重啟后才會自動更新,啟動腳本中直接載入的文件或者寫死的代碼運行reload不會自動更新。 平滑重啟原理 WorkerMan分為主進程和子進程,主進程負責監(jiān)控子進程,子進程負責接收客戶端的連接和連接上發(fā)來的請求數(shù)據(jù), 做相應的處理并返回數(shù)據(jù)給客戶端。當業(yè)務代碼更新時,其實我們只要更新子進程,便可以達到更新代碼的目的。 當WorkerMan主進程收到平滑重啟信號時,主進程會向其中一個子進程發(fā)送安全退出(讓對應進程處理完畢當前請求后才退出)信號, 當這個進程退出后,主進程會重新創(chuàng)建一個新的子進程(這個子進程載入了新的PHP代碼),然后主進程再次向另外一個舊的進程發(fā)送停止 命令,這樣一個進程一個進程的重啟,直到所有舊的進程全部被置換為止。 我們看到平滑重啟實際上是讓舊的業(yè)務進程逐個退出然后并逐個創(chuàng)建新的進程做到的。為了在平滑重啟時不影響客用戶,這就要求進程中不 要保存用戶相關(guān)的狀態(tài)信息,即業(yè)務進程最好是無狀態(tài)的,避免由于進程退出導致信息丟失。 */ //上面是官網(wǎng)對平滑啟動的說明,設計的代碼就是reload方法的 $one_worker_pid = current(self::$_pidsToRestart );這里是處理主進程的平滑啟動信號的 // 在主進程里獲取所有設置為可以平滑啟動的子進程的pid,然后取一個發(fā)送平滑啟動信號信號,這個信號到子進程,其實子進程會通過stopAll方法停止運行 // exit(0); // 主進程監(jiān)聽到子進程退出,然后重新生成一個新的子進程,然后把這個子進程的id從self::$_pidsToRestart里刪除,然后再次調(diào)用reload方法去殺掉下一個子進程 // self::monitorWorkers(); }

主要的過程已經(jīng)描述清楚了,主服務在主進程里,子服務開啟監(jiān)聽之后,主服務開始fork,然后記錄子服務進程的對應的pid,然后通過信號量來處理用戶命令以及管理子服務進程。子服務在子進程里實現(xiàn)accpet監(jiān)聽回調(diào)。

work基類的主要代碼片段:

子服務listen:
 // 獲得應用層通訊協(xié)議以及監(jiān)聽的地址,udp會轉(zhuǎn)換為傳輸協(xié)議
        list($scheme, $address) = explode(':', $this->_socketName, 2);
        // 如果有指定應用層協(xié)議,則檢查對應的協(xié)議類是否存在
        if($scheme != 'tcp' && $scheme != 'udp')
        {
            $scheme = ucfirst($scheme);
            $this->_protocol = '\Protocols\'.$scheme;
            if(!class_exists($this->_protocol))
            {
                $this->_protocol = "\Workerman\Protocols\$scheme";
                if(!class_exists($this->_protocol))
                {
                    throw new Exception("class \Protocols\$scheme not exist");
                }
            }
        }
        elseif($scheme === 'udp')
        {
            $this->transport = 'udp';
        }
        
        // flag
        $flags =  $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
        $errno = 0;
        $errmsg = '';
        $this->_mainSocket = stream_socket_server($this->transport.":".$address, $errno, $errmsg, $flags, $this->_context);
        if(!$this->_mainSocket)
        {
            throw new Exception($errmsg);
        }
創(chuàng)建子進程:
        // 創(chuàng)建子進程
            while(count(self::$_pidMap[$worker->workerId]) < $worker->count)
            {
                static::forkOneWorker($worker);
            }
        }
    }

    /**
     * 創(chuàng)建一個子進程
     * @param Worker $worker
     * @throws Exception
     */
    protected static function forkOneWorker($worker)
    {
        $pid = pcntl_fork();
        // 主進程記錄子進程pid
        if($pid > 0)
        {
            self::$_pidMap[$worker->workerId][$pid] = $pid;
        }
        // 子進程運行
        elseif(0 === $pid)
        {
            // 啟動過程中嘗試重定向標準輸出
            if(self::$_status === self::STATUS_STARTING)
            {
                self::resetStd();
            }
            self::$_pidMap = array();
            self::$_workers = array($worker->workerId => $worker);
            Timer::delAll();
            self::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());
            self::setProcessUser($worker->user);
            $worker->run();
            exit(250);
        }
        else
        {
            throw new Exception("forkOneWorker fail");
        }
    }
子進程執(zhí)行的基類run方法:
/**
     * 運行worker實例
     */
    public function run()
    {
        // 注冊進程退出回調(diào),用來檢查是否有錯誤
        register_shutdown_function(array("\Workerman\Worker", 'checkErrors'));
        
        // 設置自動加載根目錄
        Autoloader::setRootPath($this->_appInitPath);
        
        // 如果沒有全局事件輪詢,則創(chuàng)建一個
        if(!self::$globalEvent)
        {
            if(extension_loaded('libevent'))
            {
                self::$globalEvent = new Libevent();
            }
            else
            {
                self::$globalEvent = new Select();
            }
            // 監(jiān)聽_mainSocket上的可讀事件(客戶端連接事件)也只有Gate才有這個事件
            if($this->_socketName)
            {
                if($this->transport !== 'udp')
                {
                    self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
                }
                else
                {
                    self::$globalEvent->add($this->_mainSocket,  EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
                }
            }
        }
        
        // 重新安裝事件處理函數(shù),使用全局事件輪詢監(jiān)聽信號事件
        self::reinstallSignal();
        
        // 用全局事件輪詢初始化定時器
        Timer::init(self::$globalEvent);
        
        // 如果有設置進程啟動回調(diào),則執(zhí)行
        if($this->onWorkerStart)
        {
            call_user_func($this->onWorkerStart, $this);
        }
        
        // 子進程主循環(huán)
        self::$globalEvent->loop();
    }
主服務監(jiān)聽子服務進程:
pcntl_signal_dispatch();
            // 掛起進程,直到有子進程退出或者被信號打斷
            $status = 0;
            $pid = pcntl_wait($status, WUNTRACED);
            // 如果有信號到來,嘗試觸發(fā)信號處理函數(shù)
            pcntl_signal_dispatch();
            // 有子進程退出
            if($pid > 0)
            {
                // 查找是哪個進程組的,然后再啟動新的進程補上
                foreach(self::$_pidMap as $worker_id => $worker_pid_array)
                {
                    if(isset($worker_pid_array[$pid]))
                    {
                        $worker = self::$_workers[$worker_id];
                        // 檢查退出狀態(tài)
                        if($status !== 0)
                        {
                            self::log("worker[".$worker->name.":$pid] exit with status $status");
                        }
                       
                        // 統(tǒng)計,運行status命令時使用
                        if(!isset(self::$_globalStatistics['worker_exit_info'][$worker_id][$status]))
                        {
                            self::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
                        }
                        self::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
                        
                        // 清除子進程信息
                        unset(self::$_pidMap[$worker_id][$pid]);
                        
                        break;
                    }
                }
                // 如果不是關(guān)閉狀態(tài),則補充新的進程
                if(self::$_status !== self::STATUS_SHUTDOWN)
                {
                    self::forkWorkers();
                    // 如果該進程是因為運行reload命令退出,則繼續(xù)執(zhí)行reload流程
                    if(isset(self::$_pidsToRestart[$pid]))
                    {
                        unset(self::$_pidsToRestart[$pid]);
                        self::reload();
                    }
                }
平滑啟動過程:
 /**
     * 執(zhí)行平滑重啟流程
     * @return void
     */
    protected static function reload()
    {
        // 主進程部分
        if(self::$_masterPid === posix_getpid())
        {
            // 設置為平滑重啟狀態(tài)
            if(self::$_status !== self::STATUS_RELOADING && self::$_status !== self::STATUS_SHUTDOWN)
            {
                self::log("Workerman[".basename(self::$_startFile)."] reloading");
                self::$_status = self::STATUS_RELOADING;
            }
            
            // 如果有worker設置了reloadable=false,則過濾掉
            $reloadable_pid_array = array();
            foreach(self::$_pidMap as $worker_id =>$worker_pid_array)
            {
                $worker = self::$_workers[$worker_id];
                if($worker->reloadable)
                {
                    foreach($worker_pid_array as $pid)
                    {
                        $reloadable_pid_array[$pid] = $pid;
                    }
                }
            }
            
            // 得到所有可以重啟的進程
            self::$_pidsToRestart = array_intersect(self::$_pidsToRestart , $reloadable_pid_array);
            
            // 平滑重啟完畢
            if(empty(self::$_pidsToRestart))
            {
                if(self::$_status !== self::STATUS_SHUTDOWN)
                {
                    self::$_status = self::STATUS_RUNNING;
                }
                return;
            }
            // 繼續(xù)執(zhí)行平滑重啟流程
            $one_worker_pid = current(self::$_pidsToRestart );
            // 給子進程發(fā)送平滑重啟信號
            posix_kill($one_worker_pid, SIGUSR1);
            // 定時器,如果子進程在KILL_WORKER_TIMER_TIME秒后沒有退出,則強行殺死
            Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($one_worker_pid, SIGKILL), false);
        }
        // 子進程部分
        else
        {
            // 如果當前worker的reloadable屬性為真,則執(zhí)行退出
            $worker = current(self::$_workers);
            if($worker->reloadable)
            {
                self::stopAll();
            }
        }
    } 

到這里主進程已經(jīng)準備好了,子進程(Gate)已經(jīng)開始監(jiān)聽了(還未講gate與worker的連接通信,以及gate怎么接受請求,然后worker怎么處理請求)。上面說了gate與worker的連接是在OnWorkStart里實現(xiàn)的。后面就來看看

gate的run方法里保存了用戶自定義的方法,然后自己的onWorkStart,onConnect,onMessage,onClose,onWorkstop都已定義好

/**
     * 運行
     * @see Workerman.Worker::run()
     */
    public function run()
    {
        // 保存用戶的回調(diào),當對應的事件發(fā)生時觸發(fā)
        $this->_onWorkerStart = $this->onWorkerStart;
        $this->onWorkerStart = array($this, 'onWorkerStart');
        // 保存用戶的回調(diào),當對應的事件發(fā)生時觸發(fā)
        $this->_onConnect = $this->onConnect;
        $this->onConnect = array($this, 'onClientConnect');
        
        // onMessage禁止用戶設置回調(diào)
        $this->onMessage = array($this, 'onClientMessage');
        
        // 保存用戶的回調(diào),當對應的事件發(fā)生時觸發(fā)
        $this->_onClose = $this->onClose;
        $this->onClose = array($this, 'onClientClose');
        // 保存用戶的回調(diào),當對應的事件發(fā)生時觸發(fā)
        $this->_onWorkerStop = $this->onWorkerStop;
        $this->onWorkerStop = array($this, 'onWorkerStop');
        
        // 記錄進程啟動的時間
        $this->_startTime = time();
        // 運行父方法
        parent::run();
    }

在看看他的OnworkStart方法,也就是子進程運行后執(zhí)行的方法

/**
     * 當Gateway啟動的時候觸發(fā)的回調(diào)函數(shù)
     * @return void
     */
    public function onWorkerStart()
    {
        // 分配一個內(nèi)部通訊端口
		// 主進程pid-子進程pid+startPort保證每個子進程的內(nèi)部端口不同
        $this->lanPort = function_exists('posix_getppid') ? $this->startPort - posix_getppid() + posix_getpid() : $this->startPort;
        if($this->lanPort<0 || $this->lanPort >=65535)
        {
            $this->lanPort = rand($this->startPort, 65535);
        }
        
        // 如果有設置心跳,則定時執(zhí)行
        if($this->pingInterval > 0)
        {
            $timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval/2 : $this->pingInterval;
            Timer::add($timer_interval, array($this, 'ping'));
        }
		//別名內(nèi)部通訊協(xié)議
        if(!class_exists('ProtocolsGatewayProtocol'))
        {
            class_alias('GatewayWorkerProtocolsGatewayProtocol', 'ProtocolsGatewayProtocol');
        }
        // 初始化gateway內(nèi)部的監(jiān)聽,用于監(jiān)聽worker的連接已經(jīng)連接上發(fā)來的數(shù)據(jù)
// 這里內(nèi)部鏈接在同一個ip+端口的情況下有兩個服務
// 這個時候listen由于全局的事件self::$globalEvent在子進程的run方法里在回調(diào)OnWorkStart之前已經(jīng)定義,所以不像主進程一樣在listen的不監(jiān)聽accept事件 $this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}"); $this->_innerTcpWorker->listen(); $this->_innerUdpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}"); $this->_innerUdpWorker->transport = 'udp'; $this->_innerUdpWorker->listen(); // 重新設置自動加載根目錄 Autoloader::setRootPath($this->_appInitPath); // 設置內(nèi)部監(jiān)聽的相關(guān)回調(diào) $this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage'); $this->_innerUdpWorker->onMessage = array($this, 'onWorkerMessage'); $this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect'); $this->_innerTcpWorker->onClose = array($this, 'onWorkerClose'); // 注冊gateway的內(nèi)部通訊地址,worker去連這個地址,以便gateway與worker之間建立起TCP長連接 if(!$this->registerAddress()) { $this->log('registerAddress fail and exit'); Worker::stopAll(); } if($this->_onWorkerStart) { call_user_func($this->_onWorkerStart, $this); } }

可以看到他有新建了兩個個work實例,innerTcpWorker,innerUdpWorker,并且在同一個地址{$this->lanIp}:{$this->lanPort}下開啟了兩個服務,并且注冊了與worker通信的回調(diào)事件onWorkerConnect,onWorkerMessage,onWorkerClose。使用的協(xié)議是內(nèi)部通訊協(xié)議GatewayProtocol。registerAddress方法通過文件鎖把這個子進程的內(nèi)部通訊服務地址{$this->lanIp}:{$this->lanPort}記錄到一個公共地方,可能是文件,可能是memcache,可能是redis,后兩種支持分布式部署gate與work,不然就要走同一臺機器上。用后面兩種可以部署多個gate,然后其他機器部署work。這時候定義的內(nèi)部服務通過listen方法已經(jīng)有accept監(jiān)聽事件了,如果work跟與gate連接就會進入到設置的Worker的回調(diào)方法里,客戶端與Gate連接就會進入到Client方法里,因為他們是兩種不同的work實例,監(jiān)聽的不同的端口。

到這里Gate子服務已經(jīng)準備好了,除了自己是work實例提供給客戶端的連接服務,被主進程管理之外;每個子Gate進程都會在新建一個work實例來提供對worker子進程的訪問服務;對客戶端的服務有new Gate的時候指定協(xié)議,對子worker進程的服務是默認協(xié)議,并且tcp與udp都監(jiān)聽了。后面的步奏應該是子worker進程在workstart方法里從子Gate服務建立內(nèi)部服務是注冊全局的內(nèi)部通訊服務地址,連接到Gate,這樣gate的內(nèi)部服務監(jiān)聽就把子worker服務的地址記錄下來。

子worker進程服務通過tcp與gate進程服務通信,通過在連接上的監(jiān)聽,實現(xiàn)消息的傳遞,worker進程在通過與Event文件的回調(diào)來通知客戶端的請求,Event處理完畢之后,通過lib/gateway文件,直接udp到Gate來實現(xiàn)信息的傳遞。

分布式中的每臺機器有主進程管理子進程,子Gate進程處理監(jiān)聽客戶度與內(nèi)部Work進程,Gate記錄全局的客戶端id與Gate的對應關(guān)系到全局儲存器,也記錄自己的內(nèi)部服務地址到全局存儲器。每個子Gate進程記錄連接自己的內(nèi)部worker地址。然后子worker啟動時候從全局內(nèi)部服務地址取地址進行tcp連接,記錄與自己連接的Gate地址,client,gate,work直接的通信就打通了,如果一個客戶端要與另一個客戶端通信,在Event處理時從全局的client與gate的對以關(guān)系里得到要發(fā)送的client連接的gate,然后給這個gate發(fā)送udp信息,再由gate轉(zhuǎn)發(fā)到對的client。

下面看看基類的accept方法:

/**
     * 接收一個客戶端連接
     * @param resource $socket
     * @return void
     */
    public function acceptConnection($socket)
    {
        // 獲得客戶端連接
        $new_socket = @stream_socket_accept($socket, 0);
        // 驚群現(xiàn)象,忽略
        if(false === $new_socket)
        {
            return;
        }
        
        // 初始化連接對象
        $connection = new TcpConnection($new_socket);
        $this->connections[$connection->id] = $connection;
        $connection->worker = $this;
        $connection->protocol = $this->_protocol;
        $connection->onMessage = $this->onMessage;
        $connection->onClose = $this->onClose;
        $connection->onError = $this->onError;
        $connection->onBufferDrain = $this->onBufferDrain;
        $connection->onBufferFull = $this->onBufferFull;
        
        // 如果有設置連接回調(diào),則執(zhí)行
        if($this->onConnect)
        {
            try
            {
                call_user_func($this->onConnect, $connection);
            }
            catch(Exception $e)
            {
                ConnectionInterface::$statistics['throw_exception']++;
                self::log($e);
            }
        }
    }

    /**
     * 處理udp連接(udp其實是無連接的,這里為保證和tcp連接接口一致)
     *
     * @param resource $socket
     * @return bool
     */
    public function acceptUdpConnection($socket)
    {
        $recv_buffer = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $remote_address);
        if(false === $recv_buffer || empty($remote_address))
        {
            return false;
        }
        // 模擬一個連接對象
        $connection = new UdpConnection($socket, $remote_address);
        if($this->onMessage)
        {
            if($this->_protocol)
            {
                /** @var WorkermanProtocolsProtocolInterface $parser */
                $parser = $this->_protocol;
                $recv_buffer = $parser::decode($recv_buffer, $connection);
            }
            ConnectionInterface::$statistics['total_request']++;
            try
            {
               call_user_func($this->onMessage, $connection, $recv_buffer);
            }
            catch(Exception $e)
            {
                ConnectionInterface::$statistics['throw_exception']++;
            }
        }
    }

整體上來說就是tcp就是新建一個客戶端連接,然后使用tcpConnecttion類封裝,包括通信協(xié)議,然后回調(diào)onConnect事件;udp直接從連接獲取數(shù)據(jù),然后通過協(xié)議解析數(shù)據(jù),回調(diào)onMessage方法。

gate的內(nèi)部服務建立好了,再看看business子服務的run方法:

    /**
     * 運行
     * @see Workerman.Worker::run()
     */
    public function run()
    {
        $this->_onWorkerStart = $this->onWorkerStart;
        $this->onWorkerStart = array($this, 'onWorkerStart');
        parent::run();
    }
    
    /**
     * 當進程啟動時一些初始化工作
     * @return void
     */
    protected function onWorkerStart()
    {
        if(!class_exists('ProtocolsGatewayProtocol'))
        {
            class_alias('GatewayWorkerProtocolsGatewayProtocol', 'ProtocolsGatewayProtocol');
        }
        Timer::add(1, array($this, 'checkGatewayConnections'));
        $this->checkGatewayConnections();
        GatewayWorkerLibGateway::setBusinessWorker($this);
        if($this->_onWorkerStart)
        {
            call_user_func($this->_onWorkerStart, $this);
        }
    }

這里business子服務直接就是循環(huán)連接所有的Gate了。

連接打通之后,怎么通信呢,這就要看事件驅(qū)動管理以及協(xié)議了。

總結(jié)

以上是生活随笔為你收集整理的gateway-workman的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。