redis 在業務層面的應用之定時器&延時隊列

    前幾天出去面試,你們都喜歡聊redis,一個是底層數據結構的實現,一個是在業務層的使用,這裏就結合一些簡單的python代碼,講下怎樣用redis 作應用層面的定時器。python

    首先,當大批量任務作超時管理,就會涉及到如何實現定時器,使系統開銷最小的問題。一般的底層是一個timer 加一個最小堆(有個本身的實現,能夠參考:http://www.javashuo.com/article/p-xbjxmyoq-mz.html),或者是環形數組,或者是紅黑樹。每種方式都用的還比較多的,nginx 使用的是紅黑樹,golang 底層time 使用的是最小堆。當業務層面要本身實現定時回調,其實使用redis 也能夠作,並且還蠻簡單的,就是zset 就ok。nginx

    具體的思路是怎樣呢?zadd 的時候,val 設置成taskid, score 設置成爲timestamp 便可。而後主代碼,開個線程,隔一段時間sleep 去zRangeByScore 當前時間戳到以前標記時間的值,而後就能夠簡單的用redis 管理超時任務了。golang

    具體例子的能夠看下面的一個demo:面試

import redis
import time
import threading

host = "127.0.0.1"
port = "6379"
password = "123123"
db = 0

pool = redis.ConnectionPool(host=host, port=port, password=password, db=db)
rd = redis.StrictRedis(connection_pool=pool)

def put_task(task_id, timeout):
    due_time = int(time.time() + timeout)*100
    print "put", task_id
    rd.zadd("task", task_id, due_time)

def callback(task_id):
    print "consume taskid", task_id

def get_task():
    now = int(time.time() * 100)
    tasks = rd.zrangebyscore("task", 0, now)
    if tasks:
        rd.zremrangebyscore("task", 0, now)
    return tasks

def produce_task():
    while True:
        task_id = int(time.time() * 100)
        put_task(task_id, 1)
        time.sleep(1)

def consume_task():
    print "start loop"

    while True:
        task_ids = get_task()
        if task_ids:
            for task_id in task_ids:
                callback(task_id)

        time.sleep(1)

if __name__ == "__main__":
    t1 = threading.Thread(target=produce_task)
    t2 = threading.Thread(target=consume_task)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    多個線程註冊回調,一個線程隔一段時間去輪訓數據庫便可,具體時間精度,能夠控制時間戳和和循環最小週期,再明白思路的狀況下看這個demo 應該是很好理解的。redis

    在咱們線上測試,內網環境下,qps 在2w 是沒有壓力的。數據庫

相關文章
相關標籤/搜索