【架构师视角系列】Apollo配置中心之Client端(二)
原創文章,轉載請標注。https://www.cnblogs.com/boycelee/p/17978027
目錄- 聲明
- 配置中心系列文章
-
一、客戶端架構
-
1、Config Service職責
- (1)配置管理
- (2)配置發布
- (3)配置讀取
-
2、Apollo Client 職責
- (1)配置拉取
- (2)配置注入
- (3)配置變更監聽
-
3、基本交互流程
- (1)應用啟動
- (2)配置變更通知
- (3)配置更新
- (4)配置注入
-
1、Config Service職責
- 二、架構思考
-
三、源碼剖析
-
1、初始化
- (1)邏輯描述
- (2)時序圖
- (3)代碼位置
-
2、查找注解
- (1)邏輯描述
- (2)時序圖
- (3)代碼位置
-
3、建立連接
- (1)邏輯描述
- (2)時序圖
- (3)具體函數
-
4、拉取配置
- (1)邏輯描述
- (2)時序圖
-
(3)代碼實現
- a)配置初始化加載(trySync)
- b)周期配置拉取(schedulePeriodicRefresh)
- c)長輪詢監聽與最新配置拉取(scheduleLongPollingRefresh)
-
5、變更通知
- (1)邏輯描述
- (2)時序圖
- (2)代碼實現
-
6、配置注入
- (1)邏輯描述
- (2)代碼位置
- 四、最后
-
1、初始化
聲明
原創文章,轉載請標注。https://www.cnblogs.com/boycelee/p/17978027
《碼頭工人的一千零一夜》是一位專注于技術干貨分享的博主,追隨博主的文章,你將深入了解業界最新的技術趨勢,以及在Java開發和安全領域的實用經驗分享。無論你是開發人員還是對逆向工程感興趣的愛好者,都能在《碼頭工人的一千零一夜》找到有價值的知識和見解。
配置中心系列文章
《【架構師視角系列】Apollo配置中心之架構設計(一)》https://www.cnblogs.com/boycelee/p/17967590
《【架構師視角系列】Apollo配置中心之Client端(二)》https://www.cnblogs.com/boycelee/p/17978027
一、客戶端架構
架構介紹會從分層、職責、關系以及運行負責四個維度進行描述。
1、Config Service職責
(1)配置管理
Config Service 是Apollo配置中心的服務端組件,負責管理應用程序的配置信息。它存儲和維護應用程序的各種配置項。
(2)配置發布
Config Service 負責將最新的配置發布給注冊在它上面的Apollo Client。當配置發生變更時,Config Service 負責通知所有訂閱了相應配置的客戶端。
(3)配置讀取
Apollo Client 向 Config Service 發送請求,獲取應用程序的配置信息。
2、Apollo Client 職責
(1)配置拉取
Apollo Client 負責向 Config Service 發送配置拉取請求,獲取三方應用程序的配置。
(2)配置注入
Apollo Client 將從 Config Service 獲取到的配置注入到三方應用程序中。
(3)配置變更監聽
Apollo Client 可以注冊對配置變更的監聽器。當 Config Service 發布新的配置時,Apollo Client 能夠感知到配置的變更,并觸發相應的操作。
3、基本交互流程
(1)應用啟動
Apollo Client 在應用啟動時向 Config Service 發送配置拉取請求,獲取初始的配置。
(2)配置變更通知
Config Service 在配置發生變更時,通知所有注冊的 Apollo Client。
(3)配置更新
Apollo Client 接收到配置變更通知后,向 Config Service 發送請求,獲取最新的配置。
(4)配置注入
Apollo Client 將獲取到的最新配置注入到應用程序中,以便使用最新的配置信息。
通過以上交互流程達到應用不需要重啟,動態配置變更的目的。
二、架構思考
架構師視角系列,在分析一款組件的源碼時,需要深入思考其設計背后的動機。以下是讀者在閱讀本篇文章時應思考的問題:
- 配置拉取的設計:
- 思考點: 設計中采用的配置拉取方式是如何選擇的?背后的動機是什么?可能的考慮包括系統性能、可維護性和安全性。
- 配置的注入方式:
- 思考點: 配置是如何被注入到組件中的?這種注入方式有何優勢?設計選擇的原因可能涉及松耦合、動態變化和代碼可維護性等方面。
- 配置變更的通知機制:
- 思考點: 配置變更是如何通知其他組件的?為什么選擇當前的通知機制?可能的考慮包括實時性、效率以及系統整體的架構要求。
- 為什么配置拉取拆分為兩個請求?
- 思考點: 配置拉取為何拆分為兩個獨立的請求?這個設計決策的目的是什么?可能涉及到性能優化、可伸縮性以及減輕服務器負擔的考慮。
- 長輪詢的概念:
- 思考點: 什么是長輪詢?為何在配置方案中選擇使用它?長輪詢的優勢在哪里?可能涉及到減少輪詢頻率、降低網絡開銷以及更及時的配置變更通知。
- 為什么需要做本地文件緩存?
- 思考點: 為什么在組件中引入了本地文件緩存的機制?這樣的設計有哪些優點?可能牽涉到性能優化、離線支持以及用戶體驗的方面。
在深入研究源碼時,理解這些設計決策背后的原因,有助于更全面地理解系統架構,并為自己的設計提供有價值的啟示。
三、源碼剖析
1、初始化
(1)邏輯描述
通過實現Spring框架提供的BeanPostProcessor接口,并完成postProcessBeforeInitialization函數的實現,我們能夠在Bean初始化之前執行自定義的操作。BeanPostProcessor是Spring框架提供的一個擴展點,允許我們在Bean初始化前后插入自定義邏輯。在postProcessBeforeInitialization函數中,我們有機會遍歷Bean的成員變量和函數,實現在初始化之前對它們進行定制化處理的需求。
(2)時序圖
(3)代碼位置
ApolloProcessor#postProcessBeforeInitialization
為了講解更加順暢,會沿著Method上的注解@ApolloConfigChangeListener實現邏輯進行講解。
public abstract class ApolloProcessor implements BeanPostProcessor, PriorityOrdered {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
Class clazz = bean.getClass();
// 遍歷Bean中的成員變量
for (Field field : findAllField(clazz)) {
processField(bean, beanName, field);
}
// 遍歷Bean中的所有函數(根據這條邏輯進行講解)
for (Method method : findAllMethod(clazz)) {
processMethod(bean, beanName, method);
}
return bean;
}
...
}
2、查找注解
(1)邏輯描述
創建配置變化的監聽器,并創建namespace對應的config實例,將監聽器注冊到config實例中。當發生配置變更時,會調用監聽器的onChange函數,并利用反射機制通知對應的函數(使用@ApolloConfigChange)。
(2)時序圖
(3)代碼位置
ApolloAnnotationProcessor#processMethod
public class ApolloAnnotationProcessor extends ApolloProcessor implements BeanFactoryAware,
EnvironmentAware {
...
@Override
protected void processMethod(final Object bean, String beanName, final Method method) {
// 處理函數上的注解(@ApolloConfigChange)(關注這里)
this.processApolloConfigChangeListener(bean, method);
this.processApolloJsonValue(bean, beanName, method);
}
private void processApolloConfigChangeListener(final Object bean, final Method method) {
ApolloConfigChangeListener annotation = AnnotationUtils
.findAnnotation(method, ApolloConfigChangeListener.class);
if (annotation == null) {
return;
}
Class<?>[] parameterTypes = method.getParameterTypes();
Preconditions.checkArgument(parameterTypes.length == 1,
"Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
method);
Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
"Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
method);
ReflectionUtils.makeAccessible(method);
String[] namespaces = annotation.value();
String[] annotatedInterestedKeys = annotation.interestedKeys();
String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
// 創建配置變化監聽器。當配置發生變化時,會調用onChange函數并使用反射觸發標識@ApolloConfigChange的Method
ConfigChangeListener configChangeListener = new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
ReflectionUtils.invokeMethod(method, bean, changeEvent);
}
};
Set<String> interestedKeys =
annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null;
Set<String> interestedKeyPrefixes =
annotatedInterestedKeyPrefixes.length > 0 ? Sets.newHashSet(annotatedInterestedKeyPrefixes)
: null;
// 遍歷namespace
for (String namespace : namespaces) {
final String resolvedNamespace = this.environment.resolveRequiredPlaceholders(namespace);
// 創建(獲取)Config實例(關注這里)
Config config = ConfigService.getConfig(resolvedNamespace);
// 注冊監聽器
if (interestedKeys == null && interestedKeyPrefixes == null) {
// 將創建的監聽器注冊到namespace對應的config實例中(關注這里)
config.addChangeListener(configChangeListener);
} else {
config.addChangeListener(configChangeListener, interestedKeys, interestedKeyPrefixes);
}
}
}
...
}
3、建立連接
(1)邏輯描述
為注解(@ApolloConfigChange)綁定的namespace創建Config實例,Config實例中會會為namespace創建本地配置倉庫(createLocalConfigRepository處理本地配置存儲)和遠程配置倉庫(createRemoteConfigRepository處理遠程ConfigService配置拉取)。
(2)時序圖
(3)具體函數
ConfigService#getConfig
public class ConfigService {
// 創建一個ConfigService單例
private static final ConfigService s_instance = new ConfigService();
// 獲取 m_configManager 與 m_configRegistry單例
private volatile ConfigManager m_configManager;
private volatile ConfigRegistry m_configRegistry;
// 獲取nanespae對應的config實例(關注這里)
public static Config getConfig(String namespace) {
return s_instance.getManager().getConfig(namespace);
}
(2)具體函數:DefaultConfigManager#getConfig
public class DefaultConfigManager implements ConfigManager {
@Override
public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);
// 每個namespace創建一個Config對象
if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);
if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);
//config對象中有,拉取遠程和本地倉庫(Repository)
config = factory.create(namespace);
m_configs.put(namespace, config);
}
}
}
return config;
}
}
(3)具體函數:DefaultConfigFactory#create
創建順序是:1)創建遠端存儲倉庫,從Config Service中拉取配置數據;2)創建本地存儲倉庫,將遠端拉取到的配置文件存儲到本地文件中;3)實例化Config,以供后續獲取配置信息使用。
public class DefaultConfigFactory implements ConfigFactory {
...
@Override
public Config create(String namespace) {
ConfigFileFormat format = determineFileFormat(namespace);
if (ConfigFileFormat.isPropertiesCompatible(format)) {
return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
}
// (關注這里)。調用createLocalConfigRepository函數,創建LocalConfigRepository,建立本地存儲倉庫
return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));
}
LocalFileConfigRepository createLocalConfigRepository(String namespace) {
if (m_configUtil.isInLocalMode()) {
logger.warn(
"==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
namespace);
return new LocalFileConfigRepository(namespace);
}
// (關注這里)。調用createRemoteConfigRepository函數,創建RemoteConfigRepository,建立遠程存儲倉庫
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
}
RemoteConfigRepository createRemoteConfigRepository(String namespace) {
return new RemoteConfigRepository(namespace);
}
...
}
4、拉取配置
(1)邏輯描述
配置拉取主要分為三個關鍵步驟:1)初始化加載配置;2)定期拉取配置;3)通過長輪詢進行刷新。在這其中,長輪詢刷新階段又分為兩個請求:1)配置更新通知(通過長輪詢實現);2)詳細配置拉取。通過這個流程,系統能夠實現配置的及時更新,確保應用程序始終使用最新的配置信息。
(2)時序圖
(3)代碼實現
具體函數:RemoteConfigRepository#RemoteConfigRepository
public class RemoteConfigRepository extends AbstractConfigRepository {
...
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpClient = ApolloInjector.getInstance(HttpClient.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
// 初始化加載配置
this.trySync();
// 周期拉取配置(apollo定時兜底)
this.schedulePeriodicRefresh();
// 長輪詢
this.scheduleLongPollingRefresh();
}
}
a)配置初始化加載(trySync)
具體函數:AbstractConfigRepository#trySync
時序圖:
邏輯描述:namespace初始化加載配置
public abstract class AbstractConfigRepository implements ConfigRepository {
...
// 獲取配置內容
protected boolean trySync() {
try {
// (關注這里)獲取配置
sync();
return true;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
logger
.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
return false;
}
...
}
具體函數:RemoteConfigRepository#sync()
public class RemoteConfigRepository extends AbstractConfigRepository {
...
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
ApolloConfig previous = m_configCache.get();
// 加載配置(關注這里)
ApolloConfig current = loadApolloConfig();
//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
// 通知Repository監聽器,配置發生變化(關注這里)
this.fireRepositoryChange(m_namespace, this.getConfig());
}
if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
// 加載配置
private ApolloConfig loadApolloConfig() {
// 限流,避免創建過多連接。同一個namespace會有多種觸發loadApolloConfig函數的方式
if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();
String secret = m_configUtil.getAccessKeySecret();
Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
long onErrorSleepTime = 0; // 0 means no sleep
Throwable exception = null;
// 從meta server中獲取注冊到eureka的config service
List<ServiceDTO> configServices = getConfigServices();
String url = null;
retryLoopLabel:
for (int i = 0; i < maxRetries; i++) {
List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
Collections.shuffle(randomConfigServices);
if (m_longPollServiceDto.get() != null) {
randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
}
for (ServiceDTO configService : randomConfigServices) {
if (onErrorSleepTime > 0) {
logger.warn(
"Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
} catch (InterruptedException e) {
//ignore
}
}
// 拼接請求config service獲取配置的url
url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_remoteMessages.get(), m_configCache.get());
logger.debug("Loading config from {}", url);
HttpRequest request = new HttpRequest(url);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
transaction.addData("Url", url);
try {
// 發送請求
HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class);
m_configNeedForceRefresh.set(false);
m_loadConfigFailSchedulePolicy.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
// 如果配置沒有變更,config service會返回304狀態碼
if (response.getStatusCode() == 304) {
logger.debug("Config server responds with 304 HTTP status code.");
// 緩存中拉取歷史配置
return m_configCache.get();
}
ApolloConfig result = response.getBody();
logger.debug("Loaded config for {}: {}", m_namespace, result);
// 如果配置變更,這會直接返回
return result;
} catch (ApolloConfigStatusCodeException ex) {
ApolloConfigStatusCodeException statusCodeException = ex;
//config not found
if (ex.getStatusCode() == 404) {
String message = String.format(
"Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
"please check whether the configs are released in Apollo!",
appId, cluster, m_namespace);
statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
message);
}
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
transaction.setStatus(statusCodeException);
exception = statusCodeException;
if(ex.getStatusCode() == 404) {
break retryLoopLabel;
}
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
exception = ex;
} finally {
transaction.complete();
}
// if force refresh, do normal sleep, if normal config load, do exponential sleep
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
}
}
...
}
b)周期配置拉取(schedulePeriodicRefresh)
具體函數:RemoteConfigRepository#schedulePeriodicRefresh
時序圖:
邏輯描述:周期拉取配置(apollo定時兜底)
public class RemoteConfigRepository extends AbstractConfigRepository {
...
private final static ScheduledExecutorService m_executorService;
static {
m_executorService = Executors.newScheduledThreadPool(1,
ApolloThreadFactory.create("RemoteConfigRepository", true));
}
// 定時拉取配置
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
// 固定時間間隔執行任務
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
trySync();
Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
}
...
}
c)長輪詢監聽與最新配置拉取(scheduleLongPollingRefresh)
具體函數:RemoteConfigRepository#scheduleLongPollingRefresh()
時序圖:
邏輯描述:建立長輪詢,監聽配置變更通知通知后加載最新配置
public class RemoteConfigRepository extends AbstractConfigRepository {
private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}
}
(8)具體函數:RemoteConfigLongPollService#submit()
public class RemoteConfigLongPollService {
private final ExecutorService m_longPollingService;
public RemoteConfigLongPollService() {
m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
m_longPollingStopped = new AtomicBoolean(false);
m_longPollingService = Executors.newSingleThreadExecutor(
ApolloThreadFactory.create("RemoteConfigLongPollService", true));
m_longPollStarted = new AtomicBoolean(false);
m_longPollNamespaces =
Multimaps.synchronizedSetMultimap(HashMultimap.<String, RemoteConfigRepository>create());
m_notifications = Maps.newConcurrentMap();
m_remoteNotificationMessages = Maps.newConcurrentMap();
m_responseType = new TypeToken<List<ApolloConfigNotification>>() {
}.getType();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpClient = ApolloInjector.getInstance(HttpClient.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
}
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
// 如果長輪詢已經啟動,就不會再往線程池里添加runnable(通過m_longPollStarted判斷是否啟動),但是會往m_longPollNamespaces中添加需要被通知變更的namespace對應的remoteConfigRepository
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
startLongPolling();
}
return added;
}
// 多個namespace,也只有一個長輪詢
private void startLongPolling() {
if (!m_longPollStarted.compareAndSet(false, true)) {
//already started
return;
}
try {
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
final String secret = m_configUtil.getAccessKeySecret();
final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
// 單線程連接池
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
if (longPollingInitialDelayInMills > 0) {
try {
logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
} catch (InterruptedException e) {
//ignore
}
}
doLongPollingRefresh(appId, cluster, dataCenter, secret);
}
});
} catch (Throwable ex) {
m_longPollStarted.set(false);
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
Tracer.logError(exception);
logger.warn(ExceptionUtil.getDetailMessage(exception));
}
}
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
// 只要不中斷就循環
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
// limiter令牌桶限流為2qps, 5秒之內存在沒有獲取到1個令牌的情況,則休眠5秒
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
String url = null;
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}
url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
transaction.addData("Url", url);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpClient.doGet(request, m_responseType);
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200 && response.getBody() != null) {
updateNotifications(response.getBody());
updateRemoteNotifications(response.getBody());
transaction.addData("Result", response.getBody().toString());
// 此處通知,執行notify之后加載數據
notify(lastServiceDto, response.getBody());
}
//try to load balance
if (response.getStatusCode() == 304 && random.nextBoolean()) {
lastServiceDto = null;
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
lastServiceDto = null;
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}
private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
if (notifications == null || notifications.isEmpty()) {
return;
}
for (ApolloConfigNotification notification : notifications) {
String namespaceName = notification.getNamespaceName();
//create a new list to avoid ConcurrentModificationException
List<RemoteConfigRepository> toBeNotified =
Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
//since .properties are filtered out by default, so we need to check if there is any listener for it
toBeNotified.addAll(m_longPollNamespaces
.get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
try {
remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
}
}
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
m_longPollServiceDto.set(longPollNotifiedServiceDto);
m_remoteMessages.set(remoteMessages);
m_executorService.submit(new Runnable() {
@Override
public void run() {
m_configNeedForceRefresh.set(true);
trySync();
}
});
}
}
(9)AbstractConfigRepository#trySync()
public abstract class AbstractConfigRepository implements ConfigRepository {
// 拉配置信息,不是notificationID,而是配置內容
protected boolean trySync() {
try {
sync();
return true;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
logger
.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
return false;
}
// 子類RemoteConfigRepository中實現
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
ApolloConfig previous = m_configCache.get();
ApolloConfig current = loadApolloConfig();
//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}
if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
protected void fireRepositoryChange(String namespace, Properties newProperties) {
for (RepositoryChangeListener listener : m_listeners) {
try {
listener.onRepositoryChange(namespace, newProperties);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
}
}
}
}
5、變更通知
(1)邏輯描述
在namespace數據發生變更時,系統將通知所有監聽該namespace的監聽器。系統會比較新老配置,將差異配置存儲在ConfigChange中,并隨后通知各個監聽器。
(2)時序圖
(2)代碼實現
DefaultConfig#onRepositoryChange()
public class DefaultConfig extends AbstractConfig implements RepositoryChangeListener {
...
/**
* Repository通知變更,
* @param namespace the namespace of this repository change
* @param newProperties the properties after change
*/
@Override
public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
if (newProperties.equals(m_configProperties.get())) {
return;
}
ConfigSourceType sourceType = m_configRepository.getSourceType();
Properties newConfigProperties = propertiesFactory.getPropertiesInstance();
newConfigProperties.putAll(newProperties);
// 計算配置變更情況,對新老配置進行比較,將差異配置存儲在ConfigChange中Map的格式為{namspace:{key:value}}
Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties,
sourceType);
//check double checked result
if (actualChanges.isEmpty()) {
return;
}
// 將具體變更通知各監聽器
this.fireConfigChange(m_namespace, actualChanges);
Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
}
// 構建Method的變更事件ConfigChange參數
private Map<String, ConfigChange> updateAndCalcConfigChanges(Properties newConfigProperties,
ConfigSourceType sourceType) {
List<ConfigChange> configChanges = calcPropertyChanges(m_namespace, m_configProperties.get(), newConfigProperties);
ImmutableMap.Builder<String, ConfigChange> actualChanges =
new ImmutableMap.Builder<>();
/** === Double check since DefaultConfig has multiple config sources ==== **/
//1. use getProperty to update configChanges's old value
for (ConfigChange change : configChanges) {
change.setOldValue(this.getProperty(change.getPropertyName(), change.getOldValue()));
}
//2. update m_configProperties
updateConfig(newConfigProperties, sourceType);
clearConfigCache();
//3. use getProperty to update configChange's new value and calc the final changes
for (ConfigChange change : configChanges) {
change.setNewValue(this.getProperty(change.getPropertyName(), change.getNewValue()));
switch (change.getChangeType()) {
case ADDED:
if (Objects.equals(change.getOldValue(), change.getNewValue())) {
break;
}
if (change.getOldValue() != null) {
change.setChangeType(PropertyChangeType.MODIFIED);
}
actualChanges.put(change.getPropertyName(), change);
break;
case MODIFIED:
if (!Objects.equals(change.getOldValue(), change.getNewValue())) {
actualChanges.put(change.getPropertyName(), change);
}
break;
case DELETED:
if (Objects.equals(change.getOldValue(), change.getNewValue())) {
break;
}
if (change.getNewValue() != null) {
change.setChangeType(PropertyChangeType.MODIFIED);
}
actualChanges.put(change.getPropertyName(), change);
break;
default:
//do nothing
break;
}
}
return actualChanges.build();
}
/**
* 父類中實現
* 配置變更通知,通知監聽器
* @param changes map's key is config property's key
*/
protected void fireConfigChange(String namespace, Map<String, ConfigChange> changes) {
final Set<String> changedKeys = changes.keySet();
final List<ConfigChangeListener> listeners = this.findMatchedConfigChangeListeners(changedKeys);
// notify those listeners
for (ConfigChangeListener listener : listeners) {
Set<String> interestedChangedKeys = resolveInterestedChangedKeys(listener, changedKeys);
InterestedConfigChangeEvent interestedConfigChangeEvent = new InterestedConfigChangeEvent(
namespace, changes, interestedChangedKeys);
this.notifyAsync(listener, interestedConfigChangeEvent);
}
}
/**
* 異步通知
* @param listener
* @param changeEvent
*/
private void notifyAsync(final ConfigChangeListener listener, final ConfigChangeEvent changeEvent) {
m_executorService.submit(new Runnable() {
@Override
public void run() {
String listenerName = listener.getClass().getName();
Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
try {
listener.onChange(changeEvent);
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
Tracer.logError(ex);
logger.error("Failed to invoke config change listener {}", listenerName, ex);
} finally {
transaction.complete();
}
}
});
}
...
}
6、配置注入
(1)邏輯描述
在配置發生變更后,系統會通知在Bean初始化時創建的與namespace對應的監聽器。接著,系統通過反射的方式觸發相應的函數(使用@ApolloConfigChange注解)。
(2)代碼位置
ConfigChangeListener#onChange()
public class ApolloAnnotationProcessor extends ApolloProcessor implements BeanFactoryAware,
EnvironmentAware {
...
private void processApolloConfigChangeListener(final Object bean, final Method method) {
ApolloConfigChangeListener annotation = AnnotationUtils
.findAnnotation(method, ApolloConfigChangeListener.class);
if (annotation == null) {
return;
}
Class<?>[] parameterTypes = method.getParameterTypes();
Preconditions.checkArgument(parameterTypes.length == 1,
"Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
method);
Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
"Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
method);
ReflectionUtils.makeAccessible(method);
// value 是 namespace
String[] namespaces = annotation.value();
String[] annotatedInterestedKeys = annotation.interestedKeys();
String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
// 創建配置變化監聽器
ConfigChangeListener configChangeListener = new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
ReflectionUtils.invokeMethod(method, bean, changeEvent);
}
};
}
...
}
四、最后
《碼頭工人的一千零一夜》是一位專注于技術干貨分享的博主,追隨博主的文章,你將深入了解業界最新的技術趨勢,以及在開發和安全領域的實用經驗分享。無論你是Java開發人員還是對逆向工程感興趣的愛好者,都能在《碼頭工人的一千零一夜》找到有價值的知識和見解。
懂得不多,做得太少。歡迎批評、指正。
總結
以上是生活随笔為你收集整理的【架构师视角系列】Apollo配置中心之Client端(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Feign源码解析7:nacos loa
- 下一篇: 文心一言 VS 讯飞星火 VS chat