
Reasons for Eureka cluster failures: questions about Eureka clusters

Published 2023/12/2 in Programming Q&A (编程问答) by 豆豆; collected and organized by 生活随笔.

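The question is phrased in a rather roundabout way, so let's first define the roles in the cluster:

[Figure: Eureka architecture]

A more detailed architecture diagram:

[Figure: detailed Eureka architecture diagram]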

A Service Provider configured with multiple Eureka Servers picks one Eureka Server each time it starts. It only switches to another Eureka Server if that one goes down; as long as the current Eureka Server stays up, it does not switch.
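As a rough illustration of this "stick until it fails" behavior, here is a minimal Java sketch. It is not the Eureka client's code: `StickyServerSelector`, `current()` and `reportFailure()` are hypothetical names and the peer URLs are placeholders. The selector keeps returning the same server until a failure of that server is reported, then moves on to the next URL in the list.

```java
import java.util.List;

/** Hypothetical model: keep using one Eureka Server until it fails, then fail over. */
class StickyServerSelector {
    private final List<String> serverUrls;
    private int index = 0; // position of the server currently in use

    StickyServerSelector(List<String> serverUrls) {
        if (serverUrls.isEmpty()) {
            throw new IllegalArgumentException("at least one server URL is required");
        }
        this.serverUrls = List.copyOf(serverUrls);
    }

    /** The server used for every request until it is reported as failed. */
    synchronized String current() {
        return serverUrls.get(index);
    }

    /** Called when a request to a server fails: switch only if it is the one in use. */
    synchronized void reportFailure(String failedUrl) {
        // Ignore stale reports about a server we have already switched away from
        if (failedUrl.equals(current())) {
            index = (index + 1) % serverUrls.size();
        }
    }

    public static void main(String[] args) {
        StickyServerSelector selector = new StickyServerSelector(List.of(
                "http://peer1:8761/eureka/", "http://peer2:8762/eureka/", "http://peer3:8763/eureka/"));
        System.out.println(selector.current()); // peer1, reused while it stays healthy
        selector.reportFailure("http://peer1:8761/eureka/");
        System.out.println(selector.current()); // peer2 once peer1 has failed
    }
}
```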

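The Eureka Server that a Service Provider picks to send its requests to actually has one extra job compared with the other servers: it forwards the requests it receives from clients to the other Eureka Servers in the cluster. This load is usually modest, but when the cluster holds many instances or the heartbeat interval is short, it can become considerable. One option is to configure a different Eureka Server order for each service.

On reflection, though, it is only request forwarding, so how much load can it really add...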
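One way to get "a different order per service" without maintaining every list by hand is to rotate a shared list by an offset derived from the service name. The sketch below assumes that approach; `ServerOrder`, `rotate()` and `forService()` are hypothetical helpers, not part of Eureka, and the printed order is what you would put into each service's `eureka.client.serviceUrl.defaultZone`. It assumes the client prefers servers in the listed order, as described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: give each service a stable, service-specific Eureka Server order. */
class ServerOrder {

    /** Rotate left by `shift` positions, e.g. [a, b, c] shifted by 1 -> [b, c, a]. */
    static List<String> rotate(List<String> servers, int shift) {
        int n = servers.size();
        List<String> rotated = new ArrayList<>(n);
        if (n == 0) {
            return rotated;
        }
        int start = Math.floorMod(shift, n); // handles negative shifts and hash codes
        for (int i = 0; i < n; i++) {
            rotated.add(servers.get((start + i) % n));
        }
        return rotated;
    }

    /** The shift comes from the service name, so a service always gets the same order. */
    static List<String> forService(String serviceName, List<String> servers) {
        return rotate(servers, serviceName.hashCode());
    }

    public static void main(String[] args) {
        List<String> peers = List.of("http://peer1:8761/eureka/",
                "http://peer2:8762/eureka/", "http://peer3:8763/eureka/");
        for (String service : List.of("order-service", "user-service", "stock-service")) {
            // Joined with commas, this is the shape of a defaultZone value
            System.out.println(service + " -> " + String.join(",", forService(service, peers)));
        }
    }
}
```

Deriving the order from the name, rather than shuffling randomly at startup, keeps each service's configuration reproducible across restarts.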

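Finally, let's analyze the source code for service registration and cancellation in detail (you can also refer to my blog series analyzing Eureka: 张哈希的博客 - CSDN博客, blog.csdn.net):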

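About service registration

Enabling/disabling service registration: eureka.client.register-with-eureka = true (default)

When does registration happen?

1. When the application first starts (while the EurekaClient is being initialized) and when the application status changes: going from STARTING to UP triggers the listener below, which calls instanceInfoReplicator.onDemandUpdate(). From this we can infer that any later change of instance status is also pushed to the server through the register endpoint.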

```java
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};
```

2. A scheduled task: if the InstanceInfo has changed, it is likewise updated through the register endpoint.

```java
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        // If the instance info has changed, call register to update the InstanceInfo
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // Schedule the next run
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
```

3. During the periodic renew: if the renew endpoint returns 404 (meaning this instance cannot be found on the EurekaServer), an earlier registration probably failed or expired. In that case register is called to register again.

```java
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        // 404 from renew means this instance is unknown to the EurekaServer,
        // probably because an earlier registration failed or has expired
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}
```
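The three registration triggers can be condensed into a small single-threaded model. Everything in it is hypothetical: `FakeServer` stands in for a Eureka Server, the boolean `dirty` flag imitates `isDirtyWithTime()`/`unsetIsDirty()`, and scheduling, threads and HTTP are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of when a client calls register(); not the real DiscoveryClient. */
class RegistrationTriggers {

    /** Hypothetical stand-in for a Eureka Server: instanceId -> status. */
    static class FakeServer {
        final Map<String, String> registry = new HashMap<>();
        int registerCalls = 0;

        void register(String id, String status) {
            registerCalls++;
            registry.put(id, status);
        }

        /** HTTP-like status code: 404 when the instance is unknown. */
        int heartbeat(String id) {
            return registry.containsKey(id) ? 200 : 404;
        }
    }

    final FakeServer server;
    final String id;
    String status = "STARTING";
    boolean dirty = false; // plays the role of InstanceInfo's dirty timestamp

    RegistrationTriggers(FakeServer server, String id) {
        this.server = server;
        this.id = id;
    }

    /** Trigger 1: a local status change (e.g. STARTING -> UP) registers on demand. */
    void changeStatus(String newStatus) {
        status = newStatus;
        dirty = true;
        replicatorRun(); // onDemandUpdate(): the real code also reschedules the periodic task
    }

    /** Trigger 2: the periodic task registers only if the instance info is dirty. */
    void replicatorRun() {
        if (dirty) {
            server.register(id, status);
            dirty = false;
        }
    }

    /** Trigger 3: a heartbeat answered with 404 means the server lost us, so register again. */
    boolean renew() {
        int code = server.heartbeat(id);
        if (code == 404) {
            dirty = true;
            server.register(id, status);
            dirty = false;
            return true;
        }
        return code == 200;
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        RegistrationTriggers client = new RegistrationTriggers(server, "order-service:1");
        client.changeStatus("UP"); // trigger 1: registers with status UP
        client.replicatorRun();    // trigger 2: nothing dirty, no extra register
        server.registry.clear();   // simulate the server losing the registration
        client.renew();            // trigger 3: heartbeat gets 404, re-registers
        // prints "2 {order-service:1=UP}"
        System.out.println(server.registerCalls + " " + server.registry);
    }
}
```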

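What happens on the EurekaServer when a register request is sent to it?

There are two main stores: one is the registry mentioned earlier, the other is a recently-changed queue. As we will see later, this recently-changed queue is exactly what clients read when they fetch incremental (delta) instance information: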

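```java
// Cache of all registration information: appName -> (instanceId -> lease)
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

// Recently-changed queue
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
```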

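When EurekaServer receives an instance registration, it does two things:

1. Registers the instance by calling the parent class method

2. Replicates it to the other EurekaServer instances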

```java
public void register(InstanceInfo info, boolean isReplication) {
    int leaseDuration = 90;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register by calling the parent class method
    super.register(info, leaseDuration, isReplication);
    // Replicate to the other EurekaServer instances
    this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus) null, isReplication);
}
```

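Let's first look at replication to the other EurekaServer instances.

In essence, the EurekaServer the instance registered with calls the Register method of each of the other EurekaServers in the cluster in turn, passing the instance information along: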

```java
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
```

```java
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
```
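The key line in `replicateToPeers` is the early return when `isReplication` is true: a request that was itself a replication is stored locally but never forwarded again, so replication cannot bounce between peers forever. A toy model of just that rule, assuming in-memory peers that call each other directly (`PeerNode` is hypothetical; the real code sends HTTP requests through `PeerEurekaNode`):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory peer that mirrors the isReplication guard in replicateToPeers(). */
class PeerNode {
    final List<PeerNode> peers = new ArrayList<>(); // whole cluster, possibly including this node
    final Set<String> registry = new HashSet<>();
    int receivedReplications = 0;

    void register(String instanceId, boolean isReplication) {
        registry.add(instanceId); // corresponds to super.register(...)
        if (isReplication) {
            receivedReplications++; // like numberOfReplicationsLastMin.increment()
            return; // already a replication: forwarding it again would loop between peers
        }
        for (PeerNode peer : peers) {
            if (peer == this) {
                continue; // like isThisMyUrl(): never replicate to yourself
            }
            peer.register(instanceId, true);
        }
    }

    public static void main(String[] args) {
        PeerNode a = new PeerNode(), b = new PeerNode(), c = new PeerNode();
        for (PeerNode node : List.of(a, b, c)) {
            node.peers.addAll(List.of(a, b, c));
        }
        a.register("order-service:1", false); // the client only talks to a
        // prints "[order-service:1] [order-service:1] 2"
        System.out.println(b.registry + " " + c.registry + " " + (b.receivedReplications + c.receivedReplications));
    }
}
```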

Next, the registration in the parent class:

```java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // Although register looks like a write, it takes the read lock here (explained later)
        read.lock();
        // Look up this app in the registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        // Create it if it does not exist
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Check whether this instance of the app already exists
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // It exists: compare timestamps and keep the newer instance info ......
        } else {
            // It does not exist, so this is a new instance
            // Code that updates the self-preservation counters .....
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // Put it into the registry
        gMap.put(registrant.getId(), lease);
        // Add it to the recently-changed queue
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // Code that initializes the status, records timestamps, etc. ......
        // Proactively invalidate the response cache
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}
```

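In short, it does three main things:

1. Puts the instance registration into the registry, or updates it there

2. Adds the instance registration to the recently-changed queue

3. Proactively invalidates the response cache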

Compare this with service cancellation.

Service cancellation (CANCEL)

```java
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        // Although cancel looks like a write, it takes the read lock here (explained later)
        read.lock();
        // Remove this instance from the registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        if (leaseToCancel == null) {
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            // Code that changes the status, records the modification time, etc. ......
            // (the elided part is also where instanceInfo, vip and svip are taken from leaseToCancel)
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                // Add it to the recently-changed queue
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
            }
            // Proactively invalidate the response cache
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}
```

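In short, it also does three main things:

1. Removes the instance from the registry

2. Adds the instance registration to the recently-changed queue

3. Proactively invalidates the response cache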
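Register and cancel share the same three-step shape, which a stripped-down in-memory model makes easy to see. This is a sketch, not Eureka's `AbstractInstanceRegistry`: `MiniRegistry` is hypothetical, instances are plain strings instead of `Lease<InstanceInfo>`, and the read lock, lease timestamps and self-preservation counters are omitted. The response cache is reduced to a set of invalidated key names.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical, stripped-down model of register/cancel bookkeeping. */
class MiniRegistry {
    /** One entry of the recently-changed queue. */
    record Change(String appName, String id, String action, long timestamp) {}

    // appName -> (instanceId -> instance info), the same shape as Eureka's registry minus Lease
    final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();
    // every change is appended here; delta fetches would be served from this queue
    final ConcurrentLinkedQueue<Change> recentlyChangedQueue = new ConcurrentLinkedQueue<>();
    // stand-in for the response cache: names of keys whose cached responses were dropped
    final Set<String> invalidatedKeys = ConcurrentHashMap.newKeySet();

    void register(String appName, String id, String info) {
        // 1. put or update the instance, creating the app's map if needed
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, info);
        // 2. record the change in the recently-changed queue
        recentlyChangedQueue.add(new Change(appName, id, "ADDED", System.currentTimeMillis()));
        // 3. proactively invalidate the cached responses
        invalidateCache(appName);
    }

    boolean cancel(String appName, String id) {
        Map<String, String> gMap = registry.get(appName);
        // 1. remove the instance from the registry
        String removed = (gMap == null) ? null : gMap.remove(id);
        if (removed == null) {
            return false; // not registered, nothing to cancel
        }
        // 2. record the deletion in the recently-changed queue
        recentlyChangedQueue.add(new Change(appName, id, "DELETED", System.currentTimeMillis()));
        // 3. proactively invalidate the cached responses
        invalidateCache(appName);
        return true;
    }

    private void invalidateCache(String appName) {
        // Modeled as key names: the app itself plus the two "all apps" responses
        invalidatedKeys.add(appName);
        invalidatedKeys.add("ALL_APPS");
        invalidatedKeys.add("ALL_APPS_DELTA");
    }

    public static void main(String[] args) {
        MiniRegistry r = new MiniRegistry();
        r.register("ORDER-SERVICE", "order-1", "UP");
        r.cancel("ORDER-SERVICE", "order-1");
        // prints "true 2": the app map is empty, and both changes are queued
        System.out.println(r.registry.get("ORDER-SERVICE").isEmpty() + " " + r.recentlyChangedQueue.size());
    }
}
```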

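Note the recently-changed queue here; let's look at it in detail.

The recently-changed queue

This queue is closely tied to how consumers periodically fetch the service instance list: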

```java
private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {
        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }
    };
}
```

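RetentionTimeInMSInDeltaQueue defaults to 180 s (the property is eureka.server.retention-time-in-m-s-in-delta-queue; the default really is 180 s, and the official documentation gets this wrong). So the queue is a 180 s sliding window holding the application instance changes from the last 180 s. As we will see later, a client's delta fetch actually reads from this queue, which is why it can receive the same information for a while.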
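To make the 180 s window concrete, the sketch below prunes a queue with the same head-first loop as `getDeltaRetentionTask` and serves a "delta" from whatever is left. `DeltaWindow`, `prune()` and `delta()` are hypothetical names, and the current time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sliding-window change log in the style of recentlyChangedQueue. */
class DeltaWindow {
    record Item(String instanceId, String action, long lastUpdateTime) {}

    private final ConcurrentLinkedQueue<Item> queue = new ConcurrentLinkedQueue<>();
    private final long retentionMs;

    DeltaWindow(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void record(String instanceId, String action, long nowMs) {
        queue.add(new Item(instanceId, action, nowMs));
    }

    /** Drop expired items from the head and stop at the first one still inside the window. */
    void prune(long nowMs) {
        Iterator<Item> it = queue.iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime() < nowMs - retentionMs) {
                it.remove();
            } else {
                break; // items are appended in time order, so the rest are newer
            }
        }
    }

    /** A delta is everything still inside the window, so repeated calls can return the same items. */
    List<Item> delta() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        DeltaWindow window = new DeltaWindow(180_000); // 180 s, the default discussed above
        window.record("order-1", "ADDED", 0);
        window.record("order-2", "ADDED", 100_000);
        window.prune(150_000);
        System.out.println(window.delta().size()); // 2: both changes are within 180 s
        window.prune(200_000);
        System.out.println(window.delta().size()); // 1: order-1 (t=0) has aged out
    }
}
```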

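About fetching services and instance lists

EurekaClient side

Let's start with Ribbon. The EurekaClient also has a cache: every EurekaClient on the consumer side keeps a cached copy of the service instance list. Typically, Ribbon's LoadBalancer reads this cache to learn which instances are currently available and load-balances across them. The LoadBalancer has a cache of its own as well.

First, the LoadBalancer's cache update mechanism; the relevant class is PollingServerListUpdater: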

```java
final Runnable wrapperRunnable = new Runnable() {
    @Override
    public void run() {
        if (!isActive.get()) {
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
            return;
        }
        try {
            // Fetch the instance list from the EurekaClient cache and store it in the local cache
            updateAction.doUpdate();
            lastUpdated = System.currentTimeMillis();
        } catch (Exception e) {
            logger.warn("Failed one update cycle", e);
        }
    }
};

// Periodic scheduling
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
        wrapperRunnable,
        initialDelayMs,
        refreshIntervalMs,
        TimeUnit.MILLISECONDS
);
```

updateAction.doUpdate() is what fetches the service instance list from the EurekaClient cache and stores it in BaseLoadBalancer's local cache:

```java
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());

public void setServersList(List lsrv) {
    // Code that writes into allServerList, omitted here
}

@Override
public List<Server> getAllServers() {
    return Collections.unmodifiableList(allServerList);
}
```

getAllServers is called by every load-balancing rule, for example RoundRobinRule:

```java
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }
    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        // Get the instance list: this is the getAllServers mentioned above
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();
        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }
        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }
        // Next.
        server = null;
    }
    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}
```
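`choose` relies on `incrementAndGetModulo`, which is not shown above. A minimal lock-free version, assuming a compare-and-set loop over an `AtomicInteger` (a sketch in the style of Ribbon's helper; `RoundRobinCounter` is a hypothetical class name, and the exact Ribbon code may differ between versions):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical lock-free round-robin index in the style of RoundRobinRule. */
class RoundRobinCounter {
    private final AtomicInteger nextServerCyclicCounter = new AtomicInteger(0);

    /** Atomically advance and wrap at `modulo`; retry if another thread won the race. */
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        List<String> allServers = List.of("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        RoundRobinCounter counter = new RoundRobinCounter();
        for (int i = 0; i < 4; i++) {
            // prints .2, .3, .1, .2 in turn: the first pick is index 1
            System.out.println(allServers.get(counter.incrementAndGetModulo(allServers.size())));
        }
    }
}
```

Because the modulo is taken from the current list size on every call, the index stays valid even after the LoadBalancer cache shrinks the list.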

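The LoadBalancer cache deserves attention. Sometimes we shorten only the EurekaClient cache refresh interval and forget the LoadBalancer's own local-cache refresh interval, ribbon.ServerListRefreshInterval. This parameter can safely be set very low, because it involves no network reads: it only copies one local cache into another. (For how to configure these caches so that instance shutdowns are detected and propagated quickly, see my other article.)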
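A back-of-the-envelope way to see why every layer's interval matters: in the worst case a change waits one full interval at each hop (server read-only cache, EurekaClient fetch, Ribbon refresh), so the delays add up. The sketch below simply sums them. The 30 s values are assumed defaults for `eureka.server.responseCacheUpdateIntervalMs`, `eureka.client.registryFetchIntervalSeconds` and `ribbon.ServerListRefreshInterval` (verify them against your versions), the "tuned" values are made up, and network latency and lease expiry for instances that die without cancelling are ignored. `StalenessEstimate` is a hypothetical helper.

```java
import java.time.Duration;

/** Hypothetical helper: worst-case propagation delay across the cache layers. */
class StalenessEstimate {

    /**
     * Worst case: the change lands just after each layer refreshed, so it waits a full
     * interval per hop: readOnlyCacheMap refresh -> EurekaClient fetch -> Ribbon refresh.
     */
    static Duration worstCase(Duration serverReadOnlyRefresh, Duration clientFetch, Duration ribbonRefresh) {
        return serverReadOnlyRefresh.plus(clientFetch).plus(ribbonRefresh);
    }

    public static void main(String[] args) {
        Duration defaults = worstCase(Duration.ofSeconds(30), Duration.ofSeconds(30), Duration.ofSeconds(30));
        Duration tuned = worstCase(Duration.ofSeconds(3), Duration.ofSeconds(5), Duration.ofSeconds(1));
        // prints "90s vs 9s"
        System.out.println(defaults.getSeconds() + "s vs " + tuned.getSeconds() + "s");
    }
}
```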

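Next, let's look at the EurekaClient's own cache, going straight to the relevant source of the key class DiscoveryClient. We only care about the local Region here and ignore multi-Region configuration for now: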

```java
// Local cache; think of it as a symbolic link that is repointed to a new Applications object
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

private void initScheduledTasks() {
    // If configured to fetch the registry (the default), set up the periodic fetch task
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // Other scheduled-task initialization, omitted
}

// Task that periodically fetches the service list from EurekaServer
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

void refreshRegistry() {
    try {
        // Multi-Region handling, omitted (it also computes remoteRegionsModified)
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }
        // Logging code, omitted
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}
```

```java
// Core method that periodically fetches the service list from EurekaServer
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        Applications applications = getApplications();
        // Full fetch on the first fetch, when the app list is empty, or when one of the
        // other conditions below holds; otherwise fetch a delta
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // Cache refresh done: send an event to observers (not really used at the moment)
    onCacheRefreshed();
    // Check that the remote instance list includes this instance with the right status; not our concern here
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}
```

```java
// Full fetch
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications apps = null;
    // Call the /eureka/apps endpoint to fetch all service instance information
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}
```

```java
// Delta fetch
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications delta = null;
    // Call the /eureka/apps/delta endpoint to fetch the incremental instance information
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    if (delta == null) {
        // delta is null, so the delta fetch failed: fall back to a full fetch
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // The atomic generation check guards against a slow network request from one run
        // overlapping a later run, which would let several threads apply deltas concurrently.
        // Delta fetched: compare hashcodes; on a mismatch, a full fetch is done as well
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}
```

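That concludes the source analysis of how EurekaClient fetches service instance information. The important EurekaClient caching behavior can be summarized as follows:

1. EurekaClient does a full fetch the first time, then periodically fetches deltas of the service instance information and keeps the result in its cache.

2. If a delta fetch fails, or the hashcode comparison after applying a delta shows a mismatch, EurekaClient performs a full fetch. This avoids problems caused by temporary network partitions.

3. For service calls that go through Ribbon load balancing, Ribbon keeps its own cache of the instance list, refreshed periodically from the EurekaClient cache.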
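The first two points (full fetch first, deltas afterwards, full fetch again when a delta is missing or the hashes disagree) fit in a small model. It is a sketch, not `DiscoveryClient`: the server is reduced to a hypothetical `Server` interface, the "apps hashcode" is approximated by the sorted `id=status` string rather than Eureka's real reconcile hash, a delta entry with status `DELETED` means removal, and locking and the generation counter are omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of full vs. delta fetching with hash reconciliation. */
class FetchModel {

    /** Hypothetical server view. */
    interface Server {
        Map<String, String> full();  // instanceId -> status
        Map<String, String> delta(); // recent changes; "DELETED" means removed; null = unavailable
        String hash();               // hash of the server's full registry
    }

    /** Stand-in for Eureka's reconcile hash code: the sorted id=status pairs as a string. */
    static String hashOf(Map<String, String> apps) {
        return new TreeMap<>(apps).toString();
    }

    final Map<String, String> localCache = new HashMap<>();
    int fullFetches = 0;

    void fetch(Server server) {
        if (localCache.isEmpty()) {
            getAndStoreFull(server); // first fetch, or nothing cached: full fetch
            return;
        }
        Map<String, String> delta = server.delta();
        if (delta == null) {
            getAndStoreFull(server); // delta unavailable: fall back to a full fetch
            return;
        }
        delta.forEach((id, status) -> {
            if ("DELETED".equals(status)) {
                localCache.remove(id);
            } else {
                localCache.put(id, status);
            }
        });
        if (!hashOf(localCache).equals(server.hash())) {
            getAndStoreFull(server); // local copy drifted from the server: reconcile with a full fetch
        }
    }

    private void getAndStoreFull(Server server) {
        fullFetches++;
        localCache.clear();
        localCache.putAll(server.full());
    }

    public static void main(String[] args) {
        Map<String, String> serverState = new HashMap<>(Map.of("order-1", "UP"));
        Map<String, String> pendingDelta = new HashMap<>();
        Server server = new Server() {
            public Map<String, String> full() { return new HashMap<>(serverState); }
            public Map<String, String> delta() { return new HashMap<>(pendingDelta); }
            public String hash() { return hashOf(serverState); }
        };
        FetchModel client = new FetchModel();
        client.fetch(server);             // full fetch #1
        serverState.put("order-2", "UP");
        pendingDelta.put("order-2", "UP");
        client.fetch(server);             // delta applied, hashes match
        serverState.put("order-3", "UP"); // a change the next delta does not carry
        pendingDelta.clear();
        client.fetch(server);             // hash mismatch: full fetch #2
        // prints "2 {order-1=UP, order-2=UP, order-3=UP}"
        System.out.println(client.fullFetches + " " + new TreeMap<>(client.localCache));
    }
}
```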

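EurekaServer side

On the EurekaServer side, all read requests are served from the ReadOnlyMap (this is configurable, via eureka.server.use-read-only-response-cache). A scheduled task periodically syncs the ReadWriteMap into the ReadOnlyMap; the interval is configured by: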

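```properties
# How often the eureka server refreshes readOnlyCacheMap. Note that clients read from
# readOnlyCacheMap, so this interval decides how often readWriteCacheMap is copied into it.
# Default: 30s (30000 ms); the value below lowers it to 3s
eureka.server.responseCacheUpdateIntervalMs=3000
```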

Related code:

```java
if (shouldUseReadOnlyResponseCache) {
    timer.schedule(getCacheUpdateTask(),
            new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                    + responseCacheUpdateIntervalMs),
            responseCacheUpdateIntervalMs);
}

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}
```

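The ReadWriteMap is a LoadingCache that wraps the service instance information from the registry into the HTTP responses to be returned (in both gzip-compressed and uncompressed form). It also has two special keys, ALL_APPS and ALL_APPS_DELTA: ALL_APPS holds all service instance information, and ALL_APPS_DELTA holds the HTTP response built from the instance list in the RecentlyChangedQueue discussed in the registration section.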
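The two-level server cache can be sketched with plain JDK maps. This is a hypothetical simplification: `computeIfAbsent` stands in for the Guava `LoadingCache` behind the ReadWriteMap, every key is answered with the whole registry rendered as a string (Eureka builds a different payload per key), and `syncReadOnly()` plays the role of the `getCacheUpdateTask()` timer task. `TwoLevelResponseCache` is a made-up name.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-level response cache in the spirit of Eureka's server-side caching. */
class TwoLevelResponseCache {
    static final String ALL_APPS = "ALL_APPS";

    private final Map<String, String> registry; // instanceId -> status, the source of truth
    private final ConcurrentHashMap<String, String> readWriteCacheMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, String> readOnlyCacheMap = new ConcurrentHashMap<>();
    private final boolean useReadOnlyCache;

    TwoLevelResponseCache(Map<String, String> registry, boolean useReadOnlyCache) {
        this.registry = registry;
        this.useReadOnlyCache = useReadOnlyCache;
    }

    /** Like a LoadingCache: build the response from the registry on a miss. */
    private String readWrite(String key) {
        // Toy payload: the whole registry, whatever the key
        return readWriteCacheMap.computeIfAbsent(key, k -> new TreeMap<>(registry).toString());
    }

    /** Client reads go to the read-only map when enabled; a miss is filled from the read-write map. */
    String get(String key) {
        if (!useReadOnlyCache) {
            return readWrite(key);
        }
        return readOnlyCacheMap.computeIfAbsent(key, this::readWrite);
    }

    /** Called on register/cancel: only the read-write level is invalidated. */
    void invalidate(String key) {
        readWriteCacheMap.remove(key);
    }

    /** Periodic task: copy the (possibly rebuilt) read-write values into the read-only map. */
    void syncReadOnly() {
        for (String key : readOnlyCacheMap.keySet()) {
            readOnlyCacheMap.put(key, readWrite(key));
        }
    }

    public static void main(String[] args) {
        Map<String, String> registry = new ConcurrentHashMap<>(Map.of("order-1", "UP"));
        TwoLevelResponseCache cache = new TwoLevelResponseCache(registry, true);
        System.out.println(cache.get(ALL_APPS)); // {order-1=UP}
        registry.put("order-2", "UP");
        cache.invalidate(ALL_APPS);              // register() invalidates the read-write level
        System.out.println(cache.get(ALL_APPS)); // still {order-1=UP}: the read-only map is stale
        cache.syncReadOnly();                    // the periodic task runs
        System.out.println(cache.get(ALL_APPS)); // {order-1=UP, order-2=UP}
    }
}
```

The middle read shows why a newly registered or cancelled instance can stay invisible to clients for up to one refresh interval even though the registry itself was updated immediately.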
