服務接口API限流 Rate Limit (token blucket)

 

服務接口API限流 Rate Limit

1、場景描述                                                                                                html

     不少作服務接口的人或多或少的遇到這樣的場景,因爲業務應用系統的負載能力有限,爲了防止非預期的請求對系統壓力過大而拖垮業務應用系統。前端

    也就是面對大流量時,如何進行流量控制?java

    服務接口的流量控制策略:分流、降級、限流等。本文討論下限流策略,雖然下降了服務接口的訪問頻率和併發量,卻換取服務接口和業務應用系統的高可用。redis

     實際場景中經常使用的限流策略:算法

  • Nginx前端限流

         按照必定的規則如賬號、IP、系統調用邏輯等在Nginx層面作限流數據庫

  • 業務應用系統限流

        一、客戶端限流緩存

        二、服務端限流網絡

  • 數據庫限流

        紅線區,力保數據庫併發

2、經常使用的限流算法                                                                                       app

     經常使用的限流算法由:樓桶算法和令牌桶算法。本文不具體的詳細說明兩種算法的原理,原理會在接下來的文章中作說明。

     一、漏桶算法

         漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶裏,漏桶以必定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),而後就拒絕請求,能夠看出漏桶算法能強行限制數據的傳輸速率.示意圖以下:

   

         可見這裏有兩個變量,一個是桶的大小,支持流量突發增多時能夠存多少的水(burst),另外一個是水桶漏洞的大小(rate)。

         由於漏桶的漏出速率是固定的參數,因此,即便網絡中不存在資源衝突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.所以,漏桶算法對於存在突發特性的流量來講缺少效率.

     二、令牌桶算法

         令牌桶算法(Token Bucket)和 Leaky Bucket 效果同樣但方向相反的算法,更加容易理解.隨着時間流逝,系統會按恆定1/QPS時間間隔(若是QPS=100,則間隔是10ms)往桶裏加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),若是桶已經滿了就再也不加了.新請求來臨時,會各自拿走一個Token,若是沒有Token可拿了就阻塞或者拒絕服務.

 

  令牌桶的另一個好處是能夠方便的改變速度. 一旦須要提升速率,則按需提升放入桶中的令牌的速率. 通常會定時(好比100毫秒)往桶中增長必定數量的令牌, 有些變種算法則實時的計算應該增長的令牌的數量.

3、基於Redis功能的實現                                                                                

       簡陋的設計思路:假設一個用戶(用IP判斷)每分鐘訪問某一個服務接口的次數不能超過10次,那麼咱們能夠在Redis中建立一個鍵,並此時咱們就設置鍵的過時時間爲60秒,每個用戶對此服務接口的訪問就把鍵值加1,在60秒內當鍵值增長到10的時候,就禁止訪問服務接口。在某種場景中添加訪問時間間隔仍是頗有必要的。

      1)使用Redis的incr命令,將計數器做爲Lua腳本         

1 local current
2 current = redis.call("incr",KEYS[1])
3 if tonumber(current) == 1 then
4     redis.call("expire",KEYS[1],1)
5 end

        Lua腳本在Redis中運行,保證了incr和expire兩個操做的原子性。

       2)使用Reids的列表結構代替incr命令

複製代碼

1 FUNCTION LIMIT_API_CALL(ip)
 2 current = LLEN(ip)
 3 IF current > 10 THEN
 4     ERROR "too many requests per second"
 5 ELSE
 6     IF EXISTS(ip) == FALSE
 7         MULTI
 8             RPUSH(ip,ip)
 9             EXPIRE(ip,1)
10         EXEC
11     ELSE
12         RPUSHX(ip,ip)
13     END
14     PERFORM_API_CALL()
15 END

複製代碼

         Rate Limit使用Redis的列表做爲容器,LLEN用於對訪問次數的檢查,一個事物中包含了RPUSH和EXPIRE兩個命令,用於在第一次執行計數是建立列表並設置過時時間,

    RPUSHX在後續的計數操做中進行增長操做。

4、基於令牌桶算法的實現                                                                                

       令牌桶算法能夠很好的支撐忽然額流量的變化即滿令牌桶數的峯值。

      

複製代碼

1 import java.io.BufferedWriter;
  2 import java.io.FileOutputStream;
  3 import java.io.IOException;
  4 import java.io.OutputStreamWriter;
  5 import java.util.Random;
  6 import java.util.concurrent.ArrayBlockingQueue;
  7 import java.util.concurrent.Executors;
  8 import java.util.concurrent.ScheduledExecutorService;
  9 import java.util.concurrent.TimeUnit;
 10 import java.util.concurrent.locks.ReentrantLock;
 11  
 12 import com.google.common.base.Preconditions;
 13 import com.netease.datastream.util.framework.LifeCycle;
 14  
 15
 20 public class TokenBucket implements LifeCycle {
 21  
 22 // 默認桶大小個數 即最大瞬間流量是64M
 23  private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
 24  
 25 // 一個桶的單位是1字節
 26  private int everyTokenSize = 1;
 27  
 28 // 瞬間最大流量
 29  private int maxFlowRate;
 30  
 31 // 平均流量
 32  private int avgFlowRate;
 33  
 34 // 隊列來緩存桶數量:最大的流量峯值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
 35  private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
 36  
 37 private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 38  
 39 private volatile boolean isStart = false;
 40  
 41 private ReentrantLock lock = new ReentrantLock(true);
 42  
 43 private static final byte A_CHAR = 'a';
 44  
 45 public TokenBucket() {
 46  }
 47  
 48 public TokenBucket(int maxFlowRate, int avgFlowRate) {
 49  this.maxFlowRate = maxFlowRate;
 50  this.avgFlowRate = avgFlowRate;
 51  }
 52  
 53 public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
 54  this.everyTokenSize = everyTokenSize;
 55  this.maxFlowRate = maxFlowRate;
 56  this.avgFlowRate = avgFlowRate;
 57  }
 58  
 59 public void addTokens(Integer tokenNum) {
 60  
 61 // 如果桶已經滿了,就再也不家如新的令牌
 62  for (int i = 0; i < tokenNum; i++) {
 63  tokenQueue.offer(Byte.valueOf(A_CHAR));
 64  }
 65  }
 66  
 67 public TokenBucket build() {
 68  
 69 start();
 70  return this;
 71  }
 72  
 73 /**
 74  * 獲取足夠的令牌個數
 75  *
 76  * @return
 77  */
 78  public boolean getTokens(byte[] dataSize) {
 79  
 80 Preconditions.checkNotNull(dataSize);
 81  Preconditions.checkArgument(isStart, "please invoke start method first !");
 82  
 83 int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內容大小對應的桶個數
 84  
 85 final ReentrantLock lock = this.lock;
 86  lock.lock();
 87  try {
 88  boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量
 89  if (!result) {
 90  return false;
 91  }
 92  
 93 int tokenCount = 0;
 94  for (int i = 0; i < needTokenNum; i++) {
 95  Byte poll = tokenQueue.poll();
 96  if (poll != null) {
 97  tokenCount++;
 98  }
 99  }
100  
101 return tokenCount == needTokenNum;
102  } finally {
103  lock.unlock();
104  }
105  }
106  
107 @Override
108  public void start() {
109  
110 // 初始化桶隊列大小
111  if (maxFlowRate != 0) {
112  tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
113  }
114  
115 // 初始化令牌生產者
116  TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
117  scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
118  isStart = true;
119  
120 }
121  
122 @Override
123  public void stop() {
124  isStart = false;
125  scheduledExecutorService.shutdown();
126  }
127  
128 @Override
129  public boolean isStarted() {
130  return isStart;
131  }
132  
133 class TokenProducer implements Runnable {
134  
135 private int avgFlowRate;
136  private TokenBucket tokenBucket;
137  
138 public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
139  this.avgFlowRate = avgFlowRate;
140  this.tokenBucket = tokenBucket;
141  }
142  
143 @Override
144  public void run() {
145  tokenBucket.addTokens(avgFlowRate);
146  }
147  }
148  
149 public static TokenBucket newBuilder() {
150  return new TokenBucket();
151  }
152  
153 public TokenBucket everyTokenSize(int everyTokenSize) {
154  this.everyTokenSize = everyTokenSize;
155  return this;
156  }
157  
158 public TokenBucket maxFlowRate(int maxFlowRate) {
159  this.maxFlowRate = maxFlowRate;
160  return this;
161  }
162  
163 public TokenBucket avgFlowRate(int avgFlowRate) {
164  this.avgFlowRate = avgFlowRate;
165  return this;
166  }
167  
168 private String stringCopy(String data, int copyNum) {
169  
170 StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
171  
172 for (int i = 0; i < copyNum; i++) {
173  sbuilder.append(data);
174  }
175  
176 return sbuilder.toString();
177  
178 }
179  
180 public static void main(String[] args) throws IOException, InterruptedException {
181  
182 tokenTest();
183  }
184  
185 private static void arrayTest() {
186  ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
187  tokenQueue.offer(1);
188  tokenQueue.offer(1);
189  tokenQueue.offer(1);
190  System.out.println(tokenQueue.size());
191  System.out.println(tokenQueue.remainingCapacity());
192  }
193  
194 private static void tokenTest() throws InterruptedException, IOException {
195  TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
196  
197 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
198  String data = "xxxx";// 四個字節
199  for (int i = 1; i <= 1000; i++) {
200  
201 Random random = new Random();
202  int i1 = random.nextInt(100);
203  boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
204  TimeUnit.MILLISECONDS.sleep(100);
205  if (tokens) {
206  bufferedWriter.write("token pass --- index:" + i1);
207  System.out.println("token pass --- index:" + i1);
208  } else {
209  bufferedWriter.write("token rejuect --- index" + i1);
210  System.out.println("token rejuect --- index" + i1);
211  }
212  
213 bufferedWriter.newLine();
214  bufferedWriter.flush();
215  }
216  
217 bufferedWriter.close();
218  }
219  
220 }

複製代碼

相關文章
相關標籤/搜索