日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 综合教程 >内容正文

综合教程

分布式中间件nacos入门解析

發(fā)布時(shí)間:2023/12/13 综合教程 49 生活家
生活随笔 收集整理的這篇文章主要介紹了 分布式中间件nacos入门解析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、Nacos簡(jiǎn)介

1、Nacos是什么?

Nacos是阿里巴巴開源的一個(gè)為微服務(wù)提供服務(wù)發(fā)現(xiàn)、服務(wù)配置和服務(wù)管理的微服務(wù)基礎(chǔ)設(shè)施,簡(jiǎn)單說(shuō)就是Nacos為微服務(wù)架構(gòu)提供了分布式配置和服務(wù)注冊(cè)中心的工作。

2、Nacos有什么功能?

Nacos主要有兩大功能:注冊(cè)中心和配置中心

2.1、注冊(cè)中心

a.服務(wù)發(fā)布:服務(wù)提供者發(fā)布服務(wù)到nacos,nacos存儲(chǔ)服務(wù)和提供者關(guān)系;

b.服務(wù)訂閱:服務(wù)消費(fèi)者從nacos訂閱服務(wù),拉去服務(wù)提供者信息列表;

c.變更推送:當(dāng)服務(wù)提供者信息變更時(shí),實(shí)時(shí)通知服務(wù)消費(fèi)者;

d.路由策略:根據(jù)不同路由規(guī)則,推送不同服務(wù)提供者信息給消費(fèi)者;

e.健康檢測(cè):和服務(wù)提供者和服務(wù)消費(fèi)者保持心跳,檢測(cè)服務(wù)的健康狀態(tài);

2.2、配置中心

a.管理配置:配置的增刪改查管理;

b.監(jiān)聽配置:客戶端實(shí)時(shí)監(jiān)聽配置的更新情況;

c.灰度更新:允許針對(duì)部分客戶端進(jìn)行配置更新;

d.配置快照:客戶端需要緩存配置快照,當(dāng)nacos服務(wù)器不可用時(shí)可以使用本地配置,提高整體容災(zāi)能力。

3、Nacos有哪些概念?

3.1、命名空間(namespace)

命名空間是用于配置和服務(wù)的空間隔離,不同命名空間下的數(shù)據(jù)相互獨(dú)立,不同命名空間下可以存在相同配置和相同服務(wù),通常命名空間可用于不同環(huán)境。如開發(fā)環(huán)境、測(cè)試環(huán)境和生產(chǎn)環(huán)境可以通過命名空間來(lái)進(jìn)行區(qū)分隔離。

nacos默認(rèn)有一個(gè)保留的命名空間為public,每一個(gè)命名空間都有一個(gè)唯一的ID,如果沒有手動(dòng)配置則會(huì)自動(dòng)生產(chǎn)一個(gè)。服務(wù)管理和配置管理都是在命名空間區(qū)域內(nèi)進(jìn)行管理,每一個(gè)服務(wù)和配置都會(huì)綁定一個(gè)命名空間。

3.2、配置分組(Group)

同一個(gè)命名空間下可以有多個(gè)應(yīng)用的配置,每個(gè)應(yīng)用都可能有相同的配置,所以需要有一個(gè)分組來(lái)將屬于同一個(gè)應(yīng)用的配置進(jìn)行區(qū)分。配置分組不需要單獨(dú)管理,在管理配置集時(shí)添加配置分組即可。

3.3、配置集(Data)

配置集是一組配置的集合,通常一個(gè)配置文件就是一個(gè)配置集,每一個(gè)配置集都有一個(gè)配置集ID叫做Data ID,如和緩存相關(guān)配置都可以放在配置集cache.properties中,數(shù)據(jù)庫(kù)配置放在db.properties中。

配置集ID可以重復(fù),但是同一個(gè)命名空間下同一個(gè)配置分組下的配置集ID不可重復(fù),也就是說(shuō)命名空間+配置分組+配置集ID可以唯一定位一個(gè)配置文件。

3.4、服務(wù)

通過預(yù)定義接口網(wǎng)絡(luò)訪問的提供給客戶端的軟件功能。每個(gè)服務(wù)都有一個(gè)服務(wù)名是服務(wù)提供的標(biāo)識(shí),通過該標(biāo)識(shí)可以唯一確定其指代的服務(wù)。

3.5、服務(wù)注冊(cè)

服務(wù)提供者將自己提供的服務(wù)注冊(cè)到nacos,nacos存儲(chǔ)服務(wù)和服務(wù)提供者關(guān)系。

3.6、服務(wù)訂閱

服務(wù)消費(fèi)者從nacos上獲取對(duì)應(yīng)服務(wù)的服務(wù)提供者信息列表

3.7、元數(shù)據(jù)

Nacos數(shù)據(jù)(如配置和服務(wù))描述信息,如服務(wù)版本、權(quán)重、容災(zāi)策略、負(fù)載均衡策略、鑒權(quán)配置、各種自定義標(biāo)簽 (label),從作用范圍來(lái)看,分為服務(wù)級(jí)別的元信息、集群的元信息及實(shí)例的元信息。

3.8、權(quán)重

實(shí)例級(jí)別的配置。權(quán)重為浮點(diǎn)數(shù)。權(quán)重越大,分配給該實(shí)例的流量越大。

3.9、健康檢查

以指定方式檢查服務(wù)下掛載的實(shí)例 (Instance) 的健康度,從而確認(rèn)該實(shí)例 (Instance) 是否能提供服務(wù)。根據(jù)檢查結(jié)果,實(shí)例 (Instance) 會(huì)被判斷為健康或不健康。對(duì)服務(wù)發(fā)起解析請(qǐng)求時(shí),不健康的實(shí)例 (Instance) 不會(huì)返回給客戶端。

3.10、健康保護(hù)閾值

為了防止因過多實(shí)例 (Instance) 不健康導(dǎo)致流量全部流向健康實(shí)例 (Instance) ,繼而造成流量壓力把健康實(shí)例 (Instance) 壓垮并形成雪崩效應(yīng),應(yīng)將健康保護(hù)閾值定義為一個(gè) 0 到 1 之間的浮點(diǎn)數(shù)。當(dāng)域名健康實(shí)例數(shù) (Instance) 占總服務(wù)實(shí)例數(shù) (Instance) 的比例小于

該值時(shí),無(wú)論實(shí)例 (Instance) 是否健康,都會(huì)將這個(gè)實(shí)例 (Instance) 返回給客戶端。這樣做雖然損失了一部分流量,但是保證了集群中剩余健康實(shí)例 (Instance) 能正常工作。

二、Nacos使用

2.1、Nacos的Open API

Nacos提供了大量的HTTP API,其中包括配置管理、服務(wù)管理和命名空間管理等,核心API如下

配置管理 獲取配置 GET /nacos/v1/cs/configs
監(jiān)聽配置 POST /nacos/v1/cs/configs/listener
發(fā)布配置 POST /nacos/v1/cs/configs
刪除配置 DELETE /nacos/v1/cs/configs
查詢歷史版本配置 GET /nacos/v1/cs/history?search=accurate
查詢上一個(gè)版本配置 GET /nacos/v1/cs/history/previous
服務(wù)發(fā)現(xiàn) 注冊(cè)實(shí)例 POST /nacos/v1/ns/instance
注銷實(shí)例 DELETE /nacos/v1/ns/instance
修改實(shí)例 PUT /nacos/v1/ns/instance
查詢實(shí)例列表   GET /nacos/v1/ns/instance/list
查詢實(shí)例詳情 GET /nacos/v1/ns/instance
發(fā)送實(shí)例心跳 PUT /nacos/v1/ns/instance/beat
創(chuàng)建服務(wù) POST /nacos/v1/ns/service
刪除服務(wù) DELETE /nacos/v1/ns/service
修改服務(wù) PUT /nacos/v1/ns/service
查詢服務(wù)詳情 GET /nacos/v1/ns/service
查詢服務(wù)列表 GET /nacos/v1/ns/service/list
查詢系統(tǒng)數(shù)據(jù)指標(biāo) GET /nacos/v1/ns/operator/metrics
查詢集群服務(wù)器列表 GET /nacos/v1/ns/operator/servers
查詢集群當(dāng)前Leader GET /nacos/v1/ns/raft/leader
更新實(shí)例健康狀態(tài) PUT /nacos/v1/ns/health/instance
批量更新實(shí)例元數(shù)據(jù) PUT /nacos/v1/ns/instance/metadata/batch
命名空間 查詢命名空間列表 GET /nacos/v1/console/namespaces
創(chuàng)建命名空間 POST /nacos/v1/console/namespaces
修改命名空間 PUT /nacos/v1/console/namespaces
刪除命名空間 DELETE /nacos/v1/console/namespaces

2.2、JAVA集成Nacos的SDK

Maven依賴

<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>${version}</version>
</dependency>

2.2.1、配置管理

和配置相關(guān)功能都定義在ConfigService接口中,根據(jù)NacosFactory可以創(chuàng)建ConfigService對(duì)象,調(diào)用ConfigService相關(guān)方法就可對(duì)配置文件進(jìn)行增刪改查或監(jiān)聽配置更新,ConfigService相關(guān)方法定義如下:

public interface ConfigService {

        /**
         * 獲取配置
         */
        String getConfig(String dataId, String group, long timeoutMs) throws NacosException;

        /**
         * 獲取配置并添加監(jiān)聽器監(jiān)聽配置變更
         */
        String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)
                throws NacosException;

        /**
         * 添加監(jiān)聽器監(jiān)聽配置變更
         */
        void addListener(String dataId, String group, Listener listener) throws NacosException;

        /**
         * 發(fā)布配置
         */
        boolean publishConfig(String dataId, String group, String content) throws NacosException;

        /**
         * 發(fā)布指定類型的配置,如yml、xml、properties、json等
         */
        boolean publishConfig(String dataId, String group, String content, String type) throws NacosException;

        /**
         * 刪除配置
         */
        boolean removeConfig(String dataId, String group) throws NacosException;

        /**
         * 刪除監(jiān)聽器
         */
        void removeListener(String dataId, String group, Listener listener);

        /**
         * 獲取服務(wù)器狀態(tài)
         */
        String getServerStatus();

        /**
         * 關(guān)閉服務(wù)
         */
        void shutDown() throws NacosException;
    }

ConfigService測(cè)試案例代碼如下:

public static void main(String[] args) throws NacosException {
        /** 配置管理服務(wù)*/
        String nacosServer = "localhost:8848";
        ConfigService configService = NacosFactory.createConfigService(nacosServer);

        String dataId = "db.config";
        String group = "lucky";
        /** 1.發(fā)布配置*/
        String configContent = "";
        configService.publishConfig(dataId, group, configContent);
        /** 2.獲取配置*/
        String config = configService.getConfig(dataId, group, 5000);
        /** 3.添加配置更新監(jiān)聽器*/
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("監(jiān)聽配置更新:" + configInfo);
                //TODO 處理配置更新
            }
        });
        while (true){

        }
    }

2.2.2、服務(wù)管理

服務(wù)管理相關(guān)功能都由NamingService接口定義,根據(jù)NacosFactory可以獲取NamingService實(shí)例,NamingService包含服務(wù)注冊(cè)、訂閱等相關(guān)方法,定義如下:

public interface NamingService {

        /**
         *  注冊(cè)服務(wù)實(shí)例
         */
        void registerInstance(String serviceName, String ip, int port) throws NacosException;

        void registerInstance(String serviceName, String groupName, String ip, int port) throws NacosException;

        void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;

        void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException;

        void registerInstance(String serviceName, Instance instance) throws NacosException;

        void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException;

        /**
         * 注銷服務(wù)實(shí)例
         */
        void deregisterInstance(String serviceName, String ip, int port) throws NacosException;

        void deregisterInstance(String serviceName, String groupName, String ip, int port) throws NacosException;

        void deregisterInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;

        void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException;

        void deregisterInstance(String serviceName, Instance instance) throws NacosException;

        void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException;

        /**
         * 根據(jù)條件獲取服務(wù)實(shí)例列表
         */
        List<Instance> getAllInstances(String serviceName) throws NacosException;

        List<Instance> getAllInstances(String serviceName, String groupName) throws NacosException;

        List<Instance> getAllInstances(String serviceName, boolean subscribe) throws NacosException;

        List<Instance> getAllInstances(String serviceName, String groupName, boolean subscribe) throws NacosException;

        List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException;

        List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters) throws NacosException;

        List<Instance> getAllInstances(String serviceName, List<String> clusters, boolean subscribe) throws NacosException;

        List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException;

        /**
         * 根據(jù)條件選擇服務(wù)實(shí)例列表
         */
        List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException;

        List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException;

        List<Instance> selectInstances(String serviceName, boolean healthy, boolean subscribe) throws NacosException;

        List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException;

        List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy) throws NacosException;

        List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy) throws NacosException;

        List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException;

        List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException;

        /**
         * 根據(jù)條件以及負(fù)載均衡策略選擇一個(gè)健康的服務(wù)實(shí)例
         */
        Instance selectOneHealthyInstance(String serviceName) throws NacosException;

        Instance selectOneHealthyInstance(String serviceName, String groupName) throws NacosException;

        Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException;

        Instance selectOneHealthyInstance(String serviceName, String groupName, boolean subscribe) throws NacosException;

        Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException;

        Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters) throws NacosException;

        Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe) throws NacosException;

        Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException;

        /**
         * 訂閱服務(wù),并開啟Listener監(jiān)聽服務(wù)變更事件
         */
        void subscribe(String serviceName, EventListener listener) throws NacosException;

        void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException;

        void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException;

        void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
                throws NacosException;

        /**
         * 取消訂閱服務(wù),并關(guān)閉Listener監(jiān)聽服務(wù)變更事件
         */
        void unsubscribe(String serviceName, EventListener listener) throws NacosException;

        void unsubscribe(String serviceName, String groupName, EventListener listener) throws NacosException;

        void unsubscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException;

        void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
                throws NacosException;

        /**
         * 根據(jù)條件獲取所有服務(wù)名稱列表
         */
        ListView<String> getServicesOfServer(int pageNo, int pageSize) throws NacosException;

        ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName) throws NacosException;

        ListView<String> getServicesOfServer(int pageNo, int pageSize, AbstractSelector selector) throws NacosException;

        ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException;

        /**
         * 獲取當(dāng)前客戶端訂閱的服務(wù)列表
         */
        List<ServiceInfo> getSubscribeServices() throws NacosException;

        /**
         * 獲取服務(wù)器狀態(tài)
         */
        String getServerStatus();

        /**
         * 關(guān)閉服務(wù)器
         */
        void shutDown() throws NacosException;
    }

NamingService測(cè)試案例代碼如下:

public static void main(String[] args) throws NacosException {
        String serverAddr = "42.192.94.208:8858";
        /** 1.創(chuàng)建NamingService實(shí)例 */
        NamingService namingService = NacosFactory.createNamingService(serverAddr);
        /** 2.注冊(cè)實(shí)例*/
        namingService.registerInstance("testService", "localhost", 8080);
        /** 3.注銷實(shí)例*/
        namingService.deregisterInstance("testService", "localhost", 8080);
        /** 4.獲取所有健康實(shí)例*/
        List<Instance> instances = namingService.selectInstances("testService", true);
        /** 5.監(jiān)聽服務(wù)變化*/
        namingService.subscribe("testService", new EventListener() {
            @Override
            public void onEvent(Event event) {
                System.out.println("處理服務(wù)變更事件");
                if(event instanceof NamingEvent){
                    //TODO
                }
            }
        });
        while (true){

        }
    }

2.3、dubbo集成Nacos注冊(cè)中心

dubbo采用Nacos作為注冊(cè)中心,只需要在配置注冊(cè)中心時(shí)將地址改成nacos地址即可,如下:

XML配置

<!-- nacos地址 -->
<dubbo:registry address="nacos://127.0.0.1:8848" />

外部配置

## dubbo注冊(cè)中心地址
dubbo.registry.address = zookeeper://10.20.153.10:2181

2.4、SpringBoot集成Nacos配置中心

添加nacos依賴

<dependency>
         <groupId>com.alibaba.boot</groupId>
         <artifactId>nacos-config-spring-boot-starter</artifactId>
         <version>0.2.1</version>
</dependency>

版本號(hào)0.2.x.RELEASE對(duì)應(yīng)的是 Spring Boot 2.x 版本,版本0.1.x.RELEASE對(duì)應(yīng)的是 Spring Boot 1.x 版本

在application.properties配置文件中添加nacos地址配置

nacos.config.server-addr=127.0.0.1:8848

在SpringBoot啟動(dòng)類添加@NacosProperySource注解添加Nacos配置來(lái)源,autoRefreshed表示是否自動(dòng)更新

@NacosPropertySource(dataId = "db.config", autoRefreshed = true)

通過nacos的@NacosValue注解給變量賦值配置的值,autoRefreshed表示是否自動(dòng)更新,如:

    @NacosValue(value = "${db.username:tempUser}", autoRefreshed = true)
    private String dbUser;

    @NacosValue(value = "${db.password:tempPassword}")
    private String dbPassword;

三、Nacos實(shí)現(xiàn)原理

3.1、配置中心實(shí)現(xiàn)原理

Nacos提供了大量的配置管理相關(guān)API供客戶端調(diào)用,客戶端可以很方便的調(diào)用API來(lái)進(jìn)行配置管理。所以Nacos Client啟動(dòng)的時(shí)候只需要調(diào)用Nacos server的接口就可以獲取到所有的配置。

所以客戶端獲取配置的重點(diǎn)是如何進(jìn)行熱更新,也就是當(dāng)服務(wù)端配置更新后,客戶端是如何根據(jù)監(jiān)聽器進(jìn)行實(shí)時(shí)更新的,監(jiān)聽器又是如何實(shí)現(xiàn)的呢?首先就需要從ConfigService的addListener方法入手。

ConfigService接口的實(shí)現(xiàn)類是NacosConfigService,addListener方法源碼如下:

 1 private final ClientWorker worker;
 2 
 3     /** NacosConfigService類 添加配置更新監(jiān)聽器方法
 4      * @param dataId : 配置集
 5      * @param group : 配置分組
 6      * @param listener : 配置更新監(jiān)聽器
 7      *  */
 8     public void addListener(String dataId, String group, Listener listener) throws NacosException {
 9         //調(diào)用ClientWorker對(duì)象方法
10         worker.addTenantListeners(dataId, group, Arrays.asList(listener));
11     }
12 
13     //Http客戶端
14     private final HttpAgent agent;
15 
16     /** ClientWorker類 添加監(jiān)聽器方法 */
17     public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
18         group = null2defaultGroup(group);
19         String tenant = agent.getTenant();
20         CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
21         for (Listener listener : listeners) {
22             /** 調(diào)用CacheData對(duì)象的addListener方法*/
23             cache.addListener(listener);
24         }
25     }
/** CacheData類 監(jiān)聽器列表*/
    private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;

    /**
     * CacheData類 添加監(jiān)聽器
     * */
    public void addListener(Listener listener) {
        if (null == listener) {
            throw new IllegalArgumentException("listener is null");
        }
        /** 包裝Listener*/
        ManagerListenerWrap wrap = (listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content)
                        : new ManagerListenerWrap(listener, md5);

        /** 將監(jiān)聽器添加到列表中*/
        if (listeners.addIfAbsent(wrap)) {
            LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
                    listeners.size());
        }
    }

邏輯并不復(fù)雜,最終是將Listener對(duì)象進(jìn)行封裝并添加到了CacheData對(duì)象的listeners列表中存儲(chǔ)起來(lái)。既然有地方存了,那么就需要有地方去讀,而開啟監(jiān)聽是通過ClientWorker實(shí)例來(lái)實(shí)現(xiàn)。

NacosConfigService初始化時(shí),會(huì)初始化ClientWorker對(duì)象,ClientWorker構(gòu)造函數(shù)如下:

/** ClientWorker構(gòu)造函數(shù) */
    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
                        final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        /** 1.初始化配置*/
        init(properties);

        /** 2.創(chuàng)建定時(shí)任務(wù)線程池*/
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        /** 3.創(chuàng)建定時(shí)任務(wù)線程池*/
        this.executorService = Executors
                .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                        t.setDaemon(true);
                        return t;
                    }
                });

        /** 4.開啟定時(shí)任務(wù),10毫秒執(zhí)行一次*/
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    /** 5.檢測(cè)配置信息*/
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

ClientWorker初始化時(shí)會(huì)創(chuàng)建兩個(gè)定時(shí)任務(wù)線程池,一個(gè)只有一個(gè)線程每10毫秒執(zhí)行一次checkConfigInfo方法,而另一個(gè)線程池就是專門用來(lái)處理checkConfigInfo方法內(nèi)部的檢查配置的邏輯,源碼如下:

/** ClientWorker檢查配置信息方法*/
    public void checkConfigInfo() {
        /** 1.獲取CacheData對(duì)象,key是dataId*/
        int listenerSize = cacheMap.size();
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                /** 2.線程池執(zhí)行LongPollingRunnable任務(wù)*/
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

checkConfigInfo方法實(shí)際就是向定時(shí)任務(wù)線程池中提交一個(gè)長(zhǎng)輪訓(xùn)任務(wù)LongPollingRunnable,該任務(wù)執(zhí)行邏輯如下:

/** LongPollingRunnable線程執(zhí)行邏輯 */
    public void run() {
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            //遍歷所有CacheData
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        /** 檢查CacheData的本地配置*/
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            // 校驗(yàn)服務(wù)器配置,檢查需要更新的DataId
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
            }

            /** 遍歷所有更新的配置分組key*/
            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    /** 獲取服務(wù)器配置 */
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    /** 更新服務(wù)器配置*/
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]);
                } catch (NacosException ioe) {
                    String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                    agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    /** 校驗(yàn)配置的MD5*/
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();
            executorService.execute(this);

        } catch (Throwable e) {
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}

首先是檢查本地配置,所以及時(shí)服務(wù)器崩潰了,nacos客戶端也可以保證可以使用本地配置,本地配置存儲(chǔ)在~nacos/config/目錄下,檢查完本地配置之后,再查詢服務(wù)器配置,然后和本地配置進(jìn)行比較的到需要更新的配置,將最新的配置寫入本地。

最后執(zhí)行CacheData的checkListenerMd5()方法,該方法作用是比較配置文件的MD5加密數(shù)據(jù)是否一致,如果不一致則表示更新過,那么就需要觸發(fā)監(jiān)聽器的回調(diào),源碼如下:

 1 /** CacheData類*/
 2     void checkListenerMd5() {
 3         for (ManagerListenerWrap wrap : listeners) {
 4             //比較MD5加密數(shù)據(jù)是否一致
 5             if (!md5.equals(wrap.lastCallMd5)) {
 6                 /** 回調(diào)Listener*/
 7                 safeNotifyListener(dataId, group, content, type, md5, wrap);
 8             }
 9         }
10     }
11 
12     private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
13                                     final String md5, final ManagerListenerWrap listenerWrap) {
14         final Listener listener = listenerWrap.listener;
15 
16         Runnable job = new Runnable() {
17             @Override
18             public void run() {
19                 ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
20                 ClassLoader appClassLoader = listener.getClass().getClassLoader();
21                 try {
22                     if (listener instanceof AbstractSharedListener) {
23                         AbstractSharedListener adapter = (AbstractSharedListener) listener;
24                         adapter.fillContext(dataId, group);
25                         LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
26                     }
27                     // 執(zhí)行回調(diào)之前先將線程classloader設(shè)置為具體webapp的classloader,以免回調(diào)方法中調(diào)用spi接口是出現(xiàn)異?;蝈e(cuò)用(多應(yīng)用部署才會(huì)有該問題)。
28                     Thread.currentThread().setContextClassLoader(appClassLoader);
29 
30                     ConfigResponse cr = new ConfigResponse();
31                     cr.setDataId(dataId);
32                     cr.setGroup(group);
33                     cr.setContent(content);
34                     configFilterChainManager.doFilter(null, cr);
35                     String contentTmp = cr.getContent();
36                     /** 回調(diào)執(zhí)行Listener的receiveConfigInfo方法 */
37                     listener.receiveConfigInfo(contentTmp);
38 
39                     // compare lastContent and content
40                     if (listener instanceof AbstractConfigChangeListener) {
41                         Map data = ConfigChangeHandler.getInstance()
42                                 .parseChangeData(listenerWrap.lastContent, content, type);
43                         ConfigChangeEvent event = new ConfigChangeEvent(data);
44                         ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
45                         listenerWrap.lastContent = content;
46                     }
47 
48                     listenerWrap.lastCallMd5 = md5;
49                     LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
50                             listener);
51                 } catch (NacosException ex) {
52                     LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
53                             name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
54                 } catch (Throwable t) {
55                     LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
56                             group, md5, listener, t.getCause());
57                 } finally {
58                     Thread.currentThread().setContextClassLoader(myClassLoader);
59                 }
60             }
61         };
62 
63         final long startNotify = System.currentTimeMillis();
64         try {
65             if (null != listener.getExecutor()) {
66                 listener.getExecutor().execute(job);
67             } else {
68                 job.run();
69             }
70         } catch (Throwable t) {
71             LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
72                     group, md5, listener, t.getCause());
73         }
74         final long finishNotify = System.currentTimeMillis();
75         LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
76                 name, (finishNotify - startNotify), dataId, group, md5, listener);
77     }

當(dāng)比較更新完的配置和之前的配置不一樣時(shí),就會(huì)觸發(fā)監(jiān)聽器Listener的回調(diào),執(zhí)行Listener的receiveConfigInfo方法

總結(jié):

Nacos配置中心采用的是客戶端pull的方式從nacos服務(wù)器獲取配置數(shù)據(jù),并且沒有和nacos服務(wù)器保持長(zhǎng)連接,而是以定時(shí)任務(wù)執(zhí)行HTTP請(qǐng)求的方式從Nacos服務(wù)器獲取最新配置,然后再刷新到本地存儲(chǔ),最后再觸發(fā)監(jiān)聽器Listener的回調(diào)方法。

所以Nacos客戶端的監(jiān)聽器的通知并不是nacos服務(wù)器主動(dòng)推送過來(lái)的,而是nacos客戶端本地輪訓(xùn)查詢發(fā)現(xiàn)了配置變更之后才觸發(fā)的回調(diào)。另外nacos客戶端本地采用了線程池方式拉取配置,所以不會(huì)影響核心業(yè)務(wù)線程。

3.2、服務(wù)管理實(shí)現(xiàn)原理

nacos提供了大量關(guān)于服務(wù)發(fā)布和訂閱的API,作為Nacos客戶端,無(wú)論是服務(wù)提供者還是服務(wù)消費(fèi)者,只需要在啟動(dòng)時(shí)調(diào)用nacos的API即可完成服務(wù)發(fā)布和服務(wù)訂閱功能。但是作為注冊(cè)中心,還需要有服務(wù)實(shí)例健康檢查功能,服務(wù)消費(fèi)者實(shí)時(shí)監(jiān)聽服務(wù)提供者變化的

通知功能。而服務(wù)訂閱的監(jiān)聽邏輯和nacos配置的變更監(jiān)聽流程基本上相同,訂閱功能主要由subscribe方法實(shí)現(xiàn),NamingService實(shí)現(xiàn)類是NacosNamingService,初始化時(shí)會(huì)執(zhí)行init方法,初始化服務(wù)器代理serverProxy,心跳處理器beatReactor,host處理器

hostReactor等對(duì)象,服務(wù)訂閱方法subscribe方法邏輯如下:

    private HostReactor hostReactor;

    private BeatReactor beatReactor;

    private NamingProxy serverProxy;

    /** NacosNamingService初始化方法 */
    private void init(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        this.namespace = InitUtils.initNamespaceForNaming(properties);
        InitUtils.initSerialization();
        initServerAddr(properties);
        InitUtils.initWebRootContext(properties);
        initCacheDir();
        initLogName(properties);

        this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
        this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
        this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties),
                isPushEmptyProtect(properties), initPollingThreadCount(properties));
    }

    /** NacosNamingService服務(wù)訂閱方法 */
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
            throws NacosException {
        hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","),
                listener);
    }

    /** HostReactor的服務(wù)訂閱方法,并開啟監(jiān)聽器*/
    public void subscribe(String serviceName, String clusters, EventListener eventListener) {
        /** 1.注冊(cè)監(jiān)聽器,存入InstanceChangeNotifier對(duì)象的Map中,key是服務(wù)名稱和集群,value是監(jiān)聽器集合 */
        notifier.registerListener(serviceName, clusters, eventListener);
        /** 2.根據(jù)服務(wù)名稱獲取服務(wù)器信息 */
        getServiceInfo(serviceName, clusters);
    }

方法執(zhí)行到HostReactor對(duì)象的subscribe方法,首先是將監(jiān)聽器存入InstanceChangeNotifier對(duì)象的Map中,根據(jù)服務(wù)名稱和集群名稱作為key存儲(chǔ),value是監(jiān)聽器的集合,存儲(chǔ)起來(lái)之后調(diào)用getServiceInfo方法從nacos服務(wù)器獲取服務(wù)實(shí)例信息,邏輯如下:

 1 /** HostReactor類 獲取服務(wù)實(shí)例信息方法 */
 2     public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
 3         String key = ServiceInfo.getKey(serviceName, clusters);
 4         if (failoverReactor.isFailoverSwitch()) {
 5             return failoverReactor.getService(key);
 6         }
 7         /** 從本地緩存中獲取ServiceInfo對(duì)象 */
 8         ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
 9 
10         if (null == serviceObj) {// 如果本地緩存中沒有服務(wù)實(shí)例
11             serviceObj = new ServiceInfo(serviceName, clusters);
12             serviceInfoMap.put(serviceObj.getKey(), serviceObj);
13             updatingMap.put(serviceName, new Object());
14             /** 立即更新服務(wù)實(shí)例*/
15             updateServiceNow(serviceName, clusters);
16             updatingMap.remove(serviceName);
17 
18         } else if (updatingMap.containsKey(serviceName)) {//判斷當(dāng)前服務(wù)實(shí)例是否正在更新
19             if (UPDATE_HOLD_INTERVAL > 0) {
20                 synchronized (serviceObj) {
21                     try {
22                         serviceObj.wait(UPDATE_HOLD_INTERVAL);
23                     } catch (InterruptedException e) {
24                         NAMING_LOGGER
25                                 .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
26                     }
27                 }
28             }
29         }
30 
31         /** 定時(shí)更新服務(wù)實(shí)例信息 */
32         scheduleUpdateIfAbsent(serviceName, clusters);
33         return serviceInfoMap.get(serviceObj.getKey());
34     }

核心邏輯是先從本地獲取服務(wù)實(shí)例信息,如果不存在那么立即執(zhí)行updateServiceNow方法進(jìn)行更新;如果已經(jīng)存在那么先執(zhí)行scheuleUpdateIfAbsent方法定時(shí)更新。updateServiceNow方法也就是當(dāng)前線程立即更新服務(wù)實(shí)例,執(zhí)行了updateService方法,

而定時(shí)更新邏輯是先構(gòu)建一個(gè)UpdateTask,然后提交給線程池來(lái)執(zhí)行,定時(shí)每1秒執(zhí)行一次,邏輯如下:

/** HostReactor類 */
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }

            /** 創(chuàng)建UpdateTask,并添加定時(shí)任務(wù) */
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

    /** HostReactor類添加任務(wù)*/
    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        /** 線程池執(zhí)行,每1秒執(zhí)行一次*/
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

所以更新的邏輯主要在UpdateTask執(zhí)行體類,且邏輯肯定包含了updateService方法的邏輯,源碼核心邏輯如下:

/** HostReactor 更新服務(wù)實(shí)例方法 */
    public void updateService(String serviceName, String clusters) throws NacosException {
        /** 1.從本地獲取舊的服務(wù)實(shí)例 */
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            /** 2.從服務(wù)器查詢最新服務(wù)實(shí)例列表 */
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

            if (StringUtils.isNotEmpty(result)) {
                /** 3.刷新本地緩存 */
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

    /** UpdateTask 執(zhí)行體*/
    public void run() {
        long delayTime = DEFAULT_DELAY;
        try {
            /** 1.從緩存中獲取服務(wù)實(shí)例*/
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            if (serviceObj == null) {
                /** 2.如果緩存中沒有,則執(zhí)行updateService方法查詢*/
                updateService(serviceName, clusters);
                return;
            }
            /** 2.如果本地服務(wù)實(shí)例更新時(shí)間延遲,那么就執(zhí)行updateService方法刷新*/
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                updateService(serviceName, clusters);
                serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            } else {
                refreshOnly(serviceName, clusters);
            }

            lastRefTime = serviceObj.getLastRefTime();

            if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                    .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                // abort the update task
                NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                return;
            }
            if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                /** 3.如果查詢失敗,那么失敗次數(shù)自增*/
                incFailCount();
                return;
            }
            delayTime = serviceObj.getCacheMillis();
            /** 4.如果查詢成功,那么重置失敗次數(shù)*/
            resetFailCount();
        } catch (Throwable e) {
            incFailCount();
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            /** 5.提交下一次延遲任務(wù)*/
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
        }
    }

可以發(fā)現(xiàn)更新邏輯就是執(zhí)行updateService方法,首先從服務(wù)器查詢最新的服務(wù)實(shí)例列表,然后將查詢結(jié)果刷新到本地緩存中,然后開啟下一次定時(shí)任務(wù)繼續(xù)執(zhí)行。默認(rèn)是1秒鐘執(zhí)行一次,如果查詢不到任何記錄(服務(wù)器異?;驘o(wú)可用實(shí)例),那么就增加失敗次數(shù),每

增加一次失敗次數(shù)延遲執(zhí)行時(shí)間就翻倍,最長(zhǎng)會(huì)1分鐘執(zhí)行一次。

另外當(dāng)執(zhí)行updateService方法刷新服務(wù)實(shí)例時(shí),如果觸發(fā)了服務(wù)更新,就需要更新本地緩存并且寫入磁盤的持久化文件中保持,并且還會(huì)調(diào)用NotifyCenter的publishEvent方法發(fā)布服務(wù)實(shí)例變更事件,邏輯如下:

/** HostReactor 處理查詢服務(wù)實(shí)例結(jié)果方法*/
    public ServiceInfo processServiceJson(String json) {
        //......
        boolean changed = false;
        if (oldService != null) {
            //......
        } else {
            changed = true;
            /** 刷新內(nèi)存中緩存*/
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            /** 發(fā)布服務(wù)實(shí)例變更事件*/
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
            serviceInfo.setJsonFromServer(json);
            /** 寫入磁盤本地?cái)?shù)據(jù)*/
            DiskCache.write(serviceInfo, cacheDir);
        }
        //......
        return serviceInfo;
    }

    /** NotifyCenter 發(fā)布事件方法*/
    public static boolean publishEvent(Event event) {
        try {
            return publishEvent(event.getClass(), event);
        } catch (Throwable var2) {
            LOGGER.error("There was an exception to the message publishing : {}", var2);
            return false;
        }
    }
    private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
        if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
            return INSTANCE.sharePublisher.publish(event);
        }

        final String topic = ClassUtils.getCanonicalName(eventType);
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher != null) {
            /** 執(zhí)行EventPublisher對(duì)象publish方法*/
            return publisher.publish(event);
        }
        LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
        return false;
    }

實(shí)際是調(diào)用了EventPublisher對(duì)象的publish方法,默認(rèn)實(shí)現(xiàn)是DefaultPublisher類,DefaultPublisher會(huì)先將通知事件存入本地隊(duì)列,然后采用線程異步通知,邏輯如下:

 1  /** DefaultPublisher類 發(fā)布事件方法*/
 2     public boolean publish(Event event) {
 3         /** 1.檢查并開啟線程 */
 4         checkIsStart();
 5         /** 2.將事件存入隊(duì)列*/
 6         boolean success = this.queue.offer(event);
 7         if (!success) {
 8             LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
 9             /** 3.如果存入隊(duì)列失敗,那么立即通知*/
10             receiveEvent(event);
11             return true;
12         }
13         return true;
14     }
15 
16     public void run() {
17         openEventHandler();
18     }
19 
20     void openEventHandler() {
21         try {
22 
23             // This variable is defined to resolve the problem which message overstock in the queue.
24             int waitTimes = 60;
25             // To ensure that messages are not lost, enable EventHandler when
26             // waiting for the first Subscriber to register
27             for (; ; ) {
28                 if (shutdown || hasSubscriber() || waitTimes <= 0) {
29                     break;
30                 }
31                 ThreadUtils.sleep(1000L);
32                 waitTimes--;
33             }
34 
35             for (; ; ) {
36                 if (shutdown) {
37                     break;
38                 }
39                 final Event event = queue.take();
40                 receiveEvent(event);
41                 UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
42             }
43         } catch (Throwable ex) {
44             LOGGER.error("Event listener exception : {}", ex);
45         }
46     }
47 
48     void receiveEvent(Event event) {
49         final long currentEventSequence = event.sequence();
50         /** 遍歷所有訂閱者,*/
51         for (Subscriber subscriber : subscribers) {
52             // Whether to ignore expiration events
53             if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
54                 LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
55                         event.getClass());
56                 continue;
57             }
58             /** 通知訂閱者,執(zhí)行訂閱者的onEvent方法 */
59             notifySubscriber(subscriber, event);
60         }
61     }

DefaultPublisher先將事件存入隊(duì)列,然后通過異步線程從隊(duì)列中取任務(wù),遍歷事件所有訂閱者,依次遍歷執(zhí)行訂閱者的onEvent方法實(shí)現(xiàn)事件回調(diào)通知。

總結(jié):

服務(wù)管理的實(shí)現(xiàn)和配置管理實(shí)現(xiàn)原理基本一致,啟動(dòng)時(shí)首先會(huì)調(diào)用Nacos服務(wù)器的HTTP接口初始化一次,并且在本地內(nèi)存中緩存一份,磁盤中持久化一份。然后開啟定時(shí)任務(wù)輪訓(xùn)查詢服務(wù)器最新數(shù)據(jù),如果數(shù)據(jù)發(fā)生變化,那么就更新內(nèi)存中緩存,重新寫入磁盤,

然后再由線程池異步遍歷所有訂閱者,回調(diào)執(zhí)行訂閱者的回調(diào)函數(shù)實(shí)現(xiàn)變更通知的邏輯。

3.3、心跳檢測(cè)

作為服務(wù)提供者,需要和nacos服務(wù)器保持心跳,服務(wù)提供者在注冊(cè)實(shí)例時(shí)會(huì)創(chuàng)建心跳任務(wù),邏輯如下:

 1 /** 服務(wù)提供者 注冊(cè)實(shí)例*/
 2     public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
 3         NamingUtils.checkInstanceIsLegal(instance);
 4         String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
 5         /** 如果實(shí)例是臨時(shí)節(jié)點(diǎn)*/
 6         if (instance.isEphemeral()) {
 7             /** 構(gòu)建心跳任務(wù)交給BeatReactor處理 */
 8             BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
 9             beatReactor.addBeatInfo(groupedServiceName, beatInfo);
10         }
11         serverProxy.registerService(groupedServiceName, groupName, instance);
12     }

調(diào)用BeatReactor的addBeatInfo方法提交心跳任務(wù)

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        /** 創(chuàng)建并提交心跳定時(shí)任務(wù),默認(rèn)是5秒執(zhí)行一次*/
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

    /** 心跳定時(shí)任務(wù)執(zhí)行體 */
    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
                /** 發(fā)送心跳給Nacos服務(wù)器
                 *  調(diào)用Nacos服務(wù)器的 /instance/beat 接口 */
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        /** 如果返回404,那么就重新注冊(cè)實(shí)例*/
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            }
            /** 開啟下一次心跳定時(shí)任務(wù)*/
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }

核心邏輯就是構(gòu)建心跳定時(shí)任務(wù)交給NacosNamingService的線程池,默認(rèn)每5秒發(fā)送一次心跳,實(shí)際就是調(diào)用nacos服務(wù)器的 /instance/beat接口發(fā)送心跳,心跳發(fā)送完成再開啟下一次的定時(shí)任務(wù),整體邏輯比較簡(jiǎn)單。

總結(jié):

雖然nacos實(shí)現(xiàn)了配置中心和服務(wù)發(fā)現(xiàn)、服務(wù)訂閱、健康檢測(cè)等功能,但是nacos客戶端實(shí)際上并沒有和nacos服務(wù)器保持長(zhǎng)連接,而是采用HTTP請(qǐng)求的方式來(lái)實(shí)現(xiàn)。

配置中心就是調(diào)用查詢配置HTTP接口查詢并緩存在本地,然后開啟定時(shí)任務(wù)輪訓(xùn)查詢,如果發(fā)送變更就刷新本地緩存,并觸發(fā)回調(diào)通知監(jiān)聽器;

服務(wù)發(fā)布就是調(diào)用注冊(cè)服務(wù)HTTP接口實(shí)現(xiàn)注冊(cè),然后開啟定時(shí)任務(wù)每5秒向nacos調(diào)用一次HTTP接口發(fā)送心跳數(shù)據(jù),nacos根據(jù)心跳來(lái)管理服務(wù)提供者的健康狀態(tài);

服務(wù)訂閱就是調(diào)用查詢服務(wù)HTTP接口實(shí)現(xiàn)服務(wù)訂閱并將服務(wù)實(shí)例信息緩存在本地,然后開啟定時(shí)任務(wù)輪訓(xùn)查詢并和本地?cái)?shù)據(jù)進(jìn)行比較,如果有更新那么就異步觸發(fā)回調(diào)通知所有服務(wù)訂閱者;

總結(jié)

以上是生活随笔為你收集整理的分布式中间件nacos入门解析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

精品日韩中文字幕 | 麻豆国产在线播放 | 久久久久国产精品免费网站 | 久久五月网 | 国产精品一区二区三区免费看 | 国产中文字幕视频在线观看 | 在线激情网 | 国偷自产中文字幕亚洲手机在线 | 久草精品视频在线观看 | 亚洲情感电影大片 | 69av视频在线观看 | 中文字幕精品一区二区三区电影 | 欧美一区二区三区特黄 | 免费观看成人网 | 色综合人人 | 日韩视频免费观看高清完整版在线 | 日韩精品中文字幕在线不卡尤物 | 国产精品久久久久久久久久新婚 | 国产精品欧美激情在线观看 | 色婷婷国产在线 | 蜜臀av在线一区二区三区 | 超碰在线免费97 | 亚洲一区视频免费观看 | 91污在线观看 | 西西4444www大胆艺术 | h视频在线看 | av在线免费观看网站 | 九七视频在线观看 | 日韩精品一区二区三区中文字幕 | 激情视频免费在线观看 | 91在线资源| www91在线观看 | 久久人人97超碰国产公开结果 | 97超碰资源总站 | 久久久久免费视频 | 三级午夜片| 精品高清美女精品国产区 | 亚洲成人午夜在线 | 欧美国产三区 | 久久少妇av | 欧美人人 | 成人在线播放免费观看 | 99热这里精品| 日韩在线播放视频 | 亚洲激情在线观看 | 三级黄在线 | 人人爽人人爽人人片av | 在线观看深夜视频 | 91av电影网 | 婷婷激情五月 | 亚洲日日日 | 91爱爱免费观看 | 久久这里只有精品首页 | 免费福利片2019潦草影视午夜 | 成人免费视频播放 | 狠狠狠色| 亚洲久草在线 | 亚洲一区二区三区毛片 | 日韩电影一区二区在线 | www色片 | 亚洲最新av网址 | 手机av永久免费 | 久久免费视频一区 | 在线欧美a | 久久婷婷久久 | 久久天天躁狠狠躁亚洲综合公司 | 日韩va亚洲va欧美va久久 | 免费av大全 | 在线观看你懂的网站 | 久久黄色精品视频 | 日韩三级久久 | 久久精品欧美一区 | 999精品在线 | 午夜久久久久 | www视频免费在线观看 | 97在线观看视频 | 成人免费在线视频 | 高清国产在线一区 | 亚洲视频免费在线 | 婷婷精品国产欧美精品亚洲人人爽 | 少妇bbw揉bbb欧美 | 日韩a级免费视频 | 天天弄天天干 | 国产午夜av | 亚洲国产午夜精品 | 国产一区二区在线观看视频 | 久久久久久久久免费 | 欧美黑吊大战白妞欧美 | 五月亚洲婷婷 | 国产精品日韩欧美一区二区 | 91精品老司机久久一区啪 | 国内精品久久久久久久久久清纯 | 日韩中文字幕免费在线观看 | 狠狠干免费 | 国产精品久久久久久久久久久免费 | 亚洲国产精品久久久久久 | 丁香六月在线 | 又黄又爽又湿又无遮挡的在线视频 | 欧亚日韩精品一区二区在线 | 最近中文字幕国语免费高清6 | 天天操天天色天天射 | 波多野结衣日韩 | av午夜电影 | 一区二区三区四区不卡 | 性色视频在线 | 999成人 | 日日夜夜草 | 最近中文字幕 | 99九九99九九九视频精品 | 久久久久国产精品厨房 | 色婷婷激情电影 | 国产精品国产三级国产不产一地 | 国产精品久久久久av免费 | 亚洲精品va | 色综合久久久久久久 | 国产精品亚州 | 国产日韩精品欧美 | 射射射av| 日韩v欧美v日本v亚洲v国产v | 97小视频| 日韩久久精品 | 日韩网站在线观看 | 黄色福利网 | 日韩在线观看三区 | 免费看一级一片 | 久久久五月天 | 国产高清在线一区 | 999成人 | 免费视频区 | 婷婷色在线资源 | 日韩久久久久久久久久久久 | 精品999在线观看 | 亚洲视频1区2区 | 亚洲精品mv在线观看 | 久免费 | 亚洲精品动漫久久久久 | 99精品视频免费看 | h视频在线看| 免费精品国产va自在自线 | 日本不卡一区二区三区在线观看 | 久久国产精品99久久久久 | 免费h在线观看 | 二区三区中文字幕 | 久久久一本精品99久久精品66 | 91精品欧美 | 欧美天天射 | 久草亚洲视频 | 丝袜美女在线 | 亚洲成av人片一区二区梦乃 | 97天堂网 | 欧美一二三区播放 | 国产精品久久久av久久久 | 天天操天天添 | 手机版av在线 | 丁香久久综合 | 亚洲国产中文字幕在线观看 | 久久国产成人午夜av影院潦草 | 一区二区国产精品 | 中文字幕色播 | 国产成人精品一区二区三区福利 | 国产一级特黄电影 | 色香蕉在线视频 | 国产成人精品综合久久久久99 | 日本精品一区二区三区在线观看 | 手机成人在线电影 | 久久久wwww | 99精品视频一区二区 | 一区二区三区免费在线观看视频 | www.久久久com | 久久久这里有精品 | 国产精品免费观看视频 | 最近最新最好看中文视频 | 中文字幕激情 | 日韩精品视频在线免费观看 | 国产精品成人一区二区 | 国产一区免费在线 | av综合站| 999色视频 | 免费在线观看成年人视频 | 超碰人人91 | 97夜夜澡人人双人人人喊 | 亚洲精品视频一 | 国产精品一区二区久久久 | 亚洲精品视频免费在线观看 | 91免费版在线观看 | 看片网站黄 | av天天在线观看 | 四虎5151久久欧美毛片 | 中文字幕成人一区 | 日韩精品一区二区三区免费观看视频 | 国产精品 中文字幕 亚洲 欧美 | 成人网页在线免费观看 | 亚洲在线色 | 国产精品美女久久久久久久久久久 | 91三级视频 | 国产精品1区2区3区在线观看 | 国产中文在线观看 | 国产人成精品一区二区三 | 91精品久久久久久久久久久久久 | av电影不卡在线 | 日韩午夜电影网 | 99色在线播放 | 超级碰碰碰碰 | 在线观看深夜视频 | 国产午夜精品一区二区三区四区 | www狠狠| 亚洲一区av | 91在线中文 | 国产小视频福利在线 | 色综合天天狠天天透天天伊人 | 国产精品成人自产拍在线观看 | 91高清在线 | 久热色超碰 | 亚洲一区视频免费观看 | 久久手机精品视频 | 亚洲午夜精品一区二区三区电影院 | 欧美在线观看视频一区二区 | 亚洲日本va午夜在线电影 | av在线小说 | 我要色综合天天 | 国产最新网站 | 国产综合婷婷 | 天天做夜夜做 | 亚洲一区二区三区在线看 | 精品在线播放 | 在线免费国产视频 | 国产第一页在线播放 | 一区二区三区四区五区在线视频 | 六月婷婷色 | www·22com天天操 | 992tv又爽又黄的免费视频 | 91九色国产视频 | 欧美午夜精品久久久久久孕妇 | 中文字幕av全部资源www中文字幕在线观看 | 天天操天天操天天 | 久久大片 | 美女性爽视频国产免费app | 日韩电影在线一区 | 免费精品人在线二线三线 | 91视频高清免费 | 懂色av懂色av粉嫩av分享吧 | 国产 字幕 制服 中文 在线 | 99久久精品免费一区 | 日韩成人在线免费观看 | 999成人免费视频 | 免费av在线播放 | 欧美日本国产在线观看 | 国产精品久久久久久久久久久久午 | 久保带人| 能在线看的av | 999在线精品 | 天海翼一区二区三区免费 | 黄色看片 | 国产精品1区2区在线观看 | 夜夜狠狠| 国产不卡精品 | 丁香5月婷婷久久 | 日韩视频一区二区 | 免费国产黄线在线观看视频 | 97成人精品区在线播放 | av九九九 | 久久男女视频 | 中文字幕在线观看的网站 | 久草免费资源 | 国产午夜三级一二三区 | 国产精品入口66mio女同 | 最近能播放的中文字幕 | 成人国产精品一区二区 | 日韩一区二区免费视频 | 天天射天天干天天操 | 在线观看亚洲国产 | 一区二区三区在线观看 | 中文字幕免费不卡视频 | 成人app在线免费观看 | 91久久精品一区二区二区 | 精品国产精品久久 | 欧美一区免费观看 | 久久99热这里只有精品 | 五月婷婷婷婷婷 | www日韩在线| 青青草久草在线 | 久久精品艹 | 久久久久夜色 | 久草干 | 欧美视频日韩视频 | 久久综合福利 | 91香蕉视频 mp4 | 51久久夜色精品国产麻豆 | 97在线影视| 一区二区三区四区五区在线 | 成 人 黄 色 免费播放 | 精品亚洲网| 亚洲国产成人高清精品 | 日韩精品免费在线观看视频 | 欧美一区二区三区激情视频 | 国产一区二区久久久 | 五月激情综合婷婷 | 在线观看黄色国产 | 91视视频在线直接观看在线看网页在线看 | 99c视频高清免费观看 | 日本久久久久久 | 黄色小说视频在线 | 福利久久久 | 午夜精品久久久久久久久久久 | 日日夜夜网| 国产在线更新 | 日韩二区精品 | 国产日产亚洲精华av | 午夜精品电影一区二区在线 | 国产精品9999久久久久仙踪林 | 国产一区高清在线 | 日本一区二区三区免费看 | 成人毛片在线观看 | 麻豆免费视频观看 | 国产涩涩在线观看 | 国产一区二区手机在线观看 | 黄色小视频在线观看免费 | 视频成人永久免费视频 | 精品国产人成亚洲区 | 超碰人人在 | 一本一本久久a久久精品综合 | 日本性生活一级片 | 日本韩国精品一区二区在线观看 | 狠狠干美女 | 国产999在线 | 91精品国产乱码 | www日韩精品 | 国产精品亚洲片在线播放 | 亚洲激情| 欧美日韩精品在线一区二区 | 在线97 | 午夜精品福利一区二区 | 日韩av视屏 | 久操视频在线播放 | av中文字幕电影 | 国产小视频免费观看 | 美女在线观看网站 | 久久电影国产免费久久电影 | 91在线91| 欧美精品一区二区三区一线天视频 | 91麻豆精品91久久久久同性 | 不卡国产视频 | 国产精品久久久久影视 | 国产精品都在这里 | 日韩天堂在线观看 | 久久精品中文字幕少妇 | 999国产 | 玖玖视频在线 | 免费看的黄色录像 | 美女视频网站久久 | 国产一级视屏 | 色综合久久天天 | 久久综合狠狠综合久久综合88 | 久久久精品电影 | 三级a视频| 国产一区二区高清 | 91在线视频免费播放 | 成人蜜桃网 | 久久在线免费 | 成人久久亚洲 | 亚洲人成人天堂h久久 | 久草视频观看 | 国产超碰在线 | 能在线观看的日韩av | 91丨porny丨九色 | 国产丝袜美腿在线 | 国产黄色片久久 | 91最新网址 | 黄色免费网 | 黄网站色欧美视频 | 精品久久久久久国产 | 日韩av片无码一区二区不卡电影 | 91网页版免费观看 | 国产高清视频在线播放一区 | 高清av网站 | 狠色在线 | 欧美精品在线视频 | 日日夜夜免费精品 | 在线播放 一区 | 福利av影院 | 日韩在线视频观看 | 亚洲欧美成人综合 | 欧美日韩视频在线 | 成人在线视频免费观看 | 国产精品一区二区你懂的 | 在线看小早川怜子av | 色噜噜日韩精品一区二区三区视频 | 成人免费在线视频观看 | 国产一性一爱一乱一交 | 欧美日韩在线免费观看 | 国产欧美在线一区二区三区 | 四虎影视精品 | 欧美一区二区三区在线观看 | 欧美精品小视频 | 国产成人一区三区 | 黄色av观看| 久久人人爽人人片 | 亚洲一二三久久 | 在线网址你懂得 | 久久夜av | 中文字幕资源网在线观看 | 国产精品亚洲片夜色在线 | 四虎永久视频 | 香蕉视频最新网址 | 在线观看视频一区二区 | 午夜狠狠干 | 精品一区 在线 | 欧美日本一二三 | 91大神dom调教在线观看 | 日韩久久精品一区二区三区下载 | 黄色三级网站 | 欧美日韩天堂 | 日本久久不卡视频 | 色综合久久久 | 久久久久久久久久久国产精品 | 国产精品美女久久久久久久久 | 国产福利在线免费 | 青草草在线视频 | 99久久99视频 | 国产色视频一区二区三区qq号 | av高清一区二区三区 | 成 人 黄 色 片 在线播放 | 精品女同一区二区三区在线观看 | 日韩精品视频免费在线观看 | 亚洲综合在线一区二区三区 | 99视频久久 | 精品国产一区二区在线 | 亚洲精品美女在线观看 | 日韩免费在线观看视频 | 国产精品99久久久久 | 日韩在线高清免费视频 | 亚洲激情在线视频 | 91人人射 | 成人综合婷婷国产精品久久免费 | 婷婷综合久久 | 久久 亚洲视频 | 国产一区网 | 国产涩图| 激情av网| 日韩中文幕| 欧美天天综合网 | 高清精品视频 | 国产成人区 | 日韩videos高潮hd | 黄色中文字幕在线 | 久久久久久久久影视 | 午夜电影 电影 | 国产老妇av | av看片在线观看 | 草久在线观看视频 | 免费91麻豆精品国产自产在线观看 | 美女免费网站 | www.久久久 | 在线免费观看成人 | 久久99久| 天天要夜夜操 | 日本久久免费电影 | 亚洲aⅴ一区二区三区 | 色射色 | 国产欧美高清 | 狠狠干成人综合网 | 91在线精品播放 | 黄色精品国产 | 色婷婷激情网 | 精品在线亚洲视频 | 五月婷在线观看 | 亚洲日本色 | 一区二区中文字幕在线播放 | 六月激情丁香 | 日本丶国产丶欧美色综合 | 欧美一区三区四区 | 久久久久久久久久久久久影院 | 91成人久久 | 色视频网址 | 国产精品自产拍在线观看中文 | 97电影网站 | 狠狠躁夜夜躁人人爽超碰91 | 青青河边草免费直播 | 久久久久视 | 免费视频久久久久 | 免费在线观看午夜视频 | 欧美精品久久久久久久久久白贞 | 在线影院中文字幕 | 久久激情久久 | 亚洲日本三级 | 99视频在线播放 | 少妇bbw揉bbb欧美 | 国产亚洲免费的视频看 | 国产高清在线观看 | 国产一级在线看 | 国产成人精品日本亚洲999 | 久久久久久久久久久久久国产精品 | 国产手机在线观看视频 | 91精品蜜桃 | 欧美精品午夜 | 久久久久久久久久久国产精品 | 免费看av片网站 | 18女毛片 | 日本大片免费观看在线 | 91干干干| 欧美色图视频一区 | 国产精品av免费在线观看 | 日韩精品视频第一页 | 久久婷婷五月综合色丁香 | 国产精品久久久久久超碰 | 精品国产乱码 | 成人黄色国产 | 免费在线视频一区二区 | 毛片视频电影 | 国产视频精选 | 成人av网址大全 | 日韩在线观看三区 | 91成人免费观看视频 | 婷婷视频 | 国语精品免费视频 | 麻豆手机在线 | 日韩久久网站 | 91av视频观看| 韩国av电影网 | 高潮毛片无遮挡高清免费 | 在线之家免费在线观看电影 | 国产福利中文字幕 | 久久久福利视频 | 蜜桃视频在线观看一区 | 国产精品6999成人免费视频 | 精品久久一二三区 | 日韩亚洲在线视频 | 91免费版在线 | 99精品免费在线观看 | 夜夜骑日日操 | 福利视频一二区 | 一区二区三区四区久久 | 在线观看视频国产 | 久久久精品久久日韩一区综合 | 久久中文精品视频 | 婷婷色 亚洲 | 五月天激情婷婷 | 99操视频 | 婷婷在线播放 | 国产精品大片免费观看 | 日韩精品免费在线视频 | 亚洲丁香久久久 | 国产中文字幕久久 | 天天综合入口 | 99精品免费网 | 久久久久久久久毛片 | 欧美日韩免费视频 | 欧美精品一区二区性色 | 97人人模人人爽人人喊网 | 国产一区二区久久久久 | 免费看日韩片 | 99九九免费视频 | 日韩 精品 一区 国产 麻豆 | 国产精品久久久久av | 日韩丝袜视频 | 综合色综合 | 天天操天天干天天 | av怡红院 | 在线观看理论 | 一区二区三区影院 | 午夜精品999 | 免费下载高清毛片 | 美女网站在线 | 国产一区二区在线观看免费 | 亚洲精品视频偷拍 | 日韩a在线观看 | 久久免费播放视频 | 九九热免费视频在线观看 | 天天操夜夜叫 | 日韩视频1区| 亚洲精品tv久久久久久久久久 | 在线va网站 | 国产不卡av在线播放 | 亚洲精品国产综合久久 | 91av电影网| 亚洲h在线播放在线观看h | 极品久久久久久久 | 久久夜色精品亚洲噜噜国4 午夜视频在线观看欧美 | 天天干天天怕 | 中文字幕在线观看国产 | 久久a热6 | 日日成人网 | 狠狠干天天 | 久久精品理论 | 在线观看不卡视频 | 国产日韩视频在线播放 | 久久毛片网站 | www在线免费观看 | 丁香激情综合 | 日韩有码在线播放 | 日韩电影在线观看一区二区三区 | 久久亚洲专区 | 日韩亚洲国产中文字幕 | 精品一区二区6 | 黄色www免费| 在线观看免费观看在线91 | 在线观看日韩精品 | 欧美日本啪啪无遮挡网站 | 国产粉嫩在线 | 久色网| 国产精品porn | 超碰人人在线观看 | 91一区一区三区 | 亚洲日日夜夜 | 丁香色婷| 国产成人精品一区在线 | 国产成视频在线观看 | 欧美激情综合五月色丁香小说 | 亚洲视频网站在线观看 | 天海翼一区二区三区免费 | 久久午夜精品 | 亚洲日本va午夜在线影院 | 午夜精品一区二区国产 | 91人人澡人人爽 | 日本久久久久 | 国产一级电影在线 | 日日干天夜夜 | 色成人亚洲网 | 激情五月开心 | 国产精品久久久久一区二区三区共 | 天堂av在线免费 | 在线网站黄 | 96看片 | 亚洲午夜精品久久久久久久久 | 中文字幕一区二区三区精华液 | 国产一区二区在线播放视频 | 欧美韩日在线 | 久久中文字幕导航 | 国产精品久久久久毛片大屁完整版 | 热久久在线视频 | 麻豆视频免费入口 | 日韩一区二区免费播放 | 免费成人短视频 | 午夜精品成人一区二区三区 | 国产成视频在线观看 | 在线视频第一页 | 国产成人一区二区三区电影 | 色婷婷亚洲精品 | 精品一区二区av | 一级免费av | 日韩成人av在线 | 亚洲va欧美va人人爽春色影视 | 99视频在线精品 | 国产在线观看91 | 一级黄色a视频 | 欧美久久久久久久久久久久 | 18av在线视频 | 国产成人av电影在线观看 | 亚洲无人区小视频 | 久久久久久国产精品免费 | 色香蕉视频| www狠狠操 | 亚洲成人免费在线 | 在线观看免费国产小视频 | 欧洲视频一区 | 黄色日视频 | 日日干视频 | 97在线精品 | 激情五月在线观看 | 国产色婷婷精品综合在线手机播放 | 国产一区免费在线观看 | av中文电影| 婷婷精品进入 | 国产精品区二区三区日本 | 中文字幕免费一区 | 婷婷久久网站 | 六月丁香激情综合色啪小说 | 黄色免费电影网站 | av在线免费在线观看 | 婷婷免费视频 | 亚洲一级片在线看 | 综合激情网 | 国产1区2| 久久人91精品久久久久久不卡 | 午夜视频导航 | 亚洲国产mv | 超碰97人人在线 | 97视频中文字幕 | 黄色一级大片免费看 | 国产午夜三级一二三区 | 香蕉在线播放 | 福利久久久 | 奇米先锋| 久草网视频在线观看 | 日韩在线高清免费视频 | 日日夜夜免费精品 | 粉嫩av一区二区三区入口 | 日日日视频 | 青青啪| 粉嫩aⅴ一区二区三区 | 午夜的福利 | av在线观 | 国产精品久久久久久久久久不蜜月 | 深爱激情久久 | 中文字幕黄网 | 一级片黄色片网站 | 免费黄色av | 99久久这里有精品 | 国产精品国产精品 | 亚洲天堂网在线播放 | 2023av在线 | 在线午夜| 国产91精品久久久久久 | 精品久久久久久亚洲综合网站 | 五月婷av| 日韩av免费一区 | 婷婷av色综合 | 午夜在线观看一区 | 久色小说| 在线观看日韩免费视频 | 色av色av色av| 97国产精品一区二区 | 九九在线精品视频 | 亚洲电影影音先锋 | 成人一级电影在线观看 | 亚洲黄色av网址 | aⅴ精品av导航 | 亚洲欧洲日韩在线观看 | 久久久久久久久久久久国产精品 | 亚洲精品456在线播放 | 亚洲成av人片一区二区梦乃 | 日韩中文字幕免费视频 | 亚洲精品视频在线观看视频 | 久久精品草| 日韩色在线 | 欧美九九九 | 国产女做a爱免费视频 | 在线观看视频色 | 天天做天天爱天天爽综合网 | 不卡电影一区二区三区 | 亚洲欧美日韩精品久久久 | 久久久网站| 久久综合免费视频影院 | 久久精品久久综合 | 91色视频 | 日日干美女 | 一区二区免费不卡在线 | 在线视频18在线视频4k | 美女黄频视频大全 | 黄色毛片视频免费观看中文 | 97夜夜澡人人爽人人免费 | 久久久久久久久久电影 | 午夜少妇av| 日本韩国欧美在线观看 | 国产午夜精品一区二区三区 | aⅴ精品av导航| 免费在线观看视频一区 | 91麻豆产精品久久久久久 | 久久人人干 | 国产一二三区在线观看 | 激情网五月天 | 福利电影一区二区 | 国产精品免费久久久 | 久久精品欧美视频 | 日韩精品欧美视频 | 中文字幕在线观看一区二区 | 日韩在线视频二区 | 婷婷激情综合 | av免费在线网站 | 欧美成人一区二区 | 超碰97人 | 久草在线手机观看 | 久草在线视频免费资源观看 | 成人h视频 | 日韩在线视频一区 | 丁香激情综合久久伊人久久 | 91av视频观看| 国产亚洲一区 | av一本久道久久波多野结衣 | 日日夜夜精品网站 | 日本三级人妇 | 精品999久久久 | 欧美影片 | av天天草| 国产免费人成xvideos视频 | 亚洲精品免费在线 | 天天做天天爱天天综合网 | 国产不卡视频在线播放 | 中文字幕在线一二 | 婷婷视频在线观看 | 深爱激情综合 | 高清在线一区二区 | 国产.精品.日韩.另类.中文.在线.播放 | av成人免费在线看 | 一级黄色在线免费观看 | 精品一区二区6 | 婷婷色站| 99在线精品视频观看 | 久久蜜臀一区二区三区av | 日韩中文免费视频 | 香蕉在线播放 | 在线观看免费一级片 | 日韩欧美在线观看 | 91精品免费 | 在线免费观看黄色小说 | 丁香激情综合久久伊人久久 | 99久久久久 | 9992tv成人免费看片 | 国产高清在线精品 | 香蕉影院在线观看 | 丁香九月激情 | 91av片 | 国产免码va在线观看免费 | 色吧久久 | 久久久久久久av麻豆果冻 | 麻豆手机在线 | 激情av在线资源 | 一区二区理论片 | 日韩中文字幕免费视频 | 中文字幕 影院 | 黄色免费网 | 日韩理论在线 | 日韩激情综合 | 色片网站在线观看 | 在线观看韩日电影免费 | 17videosex性欧美 | 四虎在线免费观看 | 久久久国产一区二区三区四区小说 | 久久国内精品 | av在线播放免费 | 欧美日韩精品在线 | 欧美一区二区三区在线视频观看 | 日韩精品一区二区不卡 | 97国产超碰在线 | 久久综合99 | 天天色综合三 | 97超级碰| 在线观看视频你懂得 | 日韩成人邪恶影片 | 丝袜+亚洲+另类+欧美+变态 | 亚洲美女在线一区 | 美国av片在线观看 | 欧美日韩另类在线观看 | 亚洲免费在线观看视频 | 中文字幕婷婷 | 国语久久 | 国产成人久久精品亚洲 | 午夜美女福利直播 | 国产一区二区三区视频在线 | 久草在线久草在线2 | 天天色婷婷 | 亚洲黄色片 | 国产黄色av影视 | 国产成人免费 | 精品三级av | 国产99亚洲| 在线免费观看视频一区二区三区 | 国产成人精品一区二区三区福利 | 亚洲日韩精品欧美一区二区 | 日韩视频免费在线 | 天天综合网在线 | 日日操天天操夜夜操 | 精品久久国产 | 久久婷婷一区二区三区 | 偷拍精品一区二区三区 | 99热国产在线中文 | 久久久久在线观看 | 丁香婷婷激情五月 | 在线观看日韩中文字幕 | 高清国产一区 | 公与妇乱理三级xxx 在线观看视频在线观看 | 亚洲精品乱码久久 | 欧洲成人av| 国产精品久久久久aaaa九色 | 日韩一三区 | 色老板在线 | 日韩精品免费专区 | 干 操 插| 久久人人爽人人爽人人片av免费 | 天天操狠狠操 | 欧美一性一交一乱 | 一区三区视频在线观看 | 国产精品一码二码三码在线 | 日韩av影片在线观看 | 中文字幕在线观看你懂的 | 国产精品美 | 在线电影日韩 | 精品一区二区三区久久久 | 精品一区二区在线看 | 日韩一二三区不卡 | 久久久在线视频 | 人人爽人人干 | 久久大片网站 | 国产专区一 | 中文字幕资源站 | 婷婷中文字幕综合 | 在线观看国产中文字幕 | 久久国产热视频 | 成人黄在线观看 | 激情综合电影网 | 在线精品视频免费播放 | 91av久久 | 久久综合欧美精品亚洲一区 | 色网站在线免费观看 | 久久久精品视频成人 | 激情五月六月婷婷 | 国产精品乱码久久久久久1区2区 | 九九九九免费视频 | 91福利在线导航 | 成人av免费网站 | 国产免费a | 久久九九久久九九 | 久久久精品国产免费观看一区二区 | 久久精品视频3 | 麻豆视频在线免费 | 久久99精品波多结衣一区 | 成人av电影在线播放 | 精品久久久免费视频 | 免费日韩 精品中文字幕视频在线 | 亚洲蜜桃在线 | 青草视频在线 | 日本久久久影视 | 免费在线观看不卡av | 国产特黄色片 | 91精品麻豆| 亚洲动漫在线观看 | 日韩精品无码一区二区三区 | 久久久999 | 在线免费av观看 | 中文字幕亚洲字幕 | 精品国产乱码久久久久久浪潮 | 天天综合色天天综合 | 久久毛片网站 | 91av在线免费 | 五月婷婷在线播放 | 国产又粗又猛又色又黄视频 | 国产1级毛片 | 欧美色图30p | av综合av| 国产精品视频地址 | 人人澡人人爽欧一区 | 国产成人精品a | 欧美精品在线观看免费 | 亚洲激情一区二区三区 | 国产精品一区二区久久国产 | 五月天婷婷狠狠 | 香蕉日日 | 免费av网站在线看 | 国产亚洲视频中文字幕视频 | 激情综合色播五月 | 久久综合婷婷 | 久久久久久久久久久影院 | 久9在线| 91麻豆精品91久久久久同性 | 中文字幕成人在线 | 中文字幕在线一区观看 | 99久久久久国产精品免费 | 久久久精品综合 | 最近更新的中文字幕 | 日韩亚洲欧美中文字幕 | 偷拍精偷拍精品欧洲亚洲网站 | 久久精品屋| 综合精品在线 | 日本一区二区免费在线观看 | 欧美在线不卡一区 | 日韩三级精品 | 国产精品一区二区三区99 | 亚洲极色 | 日韩欧美视频免费看 | 在线观看亚洲电影 | 丁香九月婷婷 | 亚洲成a人片77777kkkk1在线观看 | 欧美男男激情videos | 国产一区二区三区免费在线观看 | 国产一区二区三区在线 | 视频精品一区二区三区 | www欧美日韩 | 亚洲欧美视频在线观看 | 激情五月开心 | 岛国av在线不卡 | 久草91视频 | 香蕉影院在线观看 | 在线观看免费日韩 | 91日韩精品视频 | av中文字幕在线观看网站 | 色婷婷在线播放 | 成人app在线播放 | 五月天色站 | 美女福利视频一区二区 | 久久艹中文字幕 | 国产亚洲精品久久久久久网站 | 国精产品999国精产 久久久久 | 日韩r级电影在线观看 | a天堂一码二码专区 | 91视频久久久久久 | 成人影片免费 | 久久激情视频 | 亚洲三级视频 | 久久久三级视频 | 在线一级片 | 蜜桃av久久久亚洲精品 | 久久草网站 | 狠狠久久伊人 | 欧美性视频网站 | 久久综合射 | 青草视频网| 日韩理论在线视频 | 精品久久久久久亚洲 |