日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rabbitmq rpc

發(fā)布時間:2025/3/15 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq rpc 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

目錄

概述

過程描述

代碼

結(jié)果


遠程過程調(diào)用(RPC): 當(dāng)客戶端發(fā)送請求到遠程服務(wù)器,遠程服務(wù)器接收請求并處理結(jié)果,這時候?qū)⒔Y(jié)果響應(yīng)給客戶端,這個過程被稱為遠程過程調(diào)用

隊列

在整個過程中用會涉及到兩個隊列一個是專門保存請求的隊列,稱為rpc_queue,另一個隊列被稱為響應(yīng)隊列,專門用于保存服務(wù)器處理的響應(yīng)結(jié)果,這個隊列的名字是隨機生成的字符串。

消息的基本屬性BasicProperties

響應(yīng)隊列名字 回復(fù)(replyTo):是響應(yīng)隊列的名字,當(dāng)服務(wù)器接收并處理好結(jié)果,這時候服務(wù)器需要知道將響應(yīng)的信息發(fā)送到哪個隊列中;

關(guān)聯(lián)id(correlationId):是一個UUID值,發(fā)消息的時候會帶上這個值,該值在客戶端接收響應(yīng)時用于判斷接收到的響應(yīng)消息是否是自己發(fā)出請求對應(yīng)的響應(yīng); 客戶端在發(fā)送請求時需要帶上replyTo和correlationId兩個屬性。

過程描述

客戶端發(fā)送消息到請求隊列,在發(fā)送請求時需要指定兩個值(replyTo和correlationId)----------->服務(wù)端為隨時接受到請求消息,需要預(yù)先訂閱請求隊列(rpc_queue),,當(dāng)服務(wù)端接收到請求消息時對請求進行處理,將處理結(jié)果發(fā)送到響應(yīng)隊列(隨機隊列)中--------------->客戶端也需要預(yù)先訂閱響應(yīng)隊列(隨機隊列),以便服務(wù)器發(fā)送響應(yīng)消息到響應(yīng)隊列中,客戶端能及時收到響應(yīng)結(jié)果,服務(wù)器在將響應(yīng)發(fā)送到響應(yīng)隊列中還要指定correlationId值(唯一標(biāo)識),這樣客戶端接收到消息時就可以通過correlationId的值是否和發(fā)送請求的關(guān)聯(lián)id值是否相同,如果相同就證明這個響應(yīng)結(jié)果就是這個請求對應(yīng)的響應(yīng)結(jié)果。

注意:這個預(yù)先訂閱響應(yīng)隊列的步驟需要在客戶端中完成,最好在客戶端發(fā)送請求消息前就完成。.

代碼

服務(wù)端:

package com.ll.mq.hellomq.rpc;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; //ll public class Service {public static void main(String[] args) {try {// 創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory(); // 設(shè)置RabbitMQ地址factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456"); // 創(chuàng)建一個連接Connection connection = factory.newConnection(); // 創(chuàng)建一個頻道final Channel channel = connection.createChannel();String rpc_queuqu = "rpc_queue";channel.queueDeclare(rpc_queuqu, false, false, false, null);// DefaultConsumer類實現(xiàn)了Consumer接口,通過傳入一個頻道,告訴服務(wù)器我們需要那個頻道的消息,如果頻道中有消息,就會執(zhí)行回調(diào)函數(shù)handleDeliveryDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");// 服務(wù)器端接收到消息并處理消息String response = "{'code': 200, 'data': '" + message+ "'}";// // 將消息發(fā)布到reply_to響應(yīng)隊列中AMQP.BasicProperties replyProperties = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String replyTo = properties.getReplyTo();channel.basicPublish("", replyTo, replyProperties, response.getBytes("UTF-8"));System.out.println("服務(wù)端:請求已處理完畢,響應(yīng)結(jié)果" + response + "已發(fā)送到響應(yīng)隊列中"); // // 手動應(yīng)答channel.basicAck(envelope.getDeliveryTag(), true);}}; // 自動回復(fù)隊列應(yīng)答channel.basicConsume(rpc_queuqu, false, consumer);} catch (Exception e) {e.printStackTrace();}} }

客戶端

package com.ll.mq.hellomq.rpc;import java.io.IOException; import java.util.UUID;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope;public class Client {public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 預(yù)先訂閱響應(yīng)結(jié)果的隊列,先訂閱響應(yīng)隊列,再發(fā)送消息到請求隊列String reyply_to_queue = channel.queueDeclare().getQueue();final String correlationId = UUID.randomUUID().toString();DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {if (properties.getCorrelationId().equals(correlationId)) {String message = new String(body, "UTF-8");System.out.println("已接收到服務(wù)器的響應(yīng)結(jié)果:" + message);}}};channel.basicConsume(reyply_to_queue, true, consumer);// 將消息發(fā)送到請求隊列中String rpc_queuqu = "rpc_queue";String message = "Hello RabbitMQ";AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo(reyply_to_queue).build();channel.basicPublish("", rpc_queuqu, properties, message.getBytes("UTF-8"));System.out.println("已發(fā)出請求請求消息:" + message);} catch (Exception e) {e.printStackTrace();}} }

結(jié)果

客戶端

已發(fā)出請求請求消息:Hello RabbitMQ
已接收到服務(wù)器的響應(yīng)結(jié)果:{'code': 200, 'data': 'Hello RabbitMQ'}
服務(wù)端

服務(wù)端:請求已處理完畢,響應(yīng)結(jié)果{'code': 200, 'data': 'Hello RabbitMQ'}已發(fā)送到響應(yīng)隊列中
?

?

總結(jié)

以上是生活随笔為你收集整理的rabbitmq rpc的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。