日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

[Curator] Path Cache 的使用与分析

發布時間:2025/5/22 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [Curator] Path Cache 的使用与分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

為什么80%的碼農都做不了架構師?>>> ??

Path Cache

Path Cache其實就是用于對zk節點的監聽。不論是子節點的新增、更新或者移除的時候,Path Cache都能對子節點集合的狀態和數據變化做出響應。

1. 關鍵 API

org.apache.curator.framework.recipes.cache.PathChildrenCache

org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent

org.apache.curator.framework.recipes.cache.PathChildrenCacheListener

org.apache.curator.framework.recipes.cache.ChildData

2. 機制說明

PathChildrenCache內部使用一個命令模式來封裝各種操作:

  • 操作接口:org.apache.curator.framework.recipes.cache.Operation
    • 刷新操作:org.apache.curator.framework.recipes.cache.RefreshOperation
    • 觸發事件操作:org.apache.curator.framework.recipes.cache.EventOperation
    • 獲取數據操作:org.apache.curator.framework.recipes.cache.GetDataOperation

而這些操作對象,都在構造器中接受PathChildrenCache引用,這樣可以在操作中,處理cache(回調):

EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {this.cache = cache;this.event = event; } GetDataOperation(PathChildrenCache cache, String fullPath) {this.cache = cache;this.fullPath = PathUtils.validatePath(fullPath); } RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {this.cache = cache;this.mode = mode; }

而這些操作,還使用了一個單線程的線程池來調用,從而形成了異步調用。

  • 使用了一個private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());來作為線程池的任務接收隊列
    • 使用set,避免了并發情況下重復操作
    • 由于單線程,使得各種操作都是按序執行的
  • 所以為了避免curator的監聽機制阻塞
    • 在childrenWatcher以及dataWatcher中,都使用異步執行命令的方式

觸發操作:

void offerOperation(final Operation operation) {if ( operationsQuantizer.add(operation) ){submitToExecutor(new Runnable(){@Overridepublic void run(){try{operationsQuantizer.remove(operation);operation.invoke();}catch ( InterruptedException e ){//We expect to get interrupted during shutdown,//so just ignore these eventsif ( state.get() != State.CLOSED ){handleException(e);}Thread.currentThread().interrupt();}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}}});} } private synchronized void submitToExecutor(final Runnable command) {if ( state.get() == State.STARTED ){executorService.submit(command);} }
  • 考慮到了各種操作的中斷
  • 考慮到了狀態
  • 統一操作的異常處理
  • 投遞方法submitToExecutor使用了synchronized
    • 因為可能監聽器觸發,所以需要對狀態進行檢查
      • 如先關閉,然后再被某個監聽器回掉,導致不必要的操作
    • 而檢查動作不是原子的,所以需要同步鎖

3. 用法

3.1 創建

public PathChildrenCache(CuratorFramework client,String path,boolean cacheData)
  • cacheData
    • 如果設置true,是否需要緩存數據

3.2 使用

  • Cache必須在使用前調用start()方法
    • 有兩個start()方法
    • void start()
      • 無參
    • void start(PathChildrenCache.StartMode mode)
      • 可以通過參數,選擇如何初始化
      • StartMode
        • NORMAL
        • BUILD_INITIAL_CACHE
        • POST_INITIALIZED_EVENT
  • 使用完成后需要調用close()方法
  • 任何時候,調用getCurrentData()都可以得到狀態信息
  • 可以添加監聽器,當數據發生變動時回調執行
    • public void addListener(PathChildrenCacheListener listener)

4. 錯誤處理

PathChildrenCache實例會通過ConnectionStateListener監聽鏈接狀態。 如果鏈接狀態發生變化,緩存會被重置(PathChildrenCacheListener會受到一個RESET事件)

5. 源碼分析

5.1 類定義

public class PathChildrenCache implements Closeable{}
  • 實現了java.io.Closeable接口

5.2 成員變量

public class PathChildrenCache implements Closeable {private final Logger log = LoggerFactory.getLogger(getClass());private final CuratorFramework client;private final String path;private final CloseableExecutorService executorService;private final boolean cacheData;private final boolean dataIsCompressed;private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);private final EnsureContainers ensureContainers;private enum State{LATENT,STARTED,CLOSED}private static final ChildData NULL_CHILD_DATA = new ChildData("/", null, null);private static final boolean USE_EXISTS = Boolean.getBoolean("curator-path-children-cache-use-exists");private volatile Watcher childrenWatcher = new Watcher(){@Overridepublic void process(WatchedEvent event){offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));}};private volatile Watcher dataWatcher = new Watcher(){@Overridepublic void process(WatchedEvent event){try{if ( event.getType() == Event.EventType.NodeDeleted ){remove(event.getPath());}else if ( event.getType() == Event.EventType.NodeDataChanged ){offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}}};@VisibleForTestingvolatile Exchanger<Object> rebuildTestExchanger;private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener(){@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){handleStateChange(newState);}};private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache"); }
  • log
  • client
  • path
    • 緩存對應的zk節點路徑
  • executorService
    • org.apache.curator.utils.CloseableExecutorService
    • 線程池
    • 用以執行各種操作
    • 參見第2章節
  • cacheData
    • 是否需要緩存數據
  • dataIsCompressed
    • 數據是否已壓縮
  • listeners
    • org.apache.curator.framework.listen.ListenerContainer
    • 監聽器容器(管理多個監聽器)
    • 業務監聽器
    • 可以添加自己的監聽器
  • currentData
    • java.util.concurrent.ConcurrentMap
    • 當前數據
    • <String, ChildData>
    • 存放著多個org.apache.curator.framework.recipes.cache.ChildData
  • initialSet
    • AtomicReference
    • 初始化集合
    • 放置節點,以此來跟蹤各個節點是否初始化
      • 如果全部節點都初始化完成,則會觸發PathChildrenCacheEvent.Type.INITIALIZED事件
  • operationsQuantizer
    • 相當于線程池的任務接收隊列
  • state
    • 狀態
    • AtomicReference
  • ensureContainers
    • org.apache.curator.framework.EnsureContainers
    • 可以線程安全的創建path節點
  • State
    • 內部枚舉
      • LATENT
      • STARTED
      • CLOSED
  • NULL_CHILD_DATA
    • 私有常量
    • 空數據節點
  • USE_EXISTS
    • 私有常量
    • 使用系統配置中curator-path-children-cache-use-exists的值
  • childrenWatcher
    • volatile
    • 子節點變動的監聽器
  • dataWatcher
    • volatile
    • 數據變動監聽器
  • rebuildTestExchanger
    • java.util.concurrent.Exchanger
    • 用于并發線程間傳值
    • 在重建緩存時通過此對象傳遞一個信號對象
    • 用于測試
  • connectionStateListener
    • 鏈接狀態監聽器
  • defaultThreadFactory
    • 線程工廠

5.3 構造器

public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode) {this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory) {this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) {this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) {this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) {this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService) {this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {this.client = client;this.path = PathUtils.validatePath(path);this.cacheData = cacheData;this.dataIsCompressed = dataIsCompressed;this.executorService = executorService;ensureContainers = new EnsureContainers(client, path); }

有7個構造器,最終都是調用最后一個。不過從中也可以看出:

  • 默認使用newSingleThreadExecutor單線程線程池
  • 默認不對數據進行壓縮處理

5.4 啟動

緩存在使用前需要調用start()

public enum StartMode{NORMAL,BUILD_INITIAL_CACHE,POST_INITIALIZED_EVENT}public void start() throws Exception {start(StartMode.NORMAL); }@Deprecated public void start(boolean buildInitial) throws Exception {start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL); }public void start(StartMode mode) throws Exception {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");mode = Preconditions.checkNotNull(mode, "mode cannot be null");client.getConnectionStateListenable().addListener(connectionStateListener);switch ( mode ){case NORMAL:{offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));break;}case BUILD_INITIAL_CACHE:{rebuild();break;}case POST_INITIALIZED_EVENT:{initialSet.set(Maps.<String, ChildData>newConcurrentMap());offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));break;}} }private void processChildren(List<String> children, RefreshMode mode) throws Exception {Set<String> removedNodes = Sets.newHashSet(currentData.keySet());for ( String child : children ) {removedNodes.remove(ZKPaths.makePath(path, child));}for ( String fullPath : removedNodes ){remove(fullPath);}for ( String name : children ){String fullPath = ZKPaths.makePath(path, name);if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) ){getDataAndStat(fullPath);}updateInitialSet(name, NULL_CHILD_DATA);}maybeOfferInitializedEvent(initialSet.get()); }
  • 無參的start()
    • 默認使用StartMode.NORMAL策略
  • 不建議使用的start(boolean buildInitial)
    • true
      • 使用StartMode.BUILD_INITIAL_CACHE策略
    • false
      • 使用StartMode.NORMAL策略
  • 啟動時添加了鏈接狀態的監聽器

可以看到啟動過程有三種策略:

  • NORMAL模式
  • 執行刷新命令org.apache.curator.framework.recipes.cache.RefreshOperation(命令模式
    • 使用RefreshMode.STANDARD刷新模式
    • 調用org.apache.curator.framework.recipes.cache.PathChildrenCache#refresh方法
    • 調用org.apache.curator.framework.EnsureContainers#ensure創建節點
    • 在節點上添加childrenWatcher監聽器
    • 回調觸發org.apache.curator.framework.recipes.cache.PathChildrenCache#processChildren進行刷新
    • 清理掉已緩存在本地的數據中的其他節點
    • 篩選出不是本cache的數據節點
    • 從本地初始集合中清理掉
    • 如果緩存節點還沒用同步到本地,或者指定為RefreshMode.FORCE_GET_DATA_AND_STAT模式
    • 則立即同步節點數據與狀態
    • 如果不需要緩存數據,則只檢查節點是否存在(只緩存節點以及狀態,不含數據)
    • 否則讀取數據(如果需要解壓則解壓數據)并構建ChildData緩存
    • 新數據放入currentData
    • 根據情況觸發事件(喚起監聽器)
      • PathChildrenCacheEvent.Type.CHILD_ADDED事件
      • PathChildrenCacheEvent.Type.CHILD_UPDATED事件
    • 更新initialSet數據(將未同步的NULL_CHILD_DATA數據替換成讀取的數據)
    • 更新initialSet
    • 如果initialSet的Map不為空
      • NORMAL模式下,這里為空
      • 可以參見POST_INITIALIZED_EVENT模式
  • BUILD_INITIAL_CACHE模式
  • 調用rebuild方法(此方法會阻塞執行)
    • 重新查詢所有需要的數據
    • 不會觸發任何事件
  • 安全創建path
  • 清空currentData緩存
  • 重新加載path下子節點,逐個結點重構緩存
    • 逐個讀取節點數據和狀態
    • 構建ChildData放入currentData
  • 通過rebuildTestExchanger發送要給信號對象
  • POST_INITIALIZED_EVENT模式
  • 初始化initialSet
  • 以RefreshMode.POST_INITIALIZED模式刷新緩存
    • 參見NORMAL模式,但不同的是
      • 更新initialSet時
      • 如果initialSet的Map不為空
        • POST_INITIALIZED_EVENT模式下,這里已經初始化了Map
      • 如果initialSet中的數據都已經同步完成(都不等于NULL_CHILD_DATA)
      • 將initialSet制空
      • 觸發PathChildrenCacheEvent.Type.INITIALIZED事件
  • 5.5 節點發生變化

    在啟動start()已經給path上增加了一個監聽器childrenWatcher

    private volatile Watcher childrenWatcher = new Watcher() {@Overridepublic void process(WatchedEvent event){offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));} };
    • 以RefreshMode.STANDARD模式刷新緩存
      • 會對本地的緩存數據和zk節點做比較
      • 只是處理新的緩存數據
    • 注意操作的參數PathChildrenCache.this
      • this不同了

    5.6 數據發生變化

    在每次獲取緩存數據時(getDataAndStat方法),在每個緩存上添加了監聽器dataWatcher:

    private volatile Watcher dataWatcher = new Watcher() {@Overridepublic void process(WatchedEvent event){try{if ( event.getType() == Event.EventType.NodeDeleted ){remove(event.getPath());}else if ( event.getType() == Event.EventType.NodeDataChanged ){offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}} };
    • 節點刪除時
      • 清理緩存
      • 觸發PathChildrenCacheEvent.Type.CHILD_REMOVED事件
    • 數據發生變化時
      • 執行GetDataOperation操作
        • 也就是再次執行getDataAndStat方法
    • 注意操作的參數PathChildrenCache.this
      • this不同了

    5.7 獲取當前數據

    public List<ChildData> getCurrentData() {return ImmutableList.copyOf(Sets.<ChildData>newTreeSet(currentData.values())); }public ChildData getCurrentData(String fullPath) {return currentData.get(fullPath); }

    都是從本地數據中獲取

    5.8 清理

    5.8.1 清理緩存

    public void clear() {currentData.clear(); } public void clearAndRefresh() throws Exception {currentData.clear();offerOperation(new RefreshOperation(this, RefreshMode.STANDARD)); }

    清空本地數據

    如果需要則使用RefreshMode.STANDARD模式,刷新

    5.8.2 清理緩存數據

    public void clearDataBytes(String fullPath) {clearDataBytes(fullPath, -1); } public boolean clearDataBytes(String fullPath, int ifVersion) {ChildData data = currentData.get(fullPath);if ( data != null ){if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) ){if ( data.getData() != null ){currentData.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null));}return true;}}return false; }

    保留緩存信息,但是數據部分制空

    5.9 鏈接狀態變化

    在啟動時(start())中為鏈接添加了connectionStateListener監聽器:

    private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){handleStateChange(newState);} };private void handleStateChange(ConnectionState newState) {switch ( newState ){case SUSPENDED:{offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));break;}case LOST:{offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));break;}case CONNECTED:case RECONNECTED:{try{offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}break;}} }

    主要都是根據鏈接狀態,觸發不同的操作,以及觸發業務監聽器來執行。

    • 由于數據都是緩存,所以在鏈接丟失,中斷時,僅僅時觸發事件,并沒有將數據置為不可用
    • 當鏈接建立CONNECTED,以及恢復時RECONNECTED都觸發了一次RefreshMode.FORCE_GET_DATA_AND_STAT模式的刷新操作。

    5.10 關閉

    在使用完之后,需要調用close()方法:

    public void close() throws IOException {if ( state.compareAndSet(State.STARTED, State.CLOSED) ){client.getConnectionStateListenable().removeListener(connectionStateListener);listeners.clear();executorService.close();client.clearWatcherReferences(childrenWatcher);client.clearWatcherReferences(dataWatcher);// TODO// This seems to enable even more GC - I'm not sure why yet - it// has something to do with Guava's cache and circular referencesconnectionStateListener = null;childrenWatcher = null;dataWatcher = null;} }
    • 原子操作,將狀態更新為CLOSED
    • 移除鏈接狀態監聽器
    • 清空業務監聽器
    • 關閉線程池
    • 清空節點監聽器
    • 清空數據監聽器

    6. 小結

    PathChildrenCache雖然名字帶有Cache。 但其實并不是一個完整的緩存。

    應該說,它僅僅是對path下諸多節點進行統一的管理。 當這些節點發生變動,或者數據發生變化時,都可以被PathChildrenCache發現,并同步到本地Map中。以此來達到一個緩存的概念。

    從API中也能發現,它只能獲取數據。至于放置緩存,則需要另外實現。

    • 其實也簡單,直接向path下新建節點并寫入數據就行

    可以通過getListenable().addListener(listener);添加自定義監聽器,從而實現對緩存進行更細致的控制。

    7. 示例

    這里可以參考官方的示例

    轉載于:https://my.oschina.net/roccn/blog/918209

    總結

    以上是生活随笔為你收集整理的[Curator] Path Cache 的使用与分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。