dubbo源码分析二:服务发布
?本文將深入分析dubbo的服務發布涉及到的流程及主要類的代碼。首先,我們先從類的關系圖來看一下dubbo發布服務涉及到的相關類。
1.類圖
? ?上圖展示了部分服務發布過程中需要使用到的類和接口,其中:
? ? spring適配涉及到的類:DubboNamespaceHandler、DubboBeanDefinitionParser、ServiceBean;
? ? 配置信息存儲:ServicdConfig、RegistryConfig、MonitorConfig、ProtocolConfig、ProviderConfig等;
? ? 應用協議:Protocol、DubboProtocol、HessianProtocol、ThriftProtocol、RmiProtocol、AbstractProxyProtocol、AbstractProtocol等;
? ? Server相關:Exchanger、HeaderExchanger、ExchangeServer、HeaderExchangeServer、Transporters、Transporter、NettyTransporter、NettyServer等;
?
2.時序圖
? ? 我們通過時序圖來分析一下在發布服務的過程中,上面的類是如何串聯在一起的:
? ? a.spring容器通過DubboBeanDefinitionParser類的對象來解析xml文件中的標簽,生成ServiceConfig等配置對象;
? ? b.ServiceConfig的export()等發布服務的方法被調用;
? ? c.通過spi機制確定Protocol接口的實現對象為DubboProtocol的對象,調用它的openServer()等方法;
? ? d.通過spi機制確定Transporter接口的實現對象為NettyTransporter,調用它的bind()方法;
? ? e.調用NettyServer類,啟動netty服務,綁定端口。
?
?3.核心代碼解析
? ? ? a.DubboProtocol? ? ??
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();//獲取url信息// export service.String key = serviceKey(url);//產生與發布的接口映射的key ,格式例如:com.alibaba.dubbo.demo.DemoService:20882DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);//創建DubboExporter對象exporterMap.put(key, exporter);//緩存此對象//export an stub service for dispaching eventBoolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice){String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){if (logger.isWarnEnabled()){logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +"], has set stubproxy support event ,but no stub methods founded."));}} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}openServer(url);//打開服務return exporter; }private void openServer(URL url) {// find server.String key = url.getAddress();//client 也可以暴露一個只有server可以調用的服務。boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);if (isServer) {ExchangeServer server = serverMap.get(key);if (server == null) {serverMap.put(key, createServer(url));//創建新的服務} else {//server支持reset,配合override功能使用 server.reset(url);}} }private ExchangeServer createServer(URL url) {//默認開啟server關閉時發送readonly事件url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());//默認開啟heartbeaturl = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))throw new RpcException("Unsupported server type: " + str + ", url: " + url);url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);ExchangeServer server;try {server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}str = url.getParameter(Constants.CLIENT_KEY);if (str != null && str.length() > 0) {Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type: " + str);}}return server;b.NettyServer
protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory();ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));//boss線程池ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));//工作線程池ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));bootstrap = new ServerBootstrap(channelFactory);//服務啟動類final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);channels = nettyHandler.getChannels();// https://issues.jboss.org/browse/NETTY-365// https://issues.jboss.org/browse/NETTY-379// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));bootstrap.setPipelineFactory(new ChannelPipelineFactory() {public ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);ChannelPipeline pipeline = Channels.pipeline();/*int idleTimeout = getIdleTimeout();if (idleTimeout > 10000) {pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));}*/pipeline.addLast("decoder", adapter.getDecoder());//增加解碼器pipeline.addLast("encoder", adapter.getEncoder());//增加編碼器pipeline.addLast("handler", nettyHandler);//業務處理類return pipeline;}});// bindchannel = bootstrap.bind(getBindAddress()); }?
?
總結
以上是生活随笔為你收集整理的dubbo源码分析二:服务发布的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 设计模式系列·王小二需求历险记(一)
- 下一篇: Node.js模块之Buffer