問題
在許多應用中,對昂貴的資源的訪問必須加以限制,此時速率限制是必不可少的。許多現代網絡應用程序在多個進程和服務器上運行,狀態需要被共享。一個理想的解決方案應該是高效、 快捷的,而不是依賴於被綁定到特定客戶端的單個應用程序服務器(由於負載平衡) 或本身持有任何狀態。
解決方案
實現這一目標的一個簡單有效的方法就是使用 Redis, 它有很多有用的數據結構和功能, 盡管實現速率限制只需要2個功能用: 一、在某個具體的鍵值上遞增一個整數,二、給這個鍵值設置過期時間。
因為redis 有個單一的事件循環系統 (每個人每次在同一個時間只能執行一個操作),這是個原子操作, 也就是說無論有多少個客戶端同時交互操作,對於同一個鍵值總有一個確定的數值。
這在對同一個資源進行多個速率限制的情況下通常是有利的, 因為這允許少量的破裂,以及更長的期限限制。例如每秒鐘請求3次,沒分鐘請求20次。因為每個限制都是相對獨立的,這就需要與其它限制分開進行單獨的遞增。
因為速率限制通常用在響應時間比較重要的資源(比如網頁應用),所以盡量縮短速率限制的使用時間是非常有必要的。redis的最基本的應用就是發出命令,等待響應,然後發出另一個命令,如此往復。 這個花費是昂貴的,因為需要通過網絡在應用程序和redis服務器之間多次往返。由於在這個用例中,沒有命令依賴其它命令的執行結果,這使得redis的一個叫做流水線技術的使用成為可能。這就是客戶端緩存所有redis請求,然後把這寫請求發送給redis,redis一次性返回所有的結果。
Redis不會維護客戶端需要的限制的,因為redis會根據客戶端設置的過期時間刪除舊的記數。這消除了客戶端統籌協調的需要,和刪除競爭條件的可能性。
The Code
import redis import time def rate_limit_check(r, key, limits): period_lengths = [_[0] for _ in sorted(limits.items())] period_limits = [_[1] for _ in sorted(limits.items())] pipe = r.pipeline() for period_length in period_lengths: current_period = int(time.time() / period_length) redis_key = 'rate_limit:{key}:{period_length}:{current_period}'.format(key=key, period_length=period_length, current_period=current_period) pipe.incr(redis_key).expire(redis_key, period_length*3) return not any(hits > period_limit for period_limit, hits in zip(period_limits, pipe.execute()[::2])) if __name__ == '__main__': r = redis.Redis() print rate_limit_check(r, '127.0.0.1', {1: 3, 60: 20})
{1: 3, 60: 20} 意味著每秒鐘3次的命中率是允許的,在任何限制下,都允許20次的命中。'127.0.0.1'在這裡用作鍵值,盡管在真實的情況下,可能作為IP地址。更高級的用例將有一個全應用程序的速率限制,鍵值只有客戶端的IP地址,以及一個為昂貴的終結點設置的特定終結點限制,這將用到客戶端的IP地址和終結點,例如127.0.0.1+/login/。這些限制可以獨立地設置。
return rate_limit_check(r, '127.0.0.1', {1: 3, 60: 20}) and rate_limit_check(r, '127.0.0.1+/login/', {1: 2, 60: 5})
這是一個用Python寫的例子,它可以簡單地移植到任何語言,只要這門語言包含Redis客戶端庫。