日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java互联网架构-如何设计服务接口API限流功能

發布時間:2024/5/8 java 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java互联网架构-如何设计服务接口API限流功能 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

API 概念的出現,遠遠早于個人計算機的誕生,更不用說網絡的誕生了。在公用數據處理的早期,為了一個應用能夠與其它系統交互,開發者便已開始設計可公開訪問并描述清晰的“接入點”。早在那時,這種做法作為一種準則,已是軟件開發的主流理念。 但是,直到分布式系統的出現,乃至網絡的降臨,這些基礎概念才淋漓的發揮出其重要性和驚人功效。

當我們回顧 API 的歷史,會發現其中有一個階段非常重要。 那是2000年左右,SOA(面向服務的架構)正在發展之中。API 的一種形式在企業應用中誕生。作為 SOA 偉大實踐的一種,這種形式的 API 走出了企業應用的領域,在創新科技的世界里找到了更肥沃的土壤。

到了今天,我們能從技術角度,找出無數原因來解釋為何 web API 能夠在各種類型、不同大小的企業中獲得成功,甚至也廣受政府機構的歡迎。 但實際上,技術并非一切。web API 的成功,還要歸功于很多其它方面的因素。這些因素大多并不那么搶眼,所以需要我們認真的研究歷史,經過仔細觀察才會發現為何那些 web API 的開拓者能夠成功。

時至今日,我們還是要去學習過去十幾年里的最佳實踐。在對那些成功提供 API 的開拓者,包括 Amazon,Salesforce, Ebay,Twitter進行研究時,我們不能忽略任何重要細節。要知道,它們提供的 API 大部分還在運行著。

一、場景描述

很多做服務接口的人或多或少的遇到這樣的場景,由于業務應用系統的負載能力有限,為了防止非預期的請求對系統壓力過大而拖垮業務應用系統。

也就是面對大流量時,如何進行流量控制?

服務接口的流量控制策略:分流、降級、限流等。本文討論下限流策略,雖然降低了服務接口的訪問頻率和并發量,卻換取服務接口和業務應用系統的高可用。

實際場景中常用的限流策略:

Nginx前端限流

按照一定的規則如帳號、IP、系統調用邏輯等在Nginx層面做限流

業務應用系統限流

1、客戶端限流

2、服務端限流

數據庫限流

紅線區,力保數據庫

二、常用的限流算法

常用的限流算法由:樓桶算法和令牌桶算法。本文不具體的詳細說明兩種算法的原理,原理會在接下來的文章中做說明。

1、漏桶算法

漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),然后就拒絕請求,可以看出漏桶算法能強行限制數據的傳輸速率.

示意圖如下:

可見這里有兩個變量,一個是桶的大小,支持流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。

因為漏桶的漏出速率是固定的參數,所以,即使網絡中不存在資源沖突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.因此,漏桶算法對于存在突發特性的流量來說缺乏效率.

2、令牌桶算法

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解.隨著時間流逝,系統會按恒定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.

令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量.

三、基于Redis功能的實現

簡陋的設計思路:假設一個用戶(用IP判斷)每分鐘訪問某一個服務接口的次數不能超過10次,那么我們可以在Redis中創建一個鍵,并此時我們就設置鍵的過期時間為60秒,每一個用戶對此服務接口的訪問就把鍵值加1,在60秒內當鍵值增加到10的時候,就禁止訪問服務接口。在某種場景中添加訪問時間間隔還是很有必要的。

1)使用Redis的incr命令,將計數器作為Lua腳本

local current

current = redis.call("incr",KEYS[1])

if tonumber(current) == 1 then

redis.call("expire",KEYS[1],1)

end

Lua腳本在Redis中運行,保證了incr和expire兩個操作的原子性。

2)使用Reids的列表結構代替incr命令

FUNCTION LIMIT_API_CALL(ip)

current = LLEN(ip)

IF current > 10 THEN

ERROR "too many requests per second"

ELSE

IF EXISTS(ip) == FALSE

MULTI

RPUSH(ip,ip)

EXPIRE(ip,1)

EXEC

ELSE

RPUSHX(ip,ip)

END

PERFORM_API_CALL()

END

Rate Limit使用Redis的列表作為容器,LLEN用于對訪問次數的檢查,一個事物中包含了RPUSH和EXPIRE兩個命令,用于在第一次執行計數是創建列表并設置過期時間,

RPUSHX在后續的計數操作中進行增加操作。

四、基于令牌桶算法的實現

令牌桶算法可以很好的支撐突然額流量的變化即滿令牌桶數的峰值。

import java.io.BufferedWriter;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.OutputStreamWriter;

import java.util.Random;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.ReentrantLock;

import com.google.common.base.Preconditions;

import com.netease.datastream.util.framework.LifeCycle;

20 public class TokenBucket implements LifeCycle {

// 默認桶大小個數 即最大瞬間流量是64M

private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;

// 一個桶的單位是1字節

private int everyTokenSize = 1;

// 瞬間最大流量

private int maxFlowRate;

// 平均流量

private int avgFlowRate;

// 隊列來緩存桶數量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64

private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);

private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

private volatile boolean isStart = false;

private ReentrantLock lock = new ReentrantLock(true);

private static final byte A_CHAR = 'a';

public TokenBucket() {

}

public TokenBucket(int maxFlowRate, int avgFlowRate) {

this.maxFlowRate = maxFlowRate;

this.avgFlowRate = avgFlowRate;

}

public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {

this.everyTokenSize = everyTokenSize;

this.maxFlowRate = maxFlowRate;

this.avgFlowRate = avgFlowRate;

}

public void addTokens(Integer tokenNum) {

// 若是桶已經滿了,就不再家如新的令牌

for (int i = 0; i < tokenNum; i++) {

tokenQueue.offer(Byte.valueOf(A_CHAR));

}

}

public TokenBucket build() {

start();

return this;

}

/**

· 獲取足夠的令牌個數

·

· @return

· */

· public boolean getTokens(byte[] dataSize) {

Preconditions.checkNotNull(dataSize);

Preconditions.checkArgument(isStart, "please invoke start method first !");

int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內容大小對應的桶個數

final ReentrantLock lock = this.lock;

lock.lock();

try {

boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量

if (!result) {

return false;

}

int tokenCount = 0;

for (int i = 0; i < needTokenNum; i++) {

Byte poll = tokenQueue.poll();

if (poll != null) {

tokenCount++;

}

}

return tokenCount == needTokenNum;

} finally {

lock.unlock();

}

}

@Override

public void start() {

// 初始化桶隊列大小

if (maxFlowRate != 0) {

tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);

}

// 初始化令牌生產者

TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);

scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);

isStart = true;

}

@Override

public void stop() {

isStart = false;

scheduledExecutorService.shutdown();

}

@Override

public boolean isStarted() {

return isStart;

}

class TokenProducer implements Runnable {

private int avgFlowRate;

private TokenBucket tokenBucket;

public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {

this.avgFlowRate = avgFlowRate;

this.tokenBucket = tokenBucket;

}

@Override

public void run() {

tokenBucket.addTokens(avgFlowRate);

}

}

public static TokenBucket newBuilder() {

return new TokenBucket();

}

public TokenBucket everyTokenSize(int everyTokenSize) {

this.everyTokenSize = everyTokenSize;

return this;

}

public TokenBucket maxFlowRate(int maxFlowRate) {

this.maxFlowRate = maxFlowRate;

return this;

}

public TokenBucket avgFlowRate(int avgFlowRate) {

this.avgFlowRate = avgFlowRate;

return this;

}

private String stringCopy(String data, int copyNum) {

StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);

for (int i = 0; i < copyNum; i++) {

sbuilder.append(data);

}

return sbuilder.toString();

}

public static void main(String[] args) throws IOException, InterruptedException {

tokenTest();

}

private static void arrayTest() {

ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);

tokenQueue.offer(1);

tokenQueue.offer(1);

tokenQueue.offer(1);

System.out.println(tokenQueue.size());

System.out.println(tokenQueue.remainingCapacity());

}

private static void tokenTest() throws InterruptedException, IOException {

TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();

BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));

String data = "xxxx";// 四個字節

for (int i = 1; i <= 1000; i++) {

Random random = new Random();

int i1 = random.nextInt(100);

boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());

TimeUnit.MILLISECONDS.sleep(100);

if (tokens) {

bufferedWriter.write("token pass --- index:" + i1);

System.out.println("token pass --- index:" + i1);

} else {

bufferedWriter.write("token rejuect --- index" + i1);

System.out.println("token rejuect --- index" + i1);

}

bufferedWriter.newLine();

bufferedWriter.flush();

}

bufferedWriter.close();

}

}

總結

到這里,服務接口API限流功能就結束了,,不足之處還望大家多多包涵!!覺得收獲的話可以點個關注收藏轉發一波喔,謝謝大佬們支持。(吹一波,233~~)

下面和大家交流幾點編程的經驗:

1、多寫多敲代碼,好的代碼與扎實的基礎知識一定是實踐出來的

2丶 測試、測試再測試,如果你不徹底測試自己的代碼,那恐怕你開發的就不只是代碼,可能還會聲名狼藉。

3丶 簡化編程,加快速度,代碼風騷,在你完成編碼后,應回頭并且優化它。從長遠來看,這里或那里一些的改進,會讓后來的支持人員更加輕松。

總結

以上是生活随笔為你收集整理的Java互联网架构-如何设计服务接口API限流功能的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。