日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

客户连接多个服务端_Dubbo源码解析之客户端Consumer

發布時間:2024/9/27 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 客户连接多个服务端_Dubbo源码解析之客户端Consumer 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前面我們學習了Dubbo源碼解析之服務端Provider。對服務提供方進行思路上的講解,我們知道以下知識點。本篇文章主要對消費方進行講解。廢話不多說請看下文。

  • 如何將對象方法生成Invoker
  • 如何將Invoker注冊到注冊地中心
  • 如何處理客戶端的請求
  • 二進制數據轉java數據協議
  • 協議中包含的序列化知識

一、啟動一個客戶端Consumer

1. 定義一個接口

注意這里其實是引用的前文中的接口。生產中是服務提供方打一個jar包給客戶端用。

@Testpublic void consumerTest() { // 當前應用配置 ApplicationConfig application = new ApplicationConfig(); application.setName("consumerTest"); // 連接注冊中心配置 RegistryConfig registry = new RegistryConfig(); registry.setAddress("zookeeper://127.0.0.1:2181"); // 注意:ReferenceConfig為重對象,內部封裝了與注冊中心的連接,以及與服務提供方的連接 // 引用遠程服務 ReferenceConfig reference = new ReferenceConfig(); // 此實例很重,封裝了與注冊中心的連接以及與提供者的連接,請自行緩存,否則可能造成內存和連接泄漏 reference.setApplication(application); reference.setRegistry(registry); // 多個注冊中心可以用setRegistries() reference.setInterface(UserService.class); reference.setVersion("1.0.0"); UserService userService = reference.get(); userService.say("hello");}

2. 生成本地服務

@Testpublic void consumerTest() { // 當前應用配置 ApplicationConfig application = new ApplicationConfig(); application.setName("consumerTest"); // 連接注冊中心配置 RegistryConfig registry = new RegistryConfig(); registry.setAddress("zookeeper://127.0.0.1:2181"); // 注意:ReferenceConfig為重對象,內部封裝了與注冊中心的連接,以及與服務提供方的連接 // 引用遠程服務 ReferenceConfig reference = new ReferenceConfig(); // 此實例很重,封裝了與注冊中心的連接以及與提供者的連接,請自行緩存,否則可能造成內存和連接泄漏 reference.setApplication(application); reference.setRegistry(registry); // 多個注冊中心可以用setRegistries() reference.setInterface(UserService.class); reference.setVersion("1.0.0"); UserService userService = reference.get(); userService.say("hello");}

3. 原理分析

首先客戶端只有接口的,那么可以根據這個接口生成一個代理。而代理對象的邏輯就是,從zk中找到服務端地址。
然后通過netty客戶端去請求服務端的數據。然后返回

二、源碼分析

帶著我們猜測的邏輯一起來看下ReferenceConfig的實現原理。

public synchronized T get() { if (destroyed){ throw new IllegalStateException("Already destroyed!"); } if (ref == null) { //邏輯就在init里面 init(); } return ref; }

init先做寫檢查信息,如這個方法是否存在接口中
createProxy#loadRegistries

1. 集群容錯策略

注意只有多個服務提供方才會有這里,只有一個服務提供方,沒辦法容錯處理哈。

可以看到一共有9種策略。

當時服務端是多個的時候,才會生成集群策略。另外既然是集群就要選擇到底使用哪個來執行。這就是
負載均衡或者說叫路由策略。

LoadBalance負載均衡

  • directory中獲取所有的invoker
  • 如果有多個invoker就去看配置的負載均衡策略
  • 根據負載均衡策略找到一個Inoker
public abstract class AbstractClusterInvoker implements Invoker { public Result invoke(final Invocation invocation) throws RpcException { checkWheatherDestoried(); LoadBalance loadbalance; //獲取所有的invoker List> invokers = list(invocation); //如果有多個invoker就去看配置的負載均衡策略 if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); //根據策略選一個 return doInvoke(invocation, invokers, loadbalance); } protected List> list(Invocation invocation) throws RpcException { List> invokers = directory.list(invocation); return invokers; }}

2. invoker生成代理對象

代理的知識點不用說了。

3. 客戶端的invoker邏輯

Protocol#refer

主要看DubboProtocol的邏輯

public Invoker refer(Class serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker;}

DubboInvoker

底層調用netty通信api發送數據到客戶端。然后讀取數據。

客戶端doInvoke時候會生成ExchangeClient就是NettyClient。public class DubboInvoker extends AbstractInvoker { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } @Override public boolean isAvailable() { if (!super.isAvailable()) return false; for (ExchangeClient client : clients){ if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){ //cannot write == not Available ? return true ; } } return false; } }

三、總結

在前文的基礎上,客戶端的代碼算是比較簡單的。

回顧以下本文學習的知識點:

  • 主要是集群容錯
  • 負載均衡、路由。
  • 客戶端如何發送數據DubboInvoker

主要是利用代理來實現的。

最后求關注,求訂閱,謝謝你的閱讀!

總結

以上是生活随笔為你收集整理的客户连接多个服务端_Dubbo源码解析之客户端Consumer的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。