SOFA 源码分析 — 连接管理器
前言
RPC 框架需要維護客戶端和服務端的連接,通常是一個客戶端對應多個服務端,而客戶端看到的是接口,并不是服務端的地址,服務端地址對于客戶端來講是透明的。
那么,如何實現這樣一個 RPC 框架的網絡連接呢?
我們從 SOFA 中尋找答案。
連接管理器介紹
先從一個小 demo 開始看:
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>().setInterfaceId(HelloService.class.getName()) // 指定接口.setProtocol("bolt") // 指定協議.setDirectUrl("bolt://127.0.0.1:9696"); // 指定直連地址HelloService helloService = consumerConfig.refer();while (true) {System.out.println(helloService.sayHello("world"));try {Thread.sleep(2000);} catch (Exception e) {} }上面的代碼中,一個 ConsumerConfig 對應一個接口服務,并指定了直連地址。
然后調用 ref 方法。每個 ConsumerConfig 綁定了一個 ConsumerBootstrap,這是一個非單例的類。
而每個 ConsumerBootstrap 又綁定了一個 Cluster,這是真正的客戶端。該類包含了一個客戶端所有的關鍵信息,例如:
這 5 個實例是 Cluster 的核心。一個客戶端的正常使用絕對離不開這 5 個元素。
我們之前分析了 5 個中的 4 個,今天分析最后一個 —— 連接管理器。
他可以說是 RPC 網絡通信的核心。
地址管理器代表的是:一個客戶端可以擁有多個接口。
連接管理器代表的是:一個客戶端可以擁有多個 TCP 連接。
很明顯,地址管理器的數據肯定比連接管理器要多。因為通常一個 TCP 連接(Server 端)可以含有多個接口。
那么 SOFA 是如何實現連接管理器的呢?
從 AbstractCluster 的 init 方法中,我們知道,該方法初始化了 Cluster。同時也初始化了 connectionHolder。
具體代碼如下:
// 連接管理器 connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);使用了 SPI 的方式進行的初始化。目前 RPC 框架的具體實現類只有一個 AllConnectConnectionHolder。即長連接管理器。
該類需要一個 ConsumerConfig 才能初始化。
該類中包含很多和連接相關的屬性,有 4 個 Map,未初始化的 Map,存活的節點列表,存活但亞健康的列表,失敗待重試的列表。這些 Map 的元素都會隨著服務的網絡變化而變化。
而這些 Map 中的元素則是:ConcurrentHashMap<ProviderInfo, ClientTransport> 。
即每個服務者的信息對應一個客戶端傳輸。那么這個 ClientTransport 是什么呢?看過之前文章的都知道,這個一個 RPC 和 Bolt 的膠水類。該類的默認實現 BoltClientTransport 包含了一個 RpcClient 屬性,注意,該屬性是個靜態的。也就是說,是所有實例公用的。并且,BoltClientTransport 包含一個 ProviderInfo 屬性。還有一個 Url 屬性,Connection 屬性(網絡連接)。
我們理一下:一個 ConsumerConfig 綁定一個 Cluster,一個 Cluster 綁定一個 connectionHolder,一個 connectionHolder 綁定多個 ProviderInfo 和 ClientTransport。
因為一個客戶端可以和多個服務進行通信。
代碼如何實現?
在 Cluster 中,會對 connectionHolder 進行初始化,在 Cluster 從注冊中心得到服務端列表后,會建立長連接。
從這里開始,地址管理器開始運作。
Cluster 的 updateAllProviders 方法是源頭。該方法會將服務列表添加到 connectionHolder 中。即調用 connectionHolder.updateAllProviders(providerGroups) 方法。該方法會全量更新服務端列表。
如果更新的時候,發現有新的服務,便會建立長連接。具體代碼如下:
if (!needAdd.isEmpty()) {addNode(needAdd); }addNode 方法就是添加新的節點。該方法會多線程建立 TCP 連接。
首先會根據 ProviderInfo 信息創建一個 ClientTransport,然后向線程池提交一個任務,任務內容是 initClientTransport(),即初始化客戶端傳輸。
該方法代碼如下(精簡過了):
private void initClientTransport(String interfaceId, ProviderInfo providerInfo, ClientTransport transport) {transport.connect();if (doubleCheck(interfaceId, providerInfo, transport)) {printSuccess(interfaceId, providerInfo, transport);addAlive(providerInfo, transport);} else {printFailure(interfaceId, providerInfo, transport);addRetry(providerInfo, transport);} }其中關鍵是調用 transport 的 connect 方法建立連接。
該方法的默認實現在 BoltClientTransport 中,符合我們的預期。我們知道, BoltClientTransport 有一個 RpcClient 的靜態實例。這個實例在類加載的時候,就會在靜態塊中初始化。初始化內容則是初始化他的一些屬性,例如地址解析器,連接管理器,連接監控等等。
我們再看 BoltClientTransport 的 connect 方法,該方法主要邏輯是初始化連接。方式則是通過 RpcClient 的 getConnection 方法來獲取,具體代碼如下:
connection = RPC_CLIENT.getConnection(url, url.getConnectTimeout());傳入一個 URL 和超時時間。 RpcClient 則是調用連接管理器的 getAndCreateIfAbsent 方法獲取,同樣傳入 Url,這個方法的名字很好,根據 URL 獲取連接,如果沒有,就創建一個。
有必要看看具體代碼:
public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {// get and create a connection pool with initialized connections.ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),new ConnectionPoolCall(url));if (null != pool) {return pool.get();} else {logger.error("[NOTIFYME] bug detected! pool here must not be null!");return null;} }該方法會繼續調用自身的 getConnectionPoolAndCreateIfAbsent 方法,傳入 URL 的唯一標識,和一個 ConnectionPoolCall 對象(實現了 Callable)。
然后阻塞等待返回連接。
我們看看這個 ConnectionPoolCall 的 call 方法實現。該方法調用了連接管理器的 doCreate 方法。傳入了 URL 和一個連接池。然后 call 方法返回連接池。
doCreate 方法中,重點就是 create 方法,傳入了一個 url,返回一個 Connection,并放入連接池。默認池中只有一個長連接。
而 create 方法則是調用連接工廠的 createConnection 方法。然后調用 doCreateConnection 方法。該方法內部給了我們明確的答案:調用 Netty 的 Bootstrap 的 connect 方法。
代碼如下:
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));熟悉 Netty 的同學一眼便看出來了。這是一個連接服務端的操作。而這個 BootStrap 的初始化則是在 RpcClient 初始化的時候進行的。注意:BootStrap 是可以共享的。
可以看到, ConnectionPoolCall 的 call 方法就是用來創建 Netty 連接的?;氐?getAndCreateIfAbsent 方法里,繼續看 getConnectionPoolAndCreateIfAbsent 方法的實現。
該方法內部將 Callable 包裝成一個 FutureTask,目的應該是為了以后的異步運行吧,總之,最后還是同步調用了 run 方法。然后調用 get 方法阻塞等待,等待剛剛 call 方法返回的連接池。然后返回。
得到連接池,連接池調用 get 方法,從池中根據策略選取一個連接返回。目前只有一個隨機選取的策略。
這個 Connection 連接實例會保存在 BoltClientTransport 中。
在客戶端進行調用的時候, RpcClient 會根據 URL 找到對應的連接,然后,獲取這個連接對應的 Channel ,向服務端發送數據。具體代碼如下:
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (!f.isSuccess()) {conn.removeInvokeFuture(request.getId());future.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), f.cause()));logger.error("Invoke send failed, id={}", request.getId(), f.cause());}} });以上,就是 SOFA 的連接的原理和設計。
總結
連接管理器是我們分析 SOFA—RPC Cluster 中的最后一個模塊,他管理著一個客戶端對應的所有服務網絡連接。
connectionHolder 內部包含多個 Map,Map 中的 key 是 Provider,value 是 ClientTransport,ClientTransport 是 RpcClient 和 SOFA 的膠水類,通常一個 Provider 對應一個 ClientTransport。ClientTransport 其實就是一個連接的包裝。
ClientTransport 獲取連接的方式則是通過 RpcClient 的 連接管理器獲取的。該連接管理器內部包含一個連接工廠,會根據 URL 創建連接。創建連接的凡是則是通過 Netty 的 BootStrap 來創建。
當我們使用 Provider 對應的 ClientTransport 中的 RpcClient 發送數據的時候,則會根據 URL 找到對應 Connection,并獲取他的 Channel ,向服務端發送數據。
好了,以上就是 SOFA—RPC 連接管理的分析。
篇幅有限,如有錯誤,還請指正。
轉載于:https://www.cnblogs.com/stateis0/p/9006080.html
總結
以上是生活随笔為你收集整理的SOFA 源码分析 — 连接管理器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 《程序员代码面试指南》第八章 数组和矩阵
- 下一篇: cocos2dx3.0五种屏幕适配模式,