日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Nacos源码主动健康检测

發(fā)布時(shí)間:2024/4/14 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Nacos源码主动健康检测 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

對于非臨時(shí)實(shí)例(ephemeral=false),Nacos會(huì)采用主動(dòng)的健康檢測,定時(shí)向?qū)嵗l(fā)送請求,根據(jù)響應(yīng)來判斷實(shí)例健康狀態(tài)。

入口在ServiceManager類中的registerInstance方法:

?創(chuàng)建空服務(wù)時(shí):

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {// 如果服務(wù)不存在,創(chuàng)建新的服務(wù)createServiceIfAbsent(namespaceId, serviceName, local, null); }

創(chuàng)建服務(wù)流程:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)throws NacosException {// 嘗試獲取服務(wù)Service service = getService(namespaceId, serviceName);if (service == null) {// 發(fā)現(xiàn)服務(wù)不存在,開始創(chuàng)建新服務(wù)Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);service = new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if (cluster != null) {cluster.setService(service);service.getClusterMap().put(cluster.getName(), cluster);}service.validate();// ** 寫入注冊表并初始化 **putServiceAndInit(service);if (!local) {addOrReplaceService(service);}} }

關(guān)鍵在putServiceAndInit(service)方法中:

private void putServiceAndInit(Service service) throws NacosException {// 將服務(wù)寫入注冊表putService(service);service = getService(service.getNamespaceId(), service.getName());// 完成服務(wù)的初始化service.init();consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }

進(jìn)入初始化邏輯:service.init(),這個(gè)會(huì)進(jìn)入Service類中:

/*** Init service.*/ public void init() {// 開啟臨時(shí)實(shí)例的心跳監(jiān)測任務(wù)HealthCheckReactor.scheduleCheck(clientBeatCheckTask);// 遍歷注冊表中的集群for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);// 完成集群初識化entry.getValue().init();} }

這里集群的初始化entry.getValue().init();會(huì)進(jìn)入Cluster類型的init()方法:

/*** Init cluster.*/ public void init() {if (inited) {return;}// 創(chuàng)建健康檢測的任務(wù)checkTask = new HealthCheckTask(this);// 這里會(huì)開啟對 非臨時(shí)實(shí)例的 定時(shí)健康檢測HealthCheckReactor.scheduleCheck(checkTask);inited = true; }

這里的HealthCheckReactor.scheduleCheck(checkTask);會(huì)開啟定時(shí)任務(wù),對非臨時(shí)實(shí)例做健康檢測。檢測邏輯定義在HealthCheckTask這個(gè)類中,是一個(gè)Runnable,其中的run方法:

public void run() {try {if (distroMapper.responsible(cluster.getService().getName()) && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {// 開始健康檢測healthCheckProcessor.process(this);// 記錄日志 。。。}} catch (Throwable e) {// 記錄日志 。。。} finally {if (!cancelled) {// 結(jié)束后,再次進(jìn)行任務(wù)調(diào)度,一定延遲后執(zhí)行HealthCheckReactor.scheduleCheck(this);// 。。。}} }

健康檢測邏輯定義在healthCheckProcessor.process(this);方法中,在HealthCheckProcessor接口中,這個(gè)接口也有很多實(shí)現(xiàn),默認(rèn)是TcpSuperSenseProcessor:

進(jìn)入TcpSuperSenseProcessor的process方法:

@Override public void process(HealthCheckTask task) {// 獲取所有 非臨時(shí)實(shí)例的 集合List<Instance> ips = task.getCluster().allIPs(false);if (CollectionUtils.isEmpty(ips)) {return;}for (Instance ip : ips) {// 封裝健康檢測信息到 BeatBeat beat = new Beat(ip, task);// 放入一個(gè)阻塞隊(duì)列中taskQueue.add(beat);MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();} }

可以看到,所有的健康檢測任務(wù)都被放入一個(gè)阻塞隊(duì)列,而不是立即執(zhí)行了。這里又采用了異步執(zhí)行的策略,可以看到Nacos中大量這樣的設(shè)計(jì)。

而TcpSuperSenseProcessor本身就是一個(gè)Runnable,在它的構(gòu)造函數(shù)中會(huì)把自己放入線程池中去執(zhí)行,其run方法如下:

public void run() {while (true) {try {// 處理任務(wù)processTask();// ...} catch (Throwable e) {SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);}} }

通過processTask來處理健康檢測的任務(wù):

private void processTask() throws Exception {// 將任務(wù)封裝為一個(gè) TaskProcessor,并放入集合Collection<Callable<Void>> tasks = new LinkedList<>();do {Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);if (beat == null) {return;}tasks.add(new TaskProcessor(beat));} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);// 批量處理集合中的任務(wù)for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {f.get();} }

任務(wù)被封裝到了TaskProcessor中去執(zhí)行了,TaskProcessor是一個(gè)Callable,其中的call方法:

@Override public Void call() {// 獲取檢測任務(wù)已經(jīng)等待的時(shí)長long waited = System.currentTimeMillis() - beat.getStartTime();if (waited > MAX_WAIT_TIME_MILLISECONDS) {Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");}SocketChannel channel = null;try {// 獲取實(shí)例信息Instance instance = beat.getIp();// 通過NIO建立TCP連接channel = SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false, -1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true);Cluster cluster = beat.getTask().getCluster();int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();channel.connect(new InetSocketAddress(instance.getIp(), port));// 注冊連接、讀取事件SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(), new BeatKey(key));beat.setStartTime(System.currentTimeMillis());GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);} catch (Exception e) {beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),"tcp:error:" + e.getMessage());if (channel != null) {try {channel.close();} catch (Exception ignore) {}}}return null; }

Nacos的健康檢測有兩種模式:

  • 臨時(shí)實(shí)例:

    • 采用客戶端心跳檢測模式,心跳周期5秒

    • 心跳間隔超過15秒則標(biāo)記為不健康

    • 心跳間隔超過30秒則從服務(wù)列表刪除

  • 永久實(shí)例:

    • 采用服務(wù)端主動(dòng)健康檢測方式

    • 周期為2000 + 5000毫秒內(nèi)的隨機(jī)數(shù)

    • 檢測異常只會(huì)標(biāo)記為不健康,不會(huì)刪除

那么為什么Nacos有臨時(shí)和永久兩種實(shí)例呢?

以淘寶為例,雙十一大促期間,流量會(huì)比平常高出很多,此時(shí)服務(wù)肯定需要增加更多實(shí)例來應(yīng)對高并發(fā),而這些實(shí)例在雙十一之后就無需繼續(xù)使用了,采用臨時(shí)實(shí)例比較合適。而對于服務(wù)的一些常備實(shí)例,則使用永久實(shí)例更合適。

與eureka相比,Nacos與Eureka在臨時(shí)實(shí)例上都是基于心跳模式實(shí)現(xiàn),差別不大,主要是心跳周期不同,eureka是30秒,Nacos是5秒。

另外,Nacos支持永久實(shí)例,而Eureka不支持,Eureka只提供了心跳模式的健康監(jiān)測,而沒有主動(dòng)檢測功能。

總結(jié)

以上是生活随笔為你收集整理的Nacos源码主动健康检测的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。