日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > php >内容正文

php

php basic publish,RabbitMQ入门(PHP语言描述)

發布時間:2024/6/1 php 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 php basic publish,RabbitMQ入门(PHP语言描述) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一 "Hello World!"

生產者:

/*

* php G:\wamp\www\mygedu\yii tools/send-mq msg*/

public?function?actionSendMq($argv=''){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->queue_declare('hello',?false,?false,?false,?false);

$msg?=?new?AMQPMessage($argv);

$channel->basic_publish($msg,?'',?'hello');

echo?"?[x]?Sent?'$argv'".PHP_EOL;

$channel->close();

$connection->close();

}

消費者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/receive-mq

*/

public?function?actionReceiveMq(){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->queue_declare('hello',?false,?false,?false,?false);

echo?'?[*]?Waiting?for?messages.?To?exit?press?CTRL+C',?"\n";

$callback?=?function($msg)?{

echo?"?[x]?Received?",?$msg->body,?"\n";

};

$channel->basic_consume('hello',?'',?false,?true,?false,?false,?$callback);

while(count($channel->callbacks))?{

$channel->wait();

}

}

二?Work?queues

生產者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/new-task?msg

*/

public?function?actionNewTask($data='Hello?World!'){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->queue_declare('my_task_queue',?false,?true,?false,?false);

$msg?=?new?AMQPMessage($data,

array('delivery_mode'?=>?2)?#?make?message?persistent

);

$channel->basic_publish($msg,?'',?'my_task_queue');

echo?"?[x]?Sent?",?$data,?"\n";

$channel->close();

$connection->close();

}

消費者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/worker

*/

public?function?actionWorker(){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->queue_declare('my_task_queue',?false,?true,?false,?false);

echo?'?[*]?Waiting?for?messages.?To?exit?press?CTRL+C',?"\n";

$callback?=?function($msg){

echo?"?[x]?Received?",?$msg->body,?"\n";

sleep(substr_count($msg->body,?'.'));

echo?"?[x]?Done",?"\n";

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

};

$channel->basic_qos(null,?1,?null);

$channel->basic_consume('my_task_queue',?'',?false,?false,?false,?false,?$callback);

while(count($channel->callbacks))?{

$channel->wait();

}

$channel->close();

$connection->close();

}

三?Publish/Subscribe

生產者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/emit-log?msg

*/

public?function?actionEmitLog($data='Hello?World!'){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->exchange_declare('logs',?'fanout',?false,?false,?false);

if(empty($data))?$data?=?"info:?Hello?World!";

$msg?=?new?AMQPMessage($data);

$channel->basic_publish($msg,?'logs');

echo?"?[x]?Sent?",?$data,?"\n";

$channel->close();

$connection->close();

}

消費者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/receive-logs

*/

public?function?actionReceiveLogs(){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->exchange_declare('logs',?'fanout',?false,?false,?false);

list($queue_name,?,)?=?$channel->queue_declare("",?false,?false,?true,?false);

$channel->queue_bind($queue_name,?'logs');

echo?'?[*]?Waiting?for?logs.?To?exit?press?CTRL+C',?"\n";

$callback?=?function($msg){

echo?'?[x]?',?$msg->body,?"\n";

};

$channel->basic_consume($queue_name,?'',?false,?true,?false,?false,?$callback);

while(count($channel->callbacks))?{

$channel->wait();

}

$channel->close();

$connection->close();

}

四?Routing

生產者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/emit-log-direct?info?msg

*/

public?function?actionEmitLogDirect($argv,?$data='Hello?World!'){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->exchange_declare('direct_logs',?'direct',?false,?false,?false);

$severity?=?isset($argv)?&&?!empty($argv)???$argv?:?'info';

$msg?=?new?AMQPMessage($data);

$channel->basic_publish($msg,?'direct_logs',?$severity);

echo?"?[x]?Sent?",$severity,':',$data,"?\n";

$channel->close();

$connection->close();

}

消費者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/receive-logs-direct?info,warning,error

*/

public?function?actionReceiveLogsDirect($argv){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->exchange_declare('direct_logs',?'direct',?false,?false,?false);

list($queue_name,?,)?=?$channel->queue_declare("",?false,?false,?true,?false);

$severities?=?explode(',',?$argv);

if(empty($severities))?{

file_put_contents('php://stderr',?"Usage:?$argv[0]?[info]?[warning]?[error]\n");

exit(1);

}

foreach($severities?as?$severity)?{

$channel->queue_bind($queue_name,?'direct_logs',?$severity);

}

echo?'?[*]?Waiting?for?logs.?To?exit?press?CTRL+C',?"\n";

$callback?=?function($msg){

echo?'?[x]?',$msg->delivery_info['routing_key'],?':',?$msg->body,?"\n";

};

$channel->basic_consume($queue_name,?'',?false,?true,?false,?false,?$callback);

while(count($channel->callbacks))?{

$channel->wait();

}

$channel->close();

$connection->close();

}

五?Topics

生產者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/topics-emit-log-direct?info?msg

*/

public?function?actionTopicsEmitLogDirect($routing_key='kern.critical',?$data='Hello?World!'){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->exchange_declare('topic_logs',?'topic',?false,?false,?false);

$msg?=?new?AMQPMessage($data);

$channel->basic_publish($msg,?'topic_logs',?$routing_key);

echo?"?[x]?Sent?",$routing_key,':',$data,"?\n";

$channel->close();

$connection->close();

}

消費者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/topics-receive-logs-direct?info,warning,error

*/

public?function?actionTopicsReceiveLogsDirect($binding_keys=''){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->exchange_declare('topic_logs',?'topic',?false,?false,?false);

list($queue_name,?,)?=?$channel->queue_declare("",?false,?false,?true,?false);

$binding_keys?=?explode(',',?$binding_keys);

if(?empty($binding_keys?))?{

file_put_contents('php://stderr',?"Usage:?$binding_keys\n");

exit(1);

}

foreach($binding_keys?as?$binding_key)?{

$channel->queue_bind($queue_name,?'topic_logs',?$binding_key);

}

echo?'?[*]?Waiting?for?logs.?To?exit?press?CTRL+C',?"\n";

$callback?=?function($msg){

echo?'?[x]?',$msg->delivery_info['routing_key'],?':',?$msg->body,?"\n";

};

$channel->basic_consume($queue_name,?'',?false,?true,?false,?false,?$callback);

while(count($channel->callbacks))?{

$channel->wait();

}

$channel->close();

$connection->close();

}

六?RPC

生產者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/rpc-client?10

*/

public?function?actionRpcClient($fib=10){

$fibonacci_rpc?=?new?FibonacciRpcClient();

$response?=?$fibonacci_rpc->call($fib);

echo?"?[.]?Got?",?$response,?"\n";

}

消費者:

/*

*?php?G:\wamp\www\mygedu\yii?tools/rpc-server

*/

public?function?actionRpcServer($routing_key='kern.critical',?$data='Hello?World!'){

$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');

$channel?=?$connection->channel();

$channel->queue_declare('rpc_queue',?false,?false,?false,?false);

function?fib($n)?{

if?($n?==?0)

return?0;

if?($n?==?1)

return?1;

return?fib($n-1)?+?fib($n-2);

}

echo?"?[x]?Awaiting?RPC?requests\n";

$callback?=?function($req)?{

$n?=?intval($req->body);

echo?"?[.]?fib(",?$n,?")\n";

$msg?=?new?AMQPMessage(

(string)?fib($n),

array('correlation_id'?=>?$req->get('correlation_id'))

);

$req->delivery_info['channel']->basic_publish(

$msg,?'',?$req->get('reply_to'));

$req->delivery_info['channel']->basic_ack(

$req->delivery_info['delivery_tag']);

};

$channel->basic_qos(null,?1,?null);

$channel->basic_consume('rpc_queue',?'',?false,?false,?false,?false,?$callback);

while(count($channel->callbacks))?{

$channel->wait();

}

$channel->close();

$connection->close();

}

相關類:

class?FibonacciRpcClient?{

private?$connection;

private?$channel;

private?$callback_queue;

private?$response;

private?$corr_id;

public?function?__construct()?{

$this->connection?=?new?AMQPStreamConnection(

'localhost',?5672,?'guest',?'guest');

$this->channel?=?$this->connection->channel();

list($this->callback_queue,?,)?=?$this->channel->queue_declare(

"",?false,?false,?true,?false);

$this->channel->basic_consume(

$this->callback_queue,?'',?false,?false,?false,?false,

array($this,?'on_response'));

}

public?function?on_response($rep)?{

if($rep->get('correlation_id')?==?$this->corr_id)?{

$this->response?=?$rep->body;

}

}

public?function?call($n)?{

$this->response?=?null;

$this->corr_id?=?uniqid();

$msg?=?new?AMQPMessage(

(string)?$n,

array('correlation_id'?=>?$this->corr_id,

'reply_to'?=>?$this->callback_queue)

);

$this->channel->basic_publish($msg,?'',?'rpc_queue');

while(!$this->response)?{

$this->channel->wait();

}

return?intval($this->response);

}

}

總結

以上是生活随笔為你收集整理的php basic publish,RabbitMQ入门(PHP语言描述)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。