解锁新姿势 | 如何用配置中心实现全局动态流控?
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
摘要:?當(dāng)資源成為瓶頸時,服務(wù)框架需要對消費(fèi)者做限流,啟動流控保護(hù)機(jī)制。流量控制有多種策略,比較常用的有:針對訪問速率的靜態(tài)流控、針對資源占用的動態(tài)流控、針對消費(fèi)者并發(fā)連接數(shù)的連接控制和針對并行訪問數(shù)的并發(fā)控制。在分布式架構(gòu)中,應(yīng)用和應(yīng)用之間的調(diào)用類型分為以下兩種,流控方式也略有不同。
?
點(diǎn)此查看原文:https://yq.aliyun.com/articles/380180?spm=a2c41.11181499.0.0
當(dāng)資源成為瓶頸時,服務(wù)框架需要對消費(fèi)者做限流,啟動流控保護(hù)機(jī)制。流量控制有多種策略,比較常用的有:針對訪問速率的靜態(tài)流控、針對資源占用的動態(tài)流控、針對消費(fèi)者并發(fā)連接數(shù)的連接控制和針對并行訪問數(shù)的并發(fā)控制。在實(shí)踐中,各種流量控制策略需要綜合使用才能起到較好的效果。
在分布式架構(gòu)中,應(yīng)用和應(yīng)用之間的調(diào)用類型分為以下兩種,流控方式也略有不同。
同步RPC類調(diào)用,比如RESTful,Dubbo,HSF等都屬于該類。對于該類同步調(diào)用,通常限流方式為兩種:針對服務(wù)提供者的并發(fā)全局流控,或針對服務(wù)消費(fèi)者的并發(fā)局部流控。兩種的控制手段類似,都是通過限制服務(wù)端或客服端并發(fā)調(diào)用數(shù)來進(jìn)行限制。
異步MQ類調(diào)用,典型如RocketMQ, Kafka,等。對于該類異步調(diào)用,通常限流方式是在訂閱端限流。限流方式為兩種:針對消息訂閱者的并發(fā)流控,或針對消息訂閱者的消費(fèi)延時流控。
針對消息訂閱者的消費(fèi)延時流控基本原理是,在每次客戶端消費(fèi)時,可以增加一個延時來控制消費(fèi)速度,這樣理論消費(fèi)并發(fā)最快速度為:
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
比如如果消息并發(fā)消費(fèi)線程為20,延時為100ms,則理論上可以將并發(fā)消費(fèi)控制在200以下。具體公式如下:
200 = 1 / 0.1 * 20
相比并發(fā)線程數(shù)流控,消費(fèi)延時流控優(yōu)點(diǎn)在于實(shí)現(xiàn)相對簡單,對MQ類客戶端包依賴較少,不需要客戶端提供控制并發(fā)線程數(shù)的動態(tài)調(diào)整接口。
以上各種流量控制方法,在分布式架構(gòu)下,如果要做到全局動態(tài)控制,一個簡單的技術(shù)方法是依賴配置中心,即通過配置中心來進(jìn)行流控參數(shù)的下發(fā)。
下面章節(jié)詳細(xì)介紹如何基于配置中心來實(shí)現(xiàn)異步消息消費(fèi)的全局動態(tài)流控。使用的例子為阿里云上的 MQ (消息隊(duì)列)和 ACM (應(yīng)用配置管理)兩款產(chǎn)品。
注:之所以用MQ為示例是因?yàn)樵诒疚淖珜懼畷r,正好MQ Consumer Client SDK并不支持動態(tài)調(diào)整現(xiàn)成并發(fā)數(shù),因此通過基于ACM來動態(tài)調(diào)整消費(fèi)延遲的方法正好可以解決MQ消費(fèi)流控動態(tài)的問題。
基于消費(fèi)延時流控的基本原理
基本原理如下。其中,管理員或應(yīng)用程序通過ACM控制臺發(fā)布消費(fèi)延時配置(RCV_INTERVAL_TIME),所有MQ消費(fèi)程序訂閱該配置。理論上,該配置從發(fā)布到下發(fā)所有客戶端,可以在1秒內(nèi)完成(取決于網(wǎng)絡(luò)延時)。
代碼示例
該章節(jié)基于配置中心來實(shí)現(xiàn)異步消息消費(fèi)的全局動態(tài)流控的代碼示例。使用的例子為阿里云上的MQ(消息隊(duì)列)和ACM(應(yīng)用配置管理)兩款產(chǎn)品,基于Java語言。關(guān)于SDK的詳細(xì)介紹,可參見兩款產(chǎn)品的官方文檔。
在ACM上創(chuàng)建消費(fèi)延時的參數(shù),截屏如下。
設(shè)置全局消費(fèi)延時變量
首先,設(shè)置消費(fèi)接收延時的全局變量, 如下。
// 初始化消息接收延時參數(shù),單位為millisecondstatic int RCV_INTERVAL_TIME = 10000;// 初始化配置服務(wù),控制臺通過示例代碼自動獲取下面參數(shù)ConfigService.init("acm.aliyun.com", /*租戶ID*/"xxx", /*AK*/"xxx", /*SK*/"yyy"); // 主動獲取配置String content = ConfigService.getConfig("app.mq.qos", "DEFAULT_GROUP", 6000);Properties p = new Properties();try {p.load(new StringReader(content));RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));} catch (IOException e) {e.printStackTrace();}其次,設(shè)置ACM listener,確保當(dāng)配置被修改時,即使更新 RCV_INTERVAL_TIME 參數(shù), 如下。
// 初始化的時候,給配置添加監(jiān)聽,配置變更會回調(diào)通知ConfigService.addListener("app.mq.qos", "DEFAULT_GROUP", new ConfigChangeListener() {public void receiveConfigInfo(String configInfo) {Properties p = new Properties();try {p.load(new StringReader(configInfo));RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));} catch (IOException e) {e.printStackTrace();}}});設(shè)置 MQ 消費(fèi)延時邏輯
完整實(shí)例如下。
注:這里 RCV_INTERVAL_TIME 參數(shù)的訪問是故意沒有加鎖的,讀者可以自行思考原因。Aliyun ONS Client不提供動態(tài)線程并發(fā)數(shù),默認(rèn)并發(fā)為20。因此這里正好使用消費(fèi)延時參數(shù)來動態(tài)調(diào)節(jié)QoS。
//以下代碼可直接貼在Main()函數(shù)里Properties properties = new Properties();properties.put(PropertyKeyConst.ConsumerId, "CID_consumer_group");properties.put(PropertyKeyConst.AccessKey,"xxx");properties.put(PropertyKeyConst.SecretKey, "yyy");properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");// 設(shè)置 TCP 接入域名(此處以公共云生產(chǎn)環(huán)境為例)properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(/*Topic*/"topic-name", /*Tag*/null, new MessageListener() {public Action consume(Message message, ConsumeContext context) {// MQ Subscribe QoS logical start, // Each consuming process will sleep for RCV_INTERVAL_TIME seconds with 100 ms sleeping cycle.// Within each cycle, the thread will check RCV_INTERVAL_TIME in case it's set to a smaller value. // RCV_INTERVAL_TIME <= 0 means no sleeping.int rcvIntervalTimeLeft = RCV_INTERVAL_TIME;while (rcvIntervalTimeLeft > 0) {if (rcvIntervalTimeLeft > RCV_INTERVAL_TIME) {rcvIntervalTimeLeft = RCV_INTERVAL_TIME;}try {if (rcvIntervalTimeLeft >= 100) {rcvIntervalTimeLeft -= 100;Thread.sleep(100);} else {Thread.sleep(rcvIntervalTimeLeft);rcvIntervalTimeLeft = 0;}} catch (InterruptedException e) {e.printStackTrace();}}// MQ Subscribe interval logical endsSystem.out.println("Receive: " + message);/** Put your business logic here.*/doSomething();return Action.CommitMessage;}});consumer.start();運(yùn)行結(jié)果
單機(jī)運(yùn)行consumer進(jìn)行消費(fèi),假設(shè)queue內(nèi)的消息無限多,不存在消費(fèi)萬的情況,分三段測試,分別運(yùn)行約5分鐘,通過ACM配置推送來達(dá)到以下效果。
RCV_INTERVAL_TIME = 100 ms
RCV_INTERVAL_TIME = 5000 ms
RCV_INTERVAL_TIME = 1000 ms
結(jié)果如下,在單MQ消費(fèi)業(yè)務(wù)處理耗時約100ms情況下的,單機(jī)并發(fā)20線程的測試結(jié)果。
RCV_INTERVAL_TIME = 100 ms:平均消費(fèi)性能約為 9000 tpm 左右
RCV_INTERVAL_TIME = 5000 ms:平均消費(fèi)性能被限制到了 200 tpm 左右
RCV_INTERVAL_TIME = 1000 ms:平均消費(fèi)性能回升到到了 1100 tpm 左右
以上結(jié)果基本達(dá)到消費(fèi)和 tpm 成反比的預(yù)期,最關(guān)鍵的是整個過程中,應(yīng)用不中斷,流控推送結(jié)果秒級生效到分布式集群。單機(jī)性能結(jié)果如下所示。
轉(zhuǎn)載于:https://my.oschina.net/yunqi/blog/1612514
總結(jié)
以上是生活随笔為你收集整理的解锁新姿势 | 如何用配置中心实现全局动态流控?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里云CentOS 7.4 配置Ngin
- 下一篇: 命令行以及git基础使用