xxl-job 2.1.1 Executor Source Code Walkthrough

Published: 2024/4/13

XxlJobConfig

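On the executor side, the XxlJobConfig class is the startup entry point for xxl-job. The initMethod attribute of the @Bean annotation names the method that starts xxl-job, and destroyMethod names the one that shuts it down.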

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {

    // logger and the adminAddresses, appName, ip, port, accessToken, logPath and
    // logRetentionDays fields are declared elsewhere in the class (omitted in this excerpt).

    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }

}
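To make the role of initMethod and destroyMethod concrete, here is a container-free sketch that looks up lifecycle methods by name through reflection, which is roughly what a bean container has to do with those two strings. LifecycleDemo, FakeExecutor and invokeByName are hypothetical names invented for this illustration; this is not Spring's implementation.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class LifecycleDemo {
    /** Hypothetical stand-in for XxlJobSpringExecutor; it only records lifecycle calls. */
    public static class FakeExecutor {
        final List<String> events = new ArrayList<>();
        public void start() { events.add("start"); }
        public void destroy() { events.add("destroy"); }
    }

    /** Invokes a public no-arg method by name, as a container would for initMethod. */
    static void invokeByName(Object bean, String methodName) {
        try {
            Method method = bean.getClass().getMethod(methodName);
            method.invoke(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        FakeExecutor executor = new FakeExecutor();
        invokeByName(executor, "start");   // what initMethod = "start" asks for
        invokeByName(executor, "destroy"); // what destroyMethod = "destroy" asks for
        System.out.println(executor.events);
    }
}
```

The point of the sketch is only that the annotation carries method names as strings, so a misspelled name cannot be caught by the compiler and surfaces when the container tries to resolve it.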

XxlJobSpringExecutor

The start() method of XxlJobSpringExecutor first registers all JobHandlers found on the executor side, then calls the start() method of XxlJobExecutor.

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {

    @Override
    public void start() throws Exception {

        // init JobHandler Repository
        initJobHandlerRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        super.start();
    }

    // ... remaining members omitted in this excerpt
}

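Because XxlJobSpringExecutor implements the ApplicationContextAware interface, Spring calls setApplicationContext() during startup and hands it the ApplicationContext, which it then uses to access Spring beans.

The initJobHandlerRepository method collects every bean annotated with @JobHandler and registers it. Beans that carry the annotation but do not implement IJobHandler are skipped, and a duplicate handler name raises a RuntimeException.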

private void initJobHandlerRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }

    // init job handler action
    Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);

    if (serviceBeanMap != null && serviceBeanMap.size() > 0) {
        for (Object serviceBean : serviceBeanMap.values()) {
            if (serviceBean instanceof IJobHandler) {
                String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                IJobHandler handler = (IJobHandler) serviceBean;
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }
                registJobHandler(name, handler);
            }
        }
    }
}
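The register-or-fail behaviour above can be sketched without Spring. The version below is a minimal illustration, assuming a hypothetical Handler interface in place of IJobHandler and a static map as the repository. It deliberately swaps the load-then-register sequence for putIfAbsent, which performs the conflict check and the insert as one atomic step; that is a different technique from the quoted code, not a description of it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class HandlerRegistryDemo {
    /** Hypothetical minimal handler contract, standing in for IJobHandler. */
    interface Handler {
        String execute(String param);
    }

    private static final ConcurrentMap<String, Handler> REPOSITORY = new ConcurrentHashMap<>();

    /** Returns the handler registered under the name, or null if there is none. */
    static Handler load(String name) {
        return REPOSITORY.get(name);
    }

    /** Registers the handler; fails if the name is already taken. */
    static void register(String name, Handler handler) {
        if (REPOSITORY.putIfAbsent(name, handler) != null) {
            throw new IllegalStateException("jobhandler[" + name + "] naming conflicts.");
        }
    }

    public static void main(String[] args) {
        register("demoJobHandler", param -> "ran with " + param);
        System.out.println(load("demoJobHandler").execute("p1"));
        try {
            register("demoJobHandler", param -> "duplicate");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast on a duplicate name at startup is the useful property here: two handlers silently sharing a name would make it unclear which one a scheduled job ends up running.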

XxlJobExecutor

The start() method of XxlJobExecutor:

public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);

    // start the log-cleanup thread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init the trigger-callback thread (calls back to the scheduling center over RPC)
    TriggerCallbackThread.getInstance().start();

    // init the RPC service
    port = port > 0 ? port : NetUtil.findAvailablePort(9999);
    ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
    initRpcProvider(ip, port, appName, accessToken);
}
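The last three lines of start() apply defaults: a configured port or IP wins, otherwise one is discovered. A minimal sketch of that fallback follows. The upward scan in findAvailablePort is an assumed strategy chosen for illustration, not the actual code of NetUtil, and PortFallbackDemo, isFree, resolvePort and resolveIp are hypothetical names.

```java
import java.io.IOException;
import java.net.ServerSocket;

class PortFallbackDemo {
    /** Returns true if a server socket can currently be bound to the port. */
    static boolean isFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Scans upward from defaultPort (assumed strategy, not NetUtil's implementation). */
    static int findAvailablePort(int defaultPort) {
        for (int port = defaultPort; port <= 65535; port++) {
            if (isFree(port)) {
                return port;
            }
        }
        throw new IllegalStateException("no available port from " + defaultPort);
    }

    /** Mirrors `port > 0 ? port : findAvailablePort(9999)` from start(). */
    static int resolvePort(int configured) {
        return configured > 0 ? configured : findAvailablePort(9999);
    }

    /** Mirrors the ip expression: a blank or null value falls back to the detected address. */
    static String resolveIp(String configured, String detected) {
        return (configured != null && configured.trim().length() > 0) ? configured : detected;
    }

    public static void main(String[] args) {
        System.out.println(resolvePort(8081));
        System.out.println(resolvePort(0));
        System.out.println(resolveIp("  ", "10.0.0.5"));
    }
}
```

A probe like isFree only says the port was free at the moment of the check; another process can still take it before the real server binds, so the later bind has to handle failure regardless.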

Initializing the RPC service

private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {

    // init, provider factory
    String address = IpUtil.getIpPort(ip, port);
    Map<String, String> serviceRegistryParam = new HashMap<String, String>();
    serviceRegistryParam.put("appName", appName);
    serviceRegistryParam.put("address", address);

    xxlRpcProviderFactory = new XxlRpcProviderFactory();
    // initialize
    xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);

    // add services
    xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

    // start
    xxlRpcProviderFactory.start();
}

Starting the RPC service

  • Construct the server.
  • Set the started and stopped callbacks.
  • Start the server.

public void start() throws Exception {
    // start the service
    serviceAddress = IpUtil.getIpPort(this.ip, port);
    server = netType.serverClass.newInstance();

    server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
        @Override
        public void run() throws Exception {
            // start registry
            if (serviceRegistryClass != null) {
                serviceRegistry = serviceRegistryClass.newInstance();
                serviceRegistry.start(serviceRegistryParam);
                if (serviceData.size() > 0) {
                    serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                }
            }
        }
    });

    server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
        @Override
        public void run() {
            // stop registry
            if (serviceRegistry != null) {
                if (serviceData.size() > 0) {
                    serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                }
                serviceRegistry.stop();
                serviceRegistry = null;
            }
        }
    });

    server.start(this);
}
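The callback wiring can be reduced to a small sketch: the server knows nothing about registration and only exposes two hooks, while the factory decides what runs in them. ToyServer, Callback and runLifecycle below are hypothetical names for illustration, and the toy server does no networking at all.

```java
import java.util.ArrayList;
import java.util.List;

class CallbackServerDemo {
    /** Hypothetical callback type, standing in for BaseCallback. */
    interface Callback {
        void run();
    }

    /** Toy server: no networking, only the started/stopped hook points. */
    static class ToyServer {
        private Callback startedCallback;
        private Callback stoppedCallback;

        void setStartedCallback(Callback callback) { this.startedCallback = callback; }
        void setStoppedCallback(Callback callback) { this.stoppedCallback = callback; }

        void start() {
            if (startedCallback != null) {
                startedCallback.run();
            }
        }

        void stop() {
            if (stoppedCallback != null) {
                stoppedCallback.run();
            }
        }
    }

    /** Wires registry-style actions to the hooks and returns the resulting event log. */
    static List<String> runLifecycle(String serviceAddress) {
        List<String> log = new ArrayList<>();
        ToyServer server = new ToyServer();
        server.setStartedCallback(() -> log.add("register " + serviceAddress));
        server.setStoppedCallback(() -> log.add("remove " + serviceAddress));
        server.start();
        server.stop();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle("127.0.0.1:9999"));
    }
}
```

Keeping registration in callbacks means the service is announced only once the server is actually up, and withdrawn as part of shutdown rather than by a separate caller.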

Several server types are provided:

public enum NetEnum {

    /**
     * netty tcp server
     */
    NETTY(NettyServer.class, NettyClient.class),

    /**
     * netty http server (servlet no server, ServletServerHandler)
     */
    NETTY_HTTP(NettyHttpServer.class, NettyHttpClient.class),

    /**
     * mina tcp server
     */
    MINA(MinaServer.class, MinaClient.class),

    /**
     * jetty http server
     */
    JETTY(JettyServer.class, JettyClient.class);

    public final Class<? extends Server> serverClass;
    public final Class<? extends Client> clientClass;

    NetEnum(Class<? extends Server> serverClass, Class<? extends Client> clientClass) {
        this.serverClass = serverClass;
        this.clientClass = clientClass;
    }

}
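The enum doubles as a factory table: each constant carries the class to instantiate, and the provider creates the server reflectively. A self-contained sketch of that pattern follows, with hypothetical TcpServer and HttpServer classes standing in for the real transports. It uses getDeclaredConstructor().newInstance() because Class.newInstance(), which the quoted code calls, has been deprecated since Java 9.

```java
class NetTypeDemo {
    /** Hypothetical server contract for this sketch. */
    interface Server {
        String name();
    }

    public static class TcpServer implements Server {
        public String name() { return "tcp"; }
    }

    public static class HttpServer implements Server {
        public String name() { return "http"; }
    }

    /** Each constant carries the implementation class it stands for. */
    enum NetType {
        TCP(TcpServer.class),
        HTTP(HttpServer.class);

        final Class<? extends Server> serverClass;

        NetType(Class<? extends Server> serverClass) {
            this.serverClass = serverClass;
        }

        /** Creates a fresh server through the no-arg constructor. */
        Server newServer() {
            try {
                return serverClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot create " + serverClass.getName(), e);
            }
        }
    }

    public static void main(String[] args) {
        for (NetType type : NetType.values()) {
            System.out.println(type + " -> " + type.newServer().name());
        }
    }
}
```

The bounded wildcard Class<? extends Server> is what lets the caller treat every created instance as a Server without a cast.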

NettyHttpServer

NettyHttpServer uses Netty as the underlying communication framework. It adds four handlers to each channel pipeline:

  • IdleStateHandler
  • HttpServerCodec
  • HttpObjectAggregator
  • NettyHttpServerHandler

public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {

    thread = new Thread(new Runnable() {
        @Override
        public void run() {

            // param
            final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(NettyHttpServer.class.getSimpleName());
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 10, TimeUnit.MINUTES))
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                                        .addLast(new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();

                logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort());
                onStarted();

                // wait until stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                // as quoted, e is always an InterruptedException here, so the else branch is unreachable
                if (e instanceof InterruptedException) {
                    logger.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
                } else {
                    logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e);
                }
            } finally {
                // stop
                try {
                    serverHandlerPool.shutdown();    // shutdownNow
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}

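Of these, NettyHttpServerHandler is the message handler implemented by xxl-rpc. It deserializes the request, processes the message, and serializes the response. The serializer and the service invocation in these steps are both supplied by XxlRpcProviderFactory.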

// request deserialize
XxlRpcRequest xxlRpcRequest = (XxlRpcRequest) xxlRpcProviderFactory.getSerializer().deserialize(requestBytes, XxlRpcRequest.class);
requestId = xxlRpcRequest.getRequestId();

// invoke + response
XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

// response serialize
byte[] responseBytes = xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);

// response-write
writeResponse(ctx, keepAlive, responseBytes);
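The deserialize, invoke, serialize sequence can be reproduced end to end in plain Java. The sketch below is only an illustration under stated assumptions: it uses JDK object serialization instead of Hessian, and Request, Response, Greeter, EnglishGreeter, addService and handle are hypothetical names rather than xxl-rpc types. Services are looked up by interface name and invoked reflectively.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class RpcPipelineDemo {
    /** Hypothetical request shape; the real XxlRpcRequest carries more fields. */
    public static class Request implements Serializable {
        final String className;
        final String methodName;
        final Object[] parameters;
        public Request(String className, String methodName, Object[] parameters) {
            this.className = className;
            this.methodName = methodName;
            this.parameters = parameters;
        }
    }

    /** Hypothetical response shape: either a result or an error message. */
    public static class Response implements Serializable {
        final Object result;
        final String errorMsg;
        Response(Object result, String errorMsg) {
            this.result = result;
            this.errorMsg = errorMsg;
        }
    }

    public interface Greeter { String greet(String name); }

    public static class EnglishGreeter implements Greeter {
        public String greet(String name) { return "hello " + name; }
    }

    private static final Map<String, Class<?>> INTERFACES = new HashMap<>();
    private static final Map<String, Object> SERVICES = new HashMap<>();

    static <T> void addService(Class<T> iface, T impl) {
        INTERFACES.put(iface.getName(), iface);
        SERVICES.put(iface.getName(), impl);
    }

    static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Finds the service by interface name and calls the named method reflectively. */
    static Response invoke(Request request) {
        try {
            Class<?> iface = INTERFACES.get(request.className);
            if (iface == null) {
                return new Response(null, "service not found: " + request.className);
            }
            for (Method m : iface.getMethods()) {
                if (m.getName().equals(request.methodName)
                        && m.getParameterCount() == request.parameters.length) {
                    return new Response(m.invoke(SERVICES.get(request.className), request.parameters), null);
                }
            }
            return new Response(null, "method not found: " + request.methodName);
        } catch (Exception e) {
            return new Response(null, e.toString());
        }
    }

    /** The three steps from the handler: deserialize, invoke, serialize. */
    static byte[] handle(byte[] requestBytes) {
        Request request = (Request) deserialize(requestBytes);
        Response response = invoke(request);
        return serialize(response);
    }

    public static void main(String[] args) {
        addService(Greeter.class, new EnglishGreeter());
        byte[] wire = serialize(new Request(Greeter.class.getName(), "greet", new Object[] {"xxl"}));
        Response response = (Response) deserialize(handle(wire));
        System.out.println(response.result);
    }
}
```

Errors are turned into a Response rather than thrown, so the caller always gets bytes back; whether the real handler behaves the same way in every failure case is not shown in the excerpt.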

Once the server has started, it calls onStarted(), which runs the started callback and registers the service through serviceRegistryClass.

server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
    @Override
    public void run() throws Exception {
        // start registry
        if (serviceRegistryClass != null) {
            serviceRegistry = serviceRegistryClass.newInstance();
            serviceRegistry.start(serviceRegistryParam);
            if (serviceData.size() > 0) {
                // register the services
                serviceRegistry.registry(serviceData.keySet(), serviceAddress);
            }
        }
    }
});

The registry created from serviceRegistryClass starts a thread that registers the executor with the Admin (there may be several admin nodes).

registryThread = new Thread(new Runnable() {
    @Override
    public void run() {

        // registry loop; this is a daemon thread that keeps running until toStop is set
        while (!toStop) {
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registry(registryParam);
                        if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            // Stops after the first admin that accepts the registration. The original
                            // author was unsure why; presumably the admin nodes share one registry store,
                            // so a single successful call is enough (an assumption, not confirmed here).
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                    }
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }

            try {
                if (!toStop) {
                    // re-register every 30 seconds as a heartbeat
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                }
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                }
            }
        }

        // when the thread stops, remove the registration
        try {
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                    if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                        registryResult = ReturnT.SUCCESS;
                        logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        break;
                    } else {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                    }
                }
            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
    }
});
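The inner for loop is a first-success failover: admins are tried in order, a failing or unreachable admin is skipped, and the loop ends as soon as one accepts. A minimal sketch of just that loop follows. RegistryBeatDemo and registerOnce are hypothetical names, and the admin is modelled as a plain predicate rather than an AdminBiz proxy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class RegistryBeatDemo {
    /**
     * Tries each admin in order and returns the index of the first one that
     * accepts the registration, or -1 if none does.
     */
    static <A> int registerOnce(List<A> admins, Predicate<A> registry) {
        for (int i = 0; i < admins.size(); i++) {
            try {
                if (registry.test(admins.get(i))) {
                    return i; // plays the role of the `break` in the quoted loop
                }
            } catch (RuntimeException e) {
                // a failing admin must not prevent trying the next one
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> admins = Arrays.asList("admin-down", "admin-up", "admin-spare");
        List<String> contacted = new ArrayList<>();
        int accepted = registerOnce(admins, admin -> {
            contacted.add(admin);
            if (admin.equals("admin-down")) {
                throw new IllegalStateException("connection refused");
            }
            return true;
        });
        System.out.println("accepted by index " + accepted + ", contacted " + contacted);
    }
}
```

In the quoted thread this attempt is repeated after every sleep of RegistryConfig.BEAT_TIMEOUT, so a registration that fails against every admin is simply retried on the next beat.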

Registration with the Admin goes through a proxy implementation of AdminBiz, which packages each method call as a request and sends it to the Admin server.

AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
        NetEnum.NETTY_HTTP,
        serializer,
        CallType.SYNC,
        LoadBalance.ROUND,
        AdminBiz.class,
        null,
        3000,
        addressUrl,
        accessToken,
        null,
        null).getObject();

public Object getObject() {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { iface },
            new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    // ........ (elided in the original)

                    // wrap the method call in a request and send it to the Admin
                    XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                    xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                    xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                    xxlRpcRequest.setAccessToken(accessToken);
                    xxlRpcRequest.setClassName(className);
                    xxlRpcRequest.setMethodName(methodName);
                    xxlRpcRequest.setParameterTypes(parameterTypes);
                    xxlRpcRequest.setParameters(parameters);

                    // dispatch by call type: sync, future, callback, or one-way
                    // (the branch bodies are elided in the original)
                    if (CallType.SYNC == callType) {
                    } else if (CallType.FUTURE == callType) {
                    } else if (CallType.CALLBACK == callType) {
                    } else if (CallType.ONEWAY == callType) {
                    } else {
                        throw new XxlRpcException("xxl-rpc callType[" + callType + "] invalid");
                    }
                }
            });
}
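The proxy technique itself needs nothing beyond the JDK. The sketch below shows a java.lang.reflect.Proxy whose InvocationHandler turns every interface call into a request object. The Admin interface, the Request class, and the sink list that stands in for the network are hypothetical, and the handler returns a fixed string instead of decoding a real response.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProxyRequestDemo {
    /** Hypothetical remote interface, standing in for AdminBiz. */
    public interface Admin {
        String registry(String appName, String address);
    }

    /** Captured call: the data that would be serialized and sent over the wire. */
    public static class Request {
        final String className;
        final String methodName;
        final Object[] parameters;
        Request(String className, String methodName, Object[] parameters) {
            this.className = className;
            this.methodName = methodName;
            this.parameters = parameters;
        }
    }

    /** Creates a proxy whose every call is recorded in sink instead of being sent. */
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, List<Request> sink) {
        InvocationHandler handler = (proxy, method, args) -> {
            Request request = new Request(
                    method.getDeclaringClass().getName(), method.getName(), args);
            sink.add(request);   // stand-in for the network send
            return "SUCCESS";    // stand-in for the decoded response; only valid for String-returning methods
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        List<Request> sent = new ArrayList<>();
        Admin admin = createProxy(Admin.class, sent);
        String result = admin.registry("demo-app", "127.0.0.1:9999");
        Request request = sent.get(0);
        System.out.println(result + " " + request.methodName + Arrays.toString(request.parameters));
    }
}
```

The caller programs against the interface as if it were local; everything about transport, serialization, and call type stays inside the handler, which is why the executor code never mentions HTTP when it calls adminBiz.registry(...).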
