rocketmq 初探(二)
生活随笔
收集整理的這篇文章主要介紹了
rocketmq 初探(二)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
大家好,我是烤鴨:
上一篇簡單介紹了 rocketmq,這一篇看下源碼之注冊中心。
namesrv
先看兩個初始化方法
NamesrvController.initialize() 和 NettyRemotingServer.start();
NettyRemotingServer.start()
public void start() {// 用剛才初始化的線程池創(chuàng)建線程this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 構(gòu)建netty 相關(guān)的handler,包含連接、讀數(shù)據(jù)、解碼、請(qǐng)求和響應(yīng)處理prepareSharableHandlers();// 創(chuàng)建netty server,使用初始化的參數(shù)和剛才的handler初始化channelServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// 
啟動(dòng)channel的監(jiān)聽器,針對(duì)channel的連接、關(guān)閉、異常、空閑(后面其他的實(shí)現(xiàn)都是關(guān)閉邏輯)if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 每秒處理過時(shí)的響應(yīng),如果超時(shí)時(shí)間+1秒沒響應(yīng),就移除該請(qǐng)求并手動(dòng)回調(diào)(由于注冊(cè)中心沒有對(duì)外發(fā)請(qǐng)求,所以沒用到,client和server用到了)this.timer.scheduleAtFixedRate(new TimerTask() {從@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000); }再看下 NettyClientHandler,對(duì)請(qǐng)求和響應(yīng)指令進(jìn)行處理
/*** Entry of incoming command processing.** <p>* <strong>Note:</strong>* The incoming remoting command may be* <ul>* <li>An inquiry request from a remote peer component;</li>* <li>A response to a previous request issued by this very participant.</li>* </ul>* </p>** @param ctx Channel handler context.* @param msg incoming remoting command.* @throws Exception if there were any error while processing the incoming command.*/ public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {case REQUEST_COMMAND:// 接收請(qǐng)求并處理processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:// 接收響應(yīng),維護(hù)responseTable(注冊(cè)中心用不到)processResponseCommand(ctx, cmd);break;default:break;}} }由于注冊(cè)中心沒有發(fā)起 request,看下 processRequestCommand(接收request)
/*** Process incoming request command issued by remote peer.** @param ctx channel handler context.* @param cmd request command.*/ public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {// request的code在 RequestCode 類維護(hù),包括 發(fā)送、拉取等等final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;// 自增計(jì)數(shù)final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {// ACL鑒權(quán) (client端和broker使用)doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {@Overridepublic void callback(RemotingCommand response) {doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};// 異步 or 同步if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {NettyRequestProcessor processor = pair.getObject1();// 比較重要的地方,單獨(dú)分析RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};// 系統(tǒng)繁忙,注冊(cè)中心不會(huì)提示這個(gè)(broker 
刷盤不及時(shí)會(huì)報(bào)這個(gè))if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {// 避免日志打印的太多if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}// 不是單向請(qǐng)求(onewayRPC,線程池滿的話,直接返回系統(tǒng)繁忙)if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);} }我們先看一下 NettyRequestProcessor.processRequest 實(shí)現(xiàn)
DefaultRequestProcessor.processRequest
其實看名字就能看出來注冊中心的操作了
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}switch (request.getCode()) {case RequestCode.PUT_KV_CONFIG:// admin調(diào)用,配置添加到 configTable,定期打印return this.putKVConfig(ctx, request);case RequestCode.GET_KV_CONFIG:// admin調(diào)用,獲取配置return this.getKVConfig(ctx, request);case RequestCode.DELETE_KV_CONFIG:// admin調(diào)用,刪除配置return this.deleteKVConfig(ctx, request);case RequestCode.QUERY_DATA_VERSION:// broker 獲取topic配置return queryBrokerTopicConfig(ctx, request);case RequestCode.REGISTER_BROKER:// 注冊(cè)broker,版本不同處理邏輯有些不一樣(topic配置信息封裝不同)Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);} else {return this.registerBroker(ctx, request);}case RequestCode.UNREGISTER_BROKER:// 下線 brokerreturn this.unregisterBroker(ctx, request);case RequestCode.GET_ROUTEINFO_BY_TOPIC:// 根據(jù)topic獲取路由信息,獲取的key是 ORDER_TOPIC_CONFIG+topicidreturn this.getRouteInfoByTopic(ctx, request);case RequestCode.GET_BROKER_CLUSTER_INFO:// 獲取broker 集群信息return this.getBrokerClusterInfo(ctx, request);case RequestCode.WIPE_WRITE_PERM_OF_BROKER:// 廢除broker的寫入權(quán)限r(nóng)eturn this.wipeWritePermOfBroker(ctx, request);case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:// 獲取所有的topicreturn getAllTopicListFromNameserver(ctx, request);case RequestCode.DELETE_TOPIC_IN_NAMESRV:// 刪除topicreturn deleteTopicInNamesrv(ctx, request);case RequestCode.GET_KVLIST_BY_NAMESPACE:// 根據(jù)namespace獲取配置return this.getKVListByNamespace(ctx, request);case RequestCode.GET_TOPICS_BY_CLUSTER:// 根據(jù)cluster下的broker獲取topicreturn this.getTopicsByCluster(ctx, request);case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:// 獲取cluster、broker和關(guān)聯(lián)信息return 
this.getSystemTopicListFromNs(ctx, request);case RequestCode.GET_UNIT_TOPIC_LIST:// 設(shè)置unit_mode true && 非重試的時(shí)候,這個(gè)配置好像沒用啊(https://github.com/apache/rocketmq/issues/639)return this.getUnitTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:// 設(shè)置unit_mode true(校驗(yàn)消息和心跳的時(shí)候),獲取topicreturn this.getHasUnitSubTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:// !getUnitTopicList && getHasUnitSubTopicListreturn this.getHasUnitSubUnUnitTopicList(ctx, request);case RequestCode.UPDATE_NAMESRV_CONFIG:// 更新注冊(cè)中心配置return this.updateConfig(ctx, request);case RequestCode.GET_NAMESRV_CONFIG:// 獲取注冊(cè)中心配置return this.getConfig(ctx, request);default:break;}return null; }小結(jié)
注冊中心的作用:
存了 cluster、broker、topic 的信息。
提供了一些接口,可以進行 broker 注冊和下線、修改配置等。
檢測和維護 broker 是否活躍。
總結
以上是生活随笔為你收集整理的rocketmq 初探(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 魔力服务器修改器,魔力宝贝修改器
- 下一篇: Mac版WebStorm破解方案