生活随笔
收集整理的這篇文章主要介紹了
Dubbo是如何进行远程服务调用的?(源码流程跟踪)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
首先會分析Dubbo是如何進行遠程服務調用的,如果不了解dubbo的服務暴露和服務調用,請去看前兩篇dubbo的文章,然后后面我還會說一下dubbo的SPI機制
當我們在使用@reference 注解的時候,來調用我們的提供者的Service對象的時候,Dubbo中的服務調用是怎么實現的
Dubbo的遠程服務調用
(1)首選Dubbo是通過Poxy對象來生成一個代理對象的
具體實現是在ReferenceConfig對象中調用的private T createProxy(Map<String, String> map)方法的,這個方法中有三種生成Invoker對象的方式,第一種是通過本地JVM,第二種是通過URL對象是不是為空判斷進行判斷,然后如果為空就從注冊中心獲取這個Invoker對象,否則就是從ReferenceConfig中的URL中拿到 上面那個方法中還會通過獲取到的Invoker這里的【生成Invoker的過程后面補充】的對象去通過ProxyFactory生成Poxy對象,代碼為:return proxyFactory.getProxy(this.invoker);,這里proxyFactory其實就是
public < T > T getProxy ( Invoker < T > invoker
, Class < ? > [ ] interfaces
) { return Proxy . getProxy ( interfaces
) . newInstance ( new InvokerInvocationHandler ( invoker
) ) ; }
然后通過第2步的getPoxy()方法去動態代理生成代理Poxy對象
public static Proxy getProxy ( Class < ? > . . . ics
) { return getProxy ( ClassHelper . getClassLoader ( Proxy . class ) , ics
) ; } public static Proxy getProxy ( ClassLoader cl
, Class < ? > . . . ics
) { if ( ics
. length
> 65535 ) throw new IllegalArgumentException ( "interface limit exceeded" ) ; StringBuilder sb
= new StringBuilder ( ) ; for ( int i
= 0 ; i
< ics
. length
; i
++ ) { String itf
= ics
[ i
] . getName ( ) ; if ( ! ics
[ i
] . isInterface ( ) ) throw new RuntimeException ( itf
+ " is not a interface." ) ; Class < ? > tmp
= null ; try { tmp
= Class . forName ( itf
, false , cl
) ; } catch ( ClassNotFoundException e
) { } if ( tmp
!= ics
[ i
] ) throw new IllegalArgumentException ( ics
[ i
] + " is not visible from class loader" ) ; sb
. append ( itf
) . append ( ';' ) ; } String key
= sb
. toString ( ) ; Map < String , Object > cache
; synchronized ( ProxyCacheMap ) { cache
= ProxyCacheMap . get ( cl
) ; if ( cache
== null ) { cache
= new HashMap < String , Object > ( ) ; ProxyCacheMap . put ( cl
, cache
) ; } } Proxy proxy
= null ; synchronized ( cache
) { do { Object value
= cache
. get ( key
) ; if ( value
instanceof Reference < ? > ) { proxy
= ( Proxy ) ( ( Reference < ? > ) value
) . get ( ) ; if ( proxy
!= null ) return proxy
; } if ( value
== PendingGenerationMarker ) { try { cache
. wait ( ) ; } catch ( InterruptedException e
) { } } else { cache
. put ( key
, PendingGenerationMarker ) ; break ; } } while ( true ) ; } long id
= PROXY_CLASS_COUNTER
. getAndIncrement ( ) ; String pkg
= null ; ClassGenerator ccp
= null , ccm
= null ; try { ccp
= ClassGenerator . newInstance ( cl
) ; Set < String > worked
= new HashSet < String > ( ) ; List < Method > methods
= new ArrayList < Method > ( ) ; for ( int i
= 0 ; i
< ics
. length
; i
++ ) { if ( ! Modifier . isPublic ( ics
[ i
] . getModifiers ( ) ) ) { String npkg
= ics
[ i
] . getPackage ( ) . getName ( ) ; if ( pkg
== null ) { pkg
= npkg
; } else { if ( ! pkg
. equals ( npkg
) ) throw new IllegalArgumentException ( "non-public interfaces from different packages" ) ; } } ccp
. addInterface ( ics
[ i
] ) ; for ( Method method
: ics
[ i
] . getMethods ( ) ) { String desc
= ReflectUtils . getDesc ( method
) ; if ( worked
. contains ( desc
) ) continue ; worked
. add ( desc
) ; int ix
= methods
. size ( ) ; Class < ? > rt
= method
. getReturnType ( ) ; Class < ? > [ ] pts
= method
. getParameterTypes ( ) ; StringBuilder code
= new StringBuilder ( "Object[] args = new Object[" ) . append ( pts
. length
) . append ( "];" ) ; for ( int j
= 0 ; j
< pts
. length
; j
++ ) code
. append ( " args[" ) . append ( j
) . append ( "] = ($w)$" ) . append ( j
+ 1 ) . append ( ";" ) ; code
. append ( " Object ret = handler.invoke(this, methods[" + ix
+ "], args);" ) ; if ( ! Void . TYPE
. equals ( rt
) ) code
. append ( " return " ) . append ( asArgument ( rt
, "ret" ) ) . append ( ";" ) ; methods
. add ( method
) ; ccp
. addMethod ( method
. getName ( ) , method
. getModifiers ( ) , rt
, pts
, method
. getExceptionTypes ( ) , code
. toString ( ) ) ; } } if ( pkg
== null ) pkg
= PACKAGE_NAME
; String pcn
= pkg
+ ".proxy" + id
; ccp
. setClassName ( pcn
) ; ccp
. addField ( "public static java.lang.reflect.Method[] methods;" ) ; ccp
. addField ( "private " + InvocationHandler . class . getName ( ) + " handler;" ) ; ccp
. addConstructor ( Modifier . PUBLIC
, new Class < ? > [ ] { InvocationHandler . class } , new Class < ? > [ 0 ] , "handler=$1;" ) ; ccp
. addDefaultConstructor ( ) ; Class < ? > clazz
= ccp
. toClass ( ) ; clazz
. getField ( "methods" ) . set ( null , methods
. toArray ( new Method [ 0 ] ) ) ; String fcn
= Proxy . class . getName ( ) + id
; ccm
= ClassGenerator . newInstance ( cl
) ; ccm
. setClassName ( fcn
) ; ccm
. addDefaultConstructor ( ) ; ccm
. setSuperClass ( Proxy . class ) ; ccm
. addMethod ( "public Object newInstance(" + InvocationHandler . class . getName ( ) + " h){ return new " + pcn
+ "($1); }" ) ; Class < ? > pc
= ccm
. toClass ( ) ; proxy
= ( Proxy ) pc
. newInstance ( ) ; } catch ( RuntimeException e
) { throw e
; } catch ( Exception e
) { throw new RuntimeException ( e
. getMessage ( ) , e
) ; } finally { if ( ccp
!= null ) ccp
. release ( ) ; if ( ccm
!= null ) ccm
. release ( ) ; synchronized ( cache
) { if ( proxy
== null ) cache
. remove ( key
) ; else cache
. put ( key
, new WeakReference < Proxy > ( proxy
) ) ; cache
. notifyAll ( ) ; } } return proxy
; }
(2)這里是進行補充上面的那個【Invoker的怎么生成的步驟】,看一下Invoker中都包含了什么信息這么重要,這里需要強調一下這個Invoker生成的過程和Dubbo服務的暴露和導出生成的Invoker不太一樣
invoker對象是通過 InvokerInvocationHandler構造方法傳入,而InvokerInvocationHandler對象是由JavassistProxyFactory類getProxy(Invoker invoker, Class<?>[] interfaces)方法創建。 這還要回到調用proxyFactory.getProxy(invoker);方法的地方,即ReferenceConfig類的createProxy(Map<String, String> map)方法中 所以這個Invoker其實是通過ReferenceConfig 中的createProxy(Map<String, String> map)方法來生成的Invoker對象,這個就是下面中使用到的對象refprotocol ,private static final Protocol refprotocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
if ( urls
. size ( ) == 1 ) { invoker
= refprotocol
. refer ( interfaceClass
, urls
. get ( 0 ) ) ; } else { List < Invoker < ? > > invokers
= new ArrayList < Invoker < ? > > ( ) ; URL registryURL
= null ; for ( URL url
: urls
) { invokers
. add ( refprotocol
. refer ( interfaceClass
, url
) ) ; if ( Constants . REGISTRY_PROTOCOL
. equals ( url
. getProtocol ( ) ) ) { registryURL
= url
; } } if ( registryURL
!= null ) { URL u
= registryURL
. addParameter ( Constants . CLUSTER_KEY
, AvailableCluster . NAME
) ; invoker
= cluster
. join ( new StaticDirectory ( u
, invokers
) ) ; } else { invoker
= cluster
. join ( new StaticDirectory ( invokers
) ) ; } }
上面的代碼可以看出生成Invoker的有三種方式, 第一種是refprotocol.refer(this.interfaceClass, (URL),這里的接口用的是SPI機制的dubbo對應下面的那個@SPI注解=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol這一句,所以這一種主要是使用的是Dubbo直連的協議都會走這一層 第二種是 cluster.join(new StaticDirectory(u, invokers));,這個是加了路由的負載均衡相關的,這一部分主要都是路由層的東西 第三種是 this.invoker = cluster.join(new StaticDirectory(invokers));也是路由層的東西, 路由層:封裝多個提供者的路由及負載均衡,并橋接注冊中心,以 Invoker 為中心,擴展接口為 Cluster, Directory, Router, LoadBalance
@SPI ( "dubbo" )
public interface Protocol { int getDefaultPort ( ) ; @Adaptive < T > Exporter < T > export ( Invoker < T > var1
) throws RpcException ; @Adaptive < T > Invoker < T > refer ( Class < T > var1
, URL var2
) throws RpcException ; void destroy ( ) ;
}
因為Dubbo中在實現遠程調用的時候其實是通過Poxy對象生成的Invoker對象,那么就先看一下Invoker的怎么生成的,里面都包含了什么信息?
這里的invoker對象,通過InvokerInvocationHandler構造方法傳入,而InvokerInvocationHandler對象是由JavassistProxyFactory類 getProxy(Invoker invoker, Class<?>[] interfaces)方法創建
Dubbo中的SPI機制的使用和分析
參考文章:https://cloud.tencent.com/developer/article/1109459
總結
以上是生活随笔 為你收集整理的Dubbo是如何进行远程服务调用的?(源码流程跟踪) 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。