基於redis的分佈式RateLimiter(限流)實現

業務背景

系統須要對接某IM廠商rest接口,向客戶端推送消息(以及其餘IM業務)
該廠商對rest接口調用有頻率限制:總rest調用9000次/30s;消息推送600次/30s
系統爲分佈式集羣,須要控制整個分佈式集羣總的接口調用頻率知足以上限制java

Guava RateLimiter

上篇文章 《Guava RateLimiter源碼解析》中介紹了Guava RateLimiter的用法及原理,但爲何不直接使用Guava RateLimiter?緣由有二:git

  1. Guava RateLimiter只能應用於單進程,多進程間協同控制便無能爲力
  2. Guava RateLimiter可以處理突發請求(預消費),這裏rest接口調用頻率限制是固定的,不須要更不能使用預消費能力,不然將會致使接口調用失敗

Redis

爲何說選用redis是合理的?github

  1. redis效率高,易擴展
  2. redis對語言無關,能夠更好的接入不一樣語言開發的系統(異構)
  3. redis單進程單線程的特色能夠更好的解決最終一致性,多進程間協同控制更爲容易

基於Redis實現RateLimiter

這裏徹底參考Guava RateLimiter實現思路,不一樣的是,Guava將令牌桶數據存放於對象(內存)中,這裏講令牌桶數據存放在redis中,奉上源碼 https://github.com/manerfan/m...redis

首先建立令牌桶數據模型spring

class RedisPermits(
        val maxPermits: Long,
        var storedPermits: Long,
        val intervalMillis: Long,
        var nextFreeTicketMillis: Long = System.currentTimeMillis()
) {
    constructor(permitsPerSecond: Double, maxBurstSeconds: Int = 60, nextFreeTicketMillis: Long = System.currentTimeMillis()) :
            this((permitsPerSecond * maxBurstSeconds).toLong(), permitsPerSecond.toLong(), (TimeUnit.SECONDS.toMillis(1) / permitsPerSecond).toLong(), nextFreeTicketMillis)

    fun expires(): Long {
        val now = System.currentTimeMillis()
        return 2 * TimeUnit.MINUTES.toSeconds(1) + TimeUnit.MILLISECONDS.toSeconds(max(nextFreeTicketMillis, now) - now)
    }

    fun reSync(now: Long): Boolean {
        if (now > nextFreeTicketMillis) {
            storedPermits = min(maxPermits, storedPermits + (now - nextFreeTicketMillis) / intervalMillis)
            nextFreeTicketMillis = now
            return true
        }
        return false
    }
}

各屬性字段含義與Guava相同(參見《Guava RateLimiter源碼解析》),且默認最多存儲maxBurstSeconds秒生成的令牌segmentfault

reSync函數一樣是爲了解決令牌桶數據更新問題,在每次獲取令牌以前調用,這裏很少介紹
expires函數計算redis數據過時時間。一樣的例子,某接口須要分別對每一個用戶作訪問頻率限制,假設系統中存在6W用戶,則至多須要在redis中建立6W條數據,對於長期運行的系統,這個數字會只增不減,這對於redis來講也是一個不小的挑戰(雖然示例中的數字相對較小)。爲了減輕redis壓力,須要對令牌桶數據作過時處理,對於使用頻率不是很高的業務場景,能夠及時清理。springboot

爲了更好的操做,這裏建立一個操做RedisPermits的Redis模板服務器

@Configuration
class RateLimiterConfiguration {
    @Bean
    fun permitsTemplate(redisConnectionFactory: RedisConnectionFactory): PermitsTemplate {
        val template = PermitsTemplate()
        template.connectionFactory = redisConnectionFactory
        return template
    }
}

class PermitsTemplate : RedisTemplate<String, RedisPermits>() {
    private val objectMapper = jacksonObjectMapper()

    init {
        keySerializer = StringRedisSerializer()
        valueSerializer = object : RedisSerializer<RedisPermits> {
            override fun serialize(t: RedisPermits) = objectMapper.writeValueAsBytes(t)
            override fun deserialize(bytes: ByteArray?) = bytes?.let { objectMapper.readValue(it, RedisPermits::class.java) }
        }
    }
}

如下介紹幾個關鍵函數,完整代碼見 https://github.com/manerfan/m...併發

/**
 * 生成並存儲默認令牌桶
 */
private fun putDefaultPermits(): RedisPermits {
    val permits = RedisPermits(permitsPerSecond, maxBurstSeconds)
    permitsTemplate.opsForValue().set(key, permits, permits.expires(), TimeUnit.SECONDS)
    return permits
}

/**
 * 獲取/更新令牌桶
 */
private var permits: RedisPermits
    get() = permitsTemplate.opsForValue()[key] ?: putDefaultPermits()
    set(permits) = permitsTemplate.opsForValue().set(key, permits, permits.expires(), TimeUnit.SECONDS)

/**
 * 獲取redis服務器時間
 */
private val now get() = permitsTemplate.execute { it.time() } ?: System.currentTimeMillis()

putDefaultPermits用於生成默認令牌桶並存入redis
permitsgetter setter方法實現了redis中令牌桶的獲取及更新
now用於獲取redis服務器的時間,這樣便能保證分佈式集羣中各節點對數據處理的一致性app


private fun reserveAndGetWaitLength(tokens: Long): Long {
    val n = now
    var permit = permits
    permit.reSync(n)

    val storedPermitsToSpend = min(tokens, permit.storedPermits) // 能夠消耗的令牌數
    val freshPermits = tokens - storedPermitsToSpend // 須要等待的令牌數
    val waitMillis = freshPermits * permit.intervalMillis // 須要等待的時間

    permit.nextFreeTicketMillis = LongMath.saturatedAdd(permit.nextFreeTicketMillis, waitMillis)
    permit.storedPermits -= storedPermitsToSpend
    permits = permit

    return permit.nextFreeTicketMillis - n
}

該函數用於獲取tokens個令牌,並返回須要等待到的時長(毫秒)
其中,storedPermitsToSpend爲桶中能夠消費的令牌數,freshPermits爲還須要的(須要補充的)令牌數,根據該值計算須要等待的時間,追加並更新到nextFreeTicketMillis

須要注意,這裏與Guava RateLimiter不一樣的是,Guava中的返回是更新前的(上次請求計算的)nextFreeTicketMicros,本次請求經過爲上次請求的預消費行爲埋單而實現突發請求的處理;這裏返回的是因爲桶中令牌不足而須要真真切切等待的時間

通俗來說

  • Guava爲寅吃卯糧,本次請求須要爲上次請求的預消費行爲埋單
  • 這裏爲自力更生,誰消費誰埋單,爲本身的行爲負責

private fun reserve(tokens: Long): Long {
    checkTokens(tokens)
    try {
        syncLock.lock()
        return reserveAndGetWaitLength(tokens)
    } finally {
        syncLock.unLock()
    }
}

該函數與reserveAndGetWaitLength等同,只是爲了不併發問題而添加了同步鎖(分佈式同步鎖的介紹請參見《基於redis的分佈式鎖實現》)


private fun queryEarliestAvailable(tokens: Long): Long {
    val n = now
    var permit = permits
    permit.reSync(n)

    val storedPermitsToSpend = min(tokens, permit.storedPermits) // 能夠消耗的令牌數
    val freshPermits = tokens - storedPermitsToSpend // 須要等待的令牌數
    val waitMillis = freshPermits * permit.intervalMillis // 須要等待的時間

    return LongMath.saturatedAdd(permit.nextFreeTicketMillis - n, waitMillis)
}

該函數用於計算,獲取tokens個令牌須要等待的時長(毫秒)


private fun canAcquire(tokens: Long, timeoutMillis: Long): Boolean {
    return queryEarliestAvailable(tokens) - timeoutMillis <= 0
}

該函數用於計算,timeoutMillis時間內是否能夠獲取tokens個令牌


經過以上幾個函數的瞭解,咱們即可以很輕鬆的實現Guava RateLimiter中的acquiretryAcquire功能

fun acquire(tokens: Long): Long {
    var milliToWait = reserve(tokens)
    logger.info("acquire for {}ms {}", milliToWait, Thread.currentThread().name)
    Thread.sleep(milliToWait)
    return milliToWait
}

fun acquire() = acquire(1)
fun tryAcquire(tokens: Long, timeout: Long, unit: TimeUnit): Boolean {
    val timeoutMicros = max(unit.toMillis(timeout), 0)
    checkTokens(tokens)

    var milliToWait: Long
    try {
        syncLock.lock()
        if (!canAcquire(tokens, timeoutMicros)) {
            return false
        } else {
            milliToWait = reserveAndGetWaitLength(tokens)
        }
    } finally {
        syncLock.unLock()
    }
    Thread.sleep(milliToWait)

    return true
}

fun tryAcquire(timeout: Long, unit: TimeUnit) = tryAcquire(1, timeout, unit)

回顧問題

至此,基於redis的分佈式RateLimiter(限流)控制功能便完成了

回到文檔起始處提出的問題,接某IM廠商rest接口,咱們能夠針對不一樣的頻率限制建立不一樣的RateLimiter

val restRateLimiter = rateLimiterFactory.build("ratelimiter:im:rest", 9000 /30, 30)
val msgRateLimiter = rateLimiterFactory.build("ratelimiter:im:msg", 600 /30, 30)

推送消息時,能夠以下調用

restRateLimiter.acquire()
msgRateLimiter.acquire(msgs.size)
msgUtil.push(msgs)

對於接口提供方限制接口訪問頻次,能夠以下實現

val msgRateLimiter = rateLimiterFactory.build("ratelimiter:im:msg", 600 /30, 30)

fun receiveMsg(msgs: Array<Message>): Boolean {
    return when(msgRateLimiter.tryAcquire(msgs.size, 2, TimeUnit.SECONDS)) {
        true -> {
            thread(true) { msgUtil.receive(msgs) }
            true
        }
        else -> false
    }
}

訂閱號

相關文章
相關標籤/搜索