Nacos源码集群数据同步
生活随笔
收集整理的這篇文章主要介紹了
Nacos源码集群数据同步
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
在DistroConsistencyServiceImpl的put方法中分為兩步:
?
其中的onPut方法已經(jīng)分析過(guò)了。
下面的distroProtocol.sync()就是集群同步的邏輯了。
DistroProtocol類(lèi)的sync方法如下:
public void sync(DistroKey distroKey, DataOperation action, long delay) {// 遍歷 Nacos 集群中除自己以外的其它節(jié)點(diǎn)for (Member each : memberManager.allMembersWithoutSelf()) {DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),each.getAddress());// 定義一個(gè)Distro的同步任務(wù)DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);// 交給線程池去執(zhí)行distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());}} }其中同步的任務(wù)封裝為一個(gè)DistroDelayTask對(duì)象。
交給了distroTaskEngineHolder.getDelayTaskExecuteEngine()執(zhí)行,這行代碼的返回值是:
NacosDelayTaskExecuteEngine,這個(gè)類(lèi)維護(hù)了一個(gè)線程池,并且接收任務(wù),執(zhí)行任務(wù)。
執(zhí)行任務(wù)的方法為processTasks()方法:
protected void processTasks() {Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {// 嘗試執(zhí)行同步任務(wù),如果失敗會(huì)重試if (!processor.process(task)) {retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error : " + e.toString(), e);retryFailedTask(taskKey, task);}} }可以看出來(lái)基于Distro模式的同步是異步進(jìn)行的,并且失敗時(shí)會(huì)將任務(wù)重新入隊(duì)并充實(shí),因此不保證同步結(jié)果的強(qiáng)一致性,屬于AP模式的一致性策略。
總結(jié)
以上是生活随笔為你收集整理的Nacos源码集群数据同步的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Nacos源码覆盖实例列表
- 下一篇: Nacos服务端流程图