日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[一]RabbitMQ-客户端源码之ConnectionFactory

發布時間:2024/4/11 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [一]RabbitMQ-客户端源码之ConnectionFactory 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-connectionfactory/


首先看一段amqp-client發送端的示例代碼(展示出主要部分):

ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String message = "RabbitMQ Demo Test:" + System.currentTimeMillis(); channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); channel.close(); connection.close();

相信使用rabbitmq java客戶端的同學來說,這段代碼并不陌生,主要的作用是發送一條消息至broker然后關閉。通過wireshark抓包工具可以看到整個AMQP協議的流程,如下圖:
(xx.xx.48.240是client的ip,xx.xx.197.73是broker的ip)

下面通過源碼來分析下Connection有關的整個流程,對于上面AMQP流程中的Protocol-Header到Connection.Open-Ok的部分。

首先是ConnectionFactory類(文章開篇的demo中),這里主要包含一些與broker連接的配置參數等,比如:username, password, virtualHost, host,port, requestedChannelMax, requestedFrameMax, requestedHeartbeat, connectionTimeout, shutdownTimeout(只列出部分)。

這個類中其余都是些Getter和Setter方法,但是有個newConnection方法是關鍵,文中開篇的demo代碼下面列出詳細內容:

/*** Create a new broker connection, picking the first available address from* the list.** If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>* is enabled, the connection returned by this method will be {@link Recoverable}. Future* reconnection attempts will pick a random accessible address from the provided list.** @param executor thread execution service for consumers on the connection* @param addrs an array of known broker addresses (hostname/port pairs) to try in order* @return an interface to the connection* @throws java.io.IOException if it encounters a problem* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>*/ public Connection newConnection(ExecutorService executor, Address[] addrs)throws IOException, TimeoutException {FrameHandlerFactory fhFactory = createFrameHandlerFactory();ConnectionParams params = params(executor);if (isAutomaticRecoveryEnabled()) {// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnectionAutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);conn.init();return conn;} else {IOException lastException = null;for (Address addr : addrs) {try {FrameHandler handler = fhFactory.create(addr);AMQConnection conn = new AMQConnection(params, handler);conn.start();return conn;} catch (IOException e) {lastException = e;}}throw (lastException != null) ? lastException : new IOException("failed to connect");} }

方法中首先是FrameHandlerFactory fhFactory = createFrameHandlerFactory();這個是用來處理client與broker之間的通信幀(Frame)的,包括建立通信鏈路(java的原生socket,注意這里沒有NIO也沒有netty)。

protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL()); }

調用createFrameHandlerFactory()方法得到FrameHandlerFactory對象之后再:“ FrameHandler handler = fhFactory.create(addr);”返回的是SocketFrameHandler對象,這個對象是對Socket的一個封裝,完全可以看成是一個Socket對象。
注意這里的Socket的TCP_NODELAY參數默認設置為true,而不是默認的false。當然你也可以調用ConnectionFactory的setSocketConfigurator自行設置。

//這個方法是DefaultSocketConfigurator的唯一的方法 public void configure(Socket socket) throws IOException {// disable Nagle's algorithm, for more consistently low latencysocket.setTcpNoDelay(true); }

有關Socket的TCP_NODELAY參數:默認情況下發送數據是采用Negale算法。Negale算法是指發送方數據不會立刻發送出去,而是先放在緩沖區內,等待緩沖區滿了,在發出去。Negale算法適用于需要發送大量數據的應用場景。這種算法減少傳輸的次數增加性能。但是如果對于需要即使響應的,小批量數據的應用場景,例如網絡游戲就不能采用Negale算法了。默認是false,表示采用Negale算法。

ConnectionParams 主要用來配置與broker連接相關的參數,比如username,password,vhost等。這個與前面Socket的參數不同,需要注意區分。

之后if(isAutomaticRecoveryEnabled()){}之內的方法是建立可自動恢復連接的,這個可以忽略,直接看else里面的代碼,因為if和else大體功能上一致,else里的更通用一些,也是默認的。上面提到 FrameHandler handler = fhFactory.create(addr);這段代碼返回的是SocketFrameHandler對象,之后: AMQConnection conn = new AMQConnection(params, handler);這句通過參數和Socket與broker建立連接。之后初始化:conn.start();完成之后客戶端就已經和broker建立了正常的連接了.

有關AMQConnection的詳細內容將在下一篇文章[二]RabbitMQ-客戶端源碼之AMQConnection中講述。


附:本系列全集

  • [Conclusion]RabbitMQ-客戶端源碼之總結
  • [一]RabbitMQ-客戶端源碼之ConnectionFactory
  • [二]RabbitMQ-客戶端源碼之AMQConnection
  • [三]RabbitMQ-客戶端源碼之ChannelManager
  • [四]RabbitMQ-客戶端源碼之Frame
  • [五]RabbitMQ-客戶端源碼之AMQChannel
  • [六]RabbitMQ-客戶端源碼之AMQCommand
  • [七]RabbitMQ-客戶端源碼之AMQPImpl+Method
  • [八]RabbitMQ-客戶端源碼之ChannelN
  • [九]RabbitMQ-客戶端源碼之Consumer
  • 歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-connectionfactory/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    總結

    以上是生活随笔為你收集整理的[一]RabbitMQ-客户端源码之ConnectionFactory的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。