日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > linux >内容正文

linux

linux安装RabbitMQ和amqp扩展(这个安装rabbitmq通过了但是代码测试没有通过)

發布時間:2024/9/20 linux 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 linux安装RabbitMQ和amqp扩展(这个安装rabbitmq通过了但是代码测试没有通过) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

消息隊列rabbitmq
RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統,底層基于Erlang語言。

一:centos7安裝RabbitMQ

這玩意兒安裝很扯淡,官方推薦rpm安裝,rpm安裝本身是最簡單的,但是安裝RabbitMQ卻不簡單,很可能需要修改倉庫地址。不同linux版本不一樣,centos6和centos7也不一樣。我這里不用rpm,手動編譯Erlang,然后選擇編譯好的RabbitMQ。

1:安裝Erlang

1):先安裝幾個必要的插件

$ yum -y install gcc glibc-devel make ncurses-devel openssl-devel autoconf unixODBC unixODBC-devel socat
2):Erlang下載地址:http://www.erlang.org/downloads,我這里下載21.1版本

$ wget http://erlang.org/download/otp_src_21.1.tar.gz #下載
$ tar -xvf otp_src_21.1.tar.gz #解壓
$ cd otp_src_21.1/ #進入目錄準備編譯
$ ./configure --prefix=/usr/local/erlang --without-javac #忽略java編譯
$ make #編譯
$ make install #安裝
?make & make install 這兩步很慢,巨慢無比,耐心等待。

3):進入我們安裝后的目錄測試一下是否安裝成功

?

$ /usr/local/erlang/bin/erl


安裝成功

二:安裝rabbitmq

1:下載地址:http://www.rabbitmq.com/download.html

2:因為我上面的Erlang是手動編譯的,所以這里不選擇rpm方式安裝,直接下載解壓包,從這里下載:https://github.com/rabbitmq/rabbitmq-server/releases

$ wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.8/rabbitmq-server-generic-unix-3.7.8.tar.xz
$ tar xvJf rabbitmq-server-generic-unix-3.7.8.tar.xz
$ mv rabbitmq_server-3.7.8/ /usr/local/rabbitmq #解壓后移動到你想放到的目錄
這個是編譯好的,可以直接用。

?

3:設置環境變量,設置兩個,一個是Erlang,一個是rabbitmq,打開文件/etc/profile文件,在文件最后加入以下三行:

export ERLANG_PATH=$PATH:/usr/local/erlang/bin #erlang安裝目錄
export RABBITMQ_PATH=$PATH:/usr/local/rabbitmq/sbin #rabbitmq安裝目錄
export PATH=$PATH:$ERLANG_PATH:$RABBITMQ_PATH
運行命令生效:

source /etc/profile
啟動一下:

$ rabbitmq-server
成功

啟動web管理后臺:

$ rabbitmq-plugins enable rabbitmq_management #disable為關閉
開啟防火墻,打開15672端口

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload #重啟防火墻


web界面出來了

?

guest用戶被限制,只能通過127.0.0.1訪問,當然也可以修改配置文件開啟guest遠程訪問。這里我們新建一個用戶,并授予管理員權限。
用戶名:admin 密碼:123

$ rabbitmqctl add_user admin 123 #添加用戶
$ rabbitmqctl set_user_tags admin administrator #分配角色
登錄成功

相關命令:命令在/usrlocal/rabbitmq/sbin下

rabbitmq-server -detached #后臺啟動
rabbitmqctl stop #關閉服務
rabbitmqctl status #查看狀態
rabbitmqctl list_users #列出角色
三:安裝php擴展(我用的php7.2版本)

php是用amqp調用RabbitMQ,所以先下載ampq

$ wget https://pecl.php.net/get/amqp-1.9.3.tgz #下載
$ tar -xvf amqp-1.9.3.tgz #解壓
$ cd amqp-1.9.3
$ /usr/local/php/bin/phpize #用phpize生成編譯文件,注意查看你的php在哪里
$ ./configure --with-php-config=/usr/local/php/bin/php-config
到這一步,我這里報錯了:checking for amqp using pkg-config... configure: error: librabbitmq not found

?

這個錯誤提示還要安裝一個破玩意:rabbitmq-c

去這里下載:https://github.com/alanxz/rabbitmq-c/releases

$ wget https://github.com/alanxz/rabbitmq-c/archive/v0.9.0.tar.gz
$ tar -xvf v0.9.0.tar.gz
$ cd rabbitmq-c-0.9.0/
準備configure的時候,發現沒有configure,0.9改成cmake了,靠,安裝一下cmake

$ yum -y install cmake
$ cmake . -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c-0.9.0 #指定安裝目錄
$ make
$ make install
然后回過頭去再編譯amqp-1.9.3

$ ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.9.0
$ make
$ make install
媽的,make報錯!

發現點蛛絲馬跡,上面進入了/usr/local/rabbitmq-c-0.9.0/lib 目錄,查看一下發現/usr/local/rabbitmq-c-0.9.0/沒有lib,但有個lib64位。

處理一下:

$ cp -R /usr/local/rabbitmq-c-0.9.0/lib64/ /usr/local/rabbitmq-c-0.9.0/lib
接著干:make && make install ,OK,這也太不智能了吧。

加入到php.ini 查看一下!

?

extension=amqp.so?
?

?

擴展安裝成功,這時候就可以用PHP操作RabbitMQ了。

四:一些基本術語參數

1:Message:消息。包括消息頭和消息體

2:Publisher:生產者。發布消息的一方

3:Consumer:消費者。接受消息的一方

4:Connection:網絡鏈接,TCP鏈接

5:Channel:信道。建立在真實的TCP連接內地虛擬連接,所有命令通過信道輸入輸出,多路復用一條 TCP 連接,降低TCP開銷

6:ExChange:交換區。因為RabbitMQ是基于AMQP的,AMQP協議中的核心思想就是生產者和消費者隔離,也就是生產者不直接把消息發到隊列,而是先發給ExChange(交換區),Exchange按照特定的策略轉發到Queue(隊列中)進行存儲,所以ExChange的作用就是負責轉發,生產者只面向 Exchange 發布消息,消費者只面向 Queue 消費消息。

Direct:將消息中的Routing key與該Exchange關聯的所有Binding中的Routing key進行比較,完全相等,則發送到該Binding對應的Queue中。

Topic:模糊匹配,如果匹配上了,則發送到該Binding對應的Queue中。
* 表示可以匹配零個或多個字符(Routing key是user.# user.a user.b user都可以匹配)
# 表示可以匹配一個字符 (Routing key是user.* user.a user.b 可以匹配;user user.a.c 不可以匹配)

Fanout:直接將消息轉發到所有binding的對應queue中,忽略Routing key。

Headers Exchange:將消息中的headers與該Exchange相關聯的所有Binging中的參數進行匹配,如果匹配上了,則發送到該Binding對應的Queue中。

7:Binding:綁定。建立交換區與消息隊列之間的關聯,也就是設置規則,交換區該發送到哪個隊列。

8:Queue:消息隊列容器,存儲消息隊列的地方。durability表示是否持久化,durable表示是,transient表示否。

9:Vhost:翻譯叫什么虛擬主機,其實就類似于mysql的數據庫,一個意思。

五:一些基本命令

$ rabbitmq-server -detached #后臺啟動
$ rabbitmqctl stop [] #停止RabbitMQ服務,同時關閉erlang節點和應用程序
$ rabbitmqctl status #查看狀態
$ rabbitmqctl stop [] ?
$ rabbitmqctl stop_app ?#停止RabbitMQ服務,僅關閉erlang節點上的rabbit應用程序
$ rabbitmqctl start_app ? #啟動erlan node上的rabbitmq的應用
? ?
#用戶管理
$ rabbitmqctl list_users #列出角色
$ rabbitmqctl add_user admin 123 #添加用戶和密碼 這里用戶名:admin 密碼:123
$ rabbitmqctl set_user_tags admin administrator #分配角色
$ rabbitmqctl change_password username newpassword #修改用戶密碼
$ rabbitmqctl delete_user username #刪除用戶
# vhost(Virtual host)管理,這玩意兒相當于mysql的數據庫
$ rabbitmqctl add_vhost {name} #添加
$ rabbitmqctl delete_vhost {name} #刪除
$ rabbitmqctl list_vhosts {name} #查看所有
六:php操作RabbitMQ

1:實踐:用PHP創建交換區:goods_msm,隊列名稱:goods_worker,以及路由key:code1

<?php
$conn = new AMQPConnection([
????'host' => '127.0.0.1',
????'vhost' => '/',
????'port' => 5672,
????'login' => 'guest',
????'password' => 'guest'
]);
if(!$conn->connect()){//建立連接
??die('connetc error');
}
$channel = new AMQPChannel($conn); //創建channel(信道或者叫通道)
?
?
$ExChangeName = 'goods_msm'; //交換區名稱
$queueName = 'goods_worker'; //隊列名稱
$routeName1 = 'code1'; //路由key?
?
?
//創建交換機對象?????
$exChange = new AMQPExchange($channel);????
$exChange->setName($ExChangeName);?
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重啟時交換機自動恢復
echo "Exchange Status:".$exChange->declare()."\n";???//查看如果交換機不存在則進行創建
運行,如果沒有報錯的話會輸出:

?

Exchange Status:1

通過web窗口看一下

交換區建立成功

繼續:

//創建隊列 ? ?
$queue = new AMQPQueue($channel);?
$queue->setName($queueName); ??
$queue->setFlags(AMQP_DURABLE); //隊列持久化?
echo "Message Total:".$queue->declare()."\n"; ? //查看,如果不存在則創建
可以看到隊列創建成功

?

繼續往下走:

//綁定交換區與隊列,指定路由鍵?
//rabbitmq不是直接發送到隊列,發送到交換區,由交換區決定發給某個隊列
echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //綁定路由
$conn->disconnect(); //關閉連接
可以看到綁定成功

?

也可以用命令查看:

這樣就創建成功了,下面繼續用生產者,消費者干起來。最終代碼:

<?php
/**
?* 用PHP創建交換區:goods_msm,隊列名稱:goods_worker,以及路由key:code1
?* 此代碼不是生產者,也不是消費者
**/
$conn = new AMQPConnection([
? ? 'host' => '127.0.0.1',
? ? 'vhost' => '/',
? ? 'port' => 5672,
? ? 'login' => 'guest',
? ? 'password' => 'guest'
]);
if(!$conn->connect()){//建立連接
? die('connetc error');
}
$channel = new AMQPChannel($conn); //創建channel(信道或者叫通道)
?
$ExChangeName = 'goods_msm'; //交換區名稱
$queueName = 'goods_worker'; //隊列名稱
$routeName1 = 'code1'; //路由key?
?
//創建交換機對象 ? ??
$exChange = new AMQPExchange($channel); ? ?
$exChange->setName($ExChangeName);?
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重啟時交換機自動恢復
echo "Exchange Status:".$exChange->declare()."\n"; ? //查看如果交換機不存在則進行創建
?
//創建隊列 ? ?
$queue = new AMQPQueue($channel);?
$queue->setName($queueName); ??
$queue->setFlags(AMQP_DURABLE); //隊列持久化?
echo "Message Total:".$queue->declare()."\n"; ? //查看,如果不存在則創建
?
?
//綁定交換區與隊列,指定路由鍵?
//rabbitmq不是直接發送到隊列,發送到交換區,由交換區決定發給某個隊列
echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //綁定路由
$conn->disconnect(); //關閉連接
2:生產者

<?php
/**
?* 生產者
?* 生產者也就是發送方
**/
$conn = new AMQPConnection([
? ? 'host' => '127.0.0.1',
? ? 'vhost' => '/',
? ? 'port' => 5672,
? ? 'login' => 'guest',
? ? 'password' => 'guest'
]);
if(!$conn->connect()){//建立連接
? die('connetc error');
}
$channel = new AMQPChannel($conn); //創建channel(信道或者叫通道)
?
$ExChangeName = 'goods_msm'; //交換區名稱
$routeName1 = 'code1'; //路由key?
?
//創建交換機對象 ??
$exChange = new AMQPExchange($channel); ? ?
$exChange->setName($ExChangeName);?
?
$exChange->publish('第一條測試消息', $routeName1); //發送消息
$conn->disconnect(); //關閉連接
?運行一下,到web管理界面看看

?

消息發送成功

3:消費者

<?php
/**
?* 用PHP創建交換區:goods_msm,隊列名稱:goods_worker,以及路由key:code1
?* 此代碼不是生產者,也不是消費者
**/
$conn = new AMQPConnection([
????'host' => '127.0.0.1',
????'vhost' => '/',
????'port' => 5672,
????'login' => 'guest',
????'password' => 'guest'
]);
if(!$conn->connect()){//建立連接
??die('connetc error');
}
$channel = new AMQPChannel($conn); //創建channel(信道或者叫通道)
?
$queueName = 'goods_worker'; //隊列名稱
?
//創建隊列????
$queue = new AMQPQueue($channel);?
$queue->setName($queueName);???
$queue->setFlags(AMQP_DURABLE); //隊列持久化?
?
?
//接受消息
$queue->consume(function ($envelope, $queue) {
????$msg = $envelope->getBody();
????echo $msg."\n"; //處理消息
}, AMQP_AUTOACK); //自動應答
?
$conn->disconnect(); //關閉連接
注意:這里需要注意的是這個方法:$queue->consume,queue對象有兩個方法可用于取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環中使用;后者則是非阻塞的,取消息時有則取,無則返回false。

就是說用了consume之后,會同步阻塞,該程序常駐內存,不能用nginx,apache調用。?

運行一下生產者,消費端輸出:

?

7:實戰

實戰1:延時隊列

(1)需求:電子商務網站下單后,30分鐘如果未付款,改變訂單狀態

(2)原理:rabbitmq沒有直接延時隊列的功能,不過可以通過模擬實現此功能。用過redis的都知道,redis有一個ttl功能,就是生存周期,某個key設置生存周期,過期就會消失,rabbitmq也有此功能。rabbitmq消息過期進入死信隊列,然后配置一個轉發,把死信隊列的消息轉發到某個隊列,這樣就可以操作這個隊列。

(3)步驟:

a:創建訂單的時候,同時發送訂單到消息隊列,并且設置過期時間

b:mq回調,也就是改變訂單狀態服務,判斷30分鐘后是否付款,如果未付款,改變訂單狀態為:無效訂單。

MQ也要分兩步,首先是進入死信隊列,但是死信隊列不能直接消費,需要轉發出來,利用MQ的兩個特性:

a:ttl,生存周期

b:Dead Letter Exchanges(DLX)

這里設置兩個地方用于轉發:

x-dead-letter-exchange:出現dead letter之后將dead letter重新發送到指定exchange
x-dead-letter-routing-key:出現dead letter之后將dead letter重新按照指定的routing-key發送

具體參考MQ官方文檔:http://www.rabbitmq.com/dlx.html

demo:

<?php
/**
1:創建第1個隊列,此隊列為下單的時候放入隊列,然后設置過期時間
2:創建第2個隊列,第1個隊列過期后自動轉入該隊列
3:所以處理N久時間過期,只要處理第2個隊列就行了。
**/
$conn = new AMQPConnection([
? ? 'host' => '127.0.0.1',
? ? 'vhost' => '/',
? ? 'port' => 5672,
? ? 'login' => 'guest',
? ? 'password' => 'guest'
]);
?
$params = [
?? ?//訂單最開始在這里 第1個隊列
? ? 'exchangeName' => 'old_order_exchange', //交換區名稱
? ? 'queueName' => 'old_order_msg', //訂單隊列
? ? 'routeKey' => 'old_order_route', //路由key?
?
? ? //過期后轉發到這里 第2個隊列
? ? 'exchangeName2' => 'new_order_exchange', //交換區名稱
? ? 'queueName2' => 'new_order_msg', //訂單隊列
? ? 'routeKey2' => 'new_order_route', //路由key?
];
?
if(!$conn->connect()){//建立連接
? die('connetc error');
}
$channel = new AMQPChannel($conn); //創建channel(信道或者叫通道)
?
//創建交換機對象 第1個隊列
$exChange = new AMQPExchange($channel); ? ?
$exChange->setName($params['exchangeName']);?
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重啟時交換機自動恢復
$exChange->declare();
?
//創建交換機對象 第2個隊列
$exChange2 = new AMQPExchange($channel); ? ?
$exChange2->setName($params['exchangeName2']);
$exChange2->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exChange2->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重啟時交換機自動恢復
$exChange2->declare();
?
//第1個隊列
$queue = new AMQPQueue($channel);?
$queue->setName($params['queueName']); ??
$queue->setFlags(AMQP_DURABLE); //隊列持久化?
$queue->setArguments([
?? ?//如果過期,把訂單轉發到以下
?? ?'x-dead-letter-exchange' => $params['exchangeName2'],
?? ?'x-dead-letter-routing-key' => $params['routeKey2'],
?? ?'x-message-ttl' => 10000, //60秒過期,注意:message是單個消息過期,mq也可以設置整個隊列過期時間
]);
$queue->declare();
?
//第2個隊列 ? ?
$queue2 = new AMQPQueue($channel);?
$queue2->setName($params['queueName2']); ??
$queue2->setFlags(AMQP_DURABLE); //隊列持久化?
$queue2->declare();
?
//綁定交換區與隊列,指定路由鍵 第1個隊列
$queue->bind($params['exchangeName'], $params['routeKey']); //綁定訂單定時隊列
?
//綁定交換區與隊列,指定路由鍵 第2個隊列
$queue2->bind($params['exchangeName2'], $params['routeKey2']); //綁定過期轉發訂單隊列
?
echo $exChange->publish('訂單'.date('Y-m-d H:i:s'), $params['routeKey']); //發送消息,往第1個隊列發
?
$conn->disconnect(); //關閉連接
?運行下這個程序:

這里有1條消息

過期后轉到這里來了!
---------------------?
作者:一曲微茫度此生?
來源:CSDN?
原文:https://blog.csdn.net/weixin_41782053/article/details/84992609?
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

總結

以上是生活随笔為你收集整理的linux安装RabbitMQ和amqp扩展(这个安装rabbitmq通过了但是代码测试没有通过)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。