[一]RabbitMQ-客户端源码之ConnectionFactory
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-connectionfactory/
首先看一段amqp-client發送端的示例代碼(展示出主要部分):
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String message = "RabbitMQ Demo Test:" + System.currentTimeMillis(); channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); channel.close(); connection.close();相信使用rabbitmq java客戶端的同學來說,這段代碼并不陌生,主要的作用是發送一條消息至broker然后關閉。通過wireshark抓包工具可以看到整個AMQP協議的流程,如下圖:
(xx.xx.48.240是client的ip,xx.xx.197.73是broker的ip)
下面通過源碼來分析下Connection有關的整個流程,對于上面AMQP流程中的Protocol-Header到Connection.Open-Ok的部分。
首先是ConnectionFactory類(文章開篇的demo中),這里主要包含一些與broker連接的配置參數等,比如:username, password, virtualHost, host,port, requestedChannelMax, requestedFrameMax, requestedHeartbeat, connectionTimeout, shutdownTimeout(只列出部分)。
這個類中其余都是些Getter和Setter方法,但是有個newConnection方法是關鍵,文中開篇的demo代碼下面列出詳細內容:
/*** Create a new broker connection, picking the first available address from* the list.** If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>* is enabled, the connection returned by this method will be {@link Recoverable}. Future* reconnection attempts will pick a random accessible address from the provided list.** @param executor thread execution service for consumers on the connection* @param addrs an array of known broker addresses (hostname/port pairs) to try in order* @return an interface to the connection* @throws java.io.IOException if it encounters a problem* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>*/ public Connection newConnection(ExecutorService executor, Address[] addrs)throws IOException, TimeoutException {FrameHandlerFactory fhFactory = createFrameHandlerFactory();ConnectionParams params = params(executor);if (isAutomaticRecoveryEnabled()) {// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnectionAutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);conn.init();return conn;} else {IOException lastException = null;for (Address addr : addrs) {try {FrameHandler handler = fhFactory.create(addr);AMQConnection conn = new AMQConnection(params, handler);conn.start();return conn;} catch (IOException e) {lastException = e;}}throw (lastException != null) ? lastException : new IOException("failed to connect");} }方法中首先是FrameHandlerFactory fhFactory = createFrameHandlerFactory();這個是用來處理client與broker之間的通信幀(Frame)的,包括建立通信鏈路(java的原生socket,注意這里沒有NIO也沒有netty)。
protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL()); }調用createFrameHandlerFactory()方法得到FrameHandlerFactory對象之后再:“ FrameHandler handler = fhFactory.create(addr);”返回的是SocketFrameHandler對象,這個對象是對Socket的一個封裝,完全可以看成是一個Socket對象。
注意這里的Socket的TCP_NODELAY參數默認設置為true,而不是默認的false。當然你也可以調用ConnectionFactory的setSocketConfigurator自行設置。
有關Socket的TCP_NODELAY參數:默認情況下發送數據是采用Negale算法。Negale算法是指發送方數據不會立刻發送出去,而是先放在緩沖區內,等待緩沖區滿了,在發出去。Negale算法適用于需要發送大量數據的應用場景。這種算法減少傳輸的次數增加性能。但是如果對于需要即使響應的,小批量數據的應用場景,例如網絡游戲就不能采用Negale算法了。默認是false,表示采用Negale算法。
ConnectionParams 主要用來配置與broker連接相關的參數,比如username,password,vhost等。這個與前面Socket的參數不同,需要注意區分。
之后if(isAutomaticRecoveryEnabled()){}之內的方法是建立可自動恢復連接的,這個可以忽略,直接看else里面的代碼,因為if和else大體功能上一致,else里的更通用一些,也是默認的。上面提到 FrameHandler handler = fhFactory.create(addr);這段代碼返回的是SocketFrameHandler對象,之后: AMQConnection conn = new AMQConnection(params, handler);這句通過參數和Socket與broker建立連接。之后初始化:conn.start();完成之后客戶端就已經和broker建立了正常的連接了.
有關AMQConnection的詳細內容將在下一篇文章[二]RabbitMQ-客戶端源碼之AMQConnection中講述。
附:本系列全集
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-connectionfactory/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的[一]RabbitMQ-客户端源码之ConnectionFactory的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ之Consumer消费模
- 下一篇: [三]RabbitMQ-客户端源码之Ch