購物網站的redis相關實現

購物網站的redis相關實現


需求:

(1)登陸和cookie緩存

對於一個大型網上商店,假設天天都會有大約500萬不一樣的用戶,這些用戶會給網站帶來1億次點擊,並從網站購買超過10萬件商品。python

咱們須要存儲用戶登陸信息,用戶的訪問時長和已瀏覽商品的數量,若是將其保存到數據庫中,會致使大量的數據庫寫入。git

大多數關係數據庫在每臺數據庫服務器上面每秒只能插入、更新或者刪除200~2000個數據行,儘管批量操做能夠以更快的速度執行,但客戶點每次瀏覽網頁都只更新少數幾行數據,因此高速的批量插入在這裏並不適用。github

而對於負載量相對比較大的系統,譬如平均狀況下每秒大約1200次寫入,高峯時期每秒接近6000次寫入,因此它必須部署10臺關係數據庫服務器才能應對高峯期的負載量。web

爲了提高系統的處理速度,下降資源的佔用量,能夠將傳統數據庫的一部分數據處理任務以及存儲任務轉交給Redis來完成。redis

(2)使用redis實現購物車

咱們把購物車的信息也存儲到Redis,而且使用與用戶會話令牌同樣的cookie id來引用購物車。數據庫

將用戶和購物車都存儲到Redis裏面,這種作法除了能夠減小請求體積外,咱們能夠根據用戶瀏覽過的商品,用戶放入購物車的商品以及用戶最終購買的商品進行統計計算,並構建起不少大型網絡零售上都在提供的」在查看過這件商品的用戶當中,有X%的用戶最終購買了這件商品「」購買了這件商品的用戶也購買了某某其餘商品「等功能,這些功能能夠幫助用戶查找其餘相關的商品,並最終提高網站的銷售業績。json

(3)網頁緩存

購物網站上多數頁面實際上並不會常常發生大變化,雖然會向分類中添加新商品、移除舊商品、有時候特價促銷、有時甚至還有」熱賣商品「頁面,可是在通常狀況下,網站只有帳號設置、以往訂單、購物車(結帳信息)以及其餘少數幾個頁面才包含須要每次載入都要動態生成的內容。數組

對於不須要動態生成的頁面,咱們須要儘可能再也不生成,減小網站在動態生成內容上面所花的時間,能夠下降網站處理相同負載所需的服務器數量,讓網站速度加快。緩存

python應用框架大都存在中間件,咱們建立中間件來調用Redis緩存函數:對於不能被緩存的請求,直接生成並返回頁面,對於能夠被緩存的請求,先從緩存取出緩存頁面,若是緩存頁面不存在,那麼會生成頁面並將其緩存在Redis,最後將頁面返回給函數調用者。服務器

這樣的方式可讓網站在5分鐘以內無需再爲他們動態地生成視圖頁面。

(4) 數據行緩存

爲了清空舊庫存和吸引客戶消費,決定開始新一輪的促銷活動,天天都會推出一些特價商品供用戶搶購,全部特價商品的數量都是限定的,賣完爲止。在這種狀況下,網站是不能對整個促銷頁面進行緩存,這會致使用戶看到錯誤的特價商品和商品剩餘數,但每次載入頁面都從數據庫中取出特價商品的剩餘數量的話,又會給數據庫帶來巨大的壓力。

爲了應付促銷活動帶來的大量負載,須要對數據行進行緩存,能夠編寫一個持續運行的守護進程函數,讓這個函數將指定的數據行緩存到Redis裏面,並不按期地對這些緩存進行更新。緩存函數將數據和編碼爲json字典並存儲在Redis的字符串中。

咱們還須要使用兩個有序集合來記錄應該在什麼時候對緩存進行更新,第一個有序集合爲調度有序集合,成員爲數據行的ID,分值爲時間戳,記錄應該在什麼時候將制定的數據行緩存到Redis裏面。第二個有序集合爲延時有序集合,成員爲數據行的ID,而分值記錄指定數據行的緩存須要每隔多少秒更新一次。

對於更新頻率,若是數據行記錄的是特價促銷商品的剩餘數量,而且參與促銷活動的用戶很是多,那麼我麼最好每隔幾秒更新一次數據行緩存,若是數據並不常常改變,或者商品缺貨是能夠接受的,咱們能夠每分鐘更新一次緩存。

(5)網頁分析

以前對於網頁的緩存,若是網站總共包含100000件商品,貿然緩存全部商品頁面將耗盡整個網站的所有內存,因此咱們能夠只針對那些瀏覽量較高的商品頁面進行緩存。

每一個用戶都有一個相應的記錄用戶瀏覽商品歷史的有序集合,咱們在記錄的過程當中,咱們也痛死記錄全部商品的瀏覽次數,根據瀏覽次數對商品進行排序,被瀏覽得最多的商品放到有序集合的索引0位置上,而且具備整個有序集合最少的分值。

除了緩存最常被瀏覽的商品外,咱們還須要發現那些變得愈來愈流暢的新商品,因而咱們須要按期修剪有序集合的長度並調整已有元素的分值,才能使得新流行的商品在排行榜中佔據一席之地。

Redis數據結構設計

(1)登陸令牌與用戶映射關係的散列 "login:"
(2)記錄最近登陸用戶的有序集合 "recent:"
(3)記錄各個用戶最近瀏覽商品的有序集合 "viewed:94233rhsYRIq3yi3qryrye"
(4)每一個用戶的購物車散列,存儲商品ID與商品訂購數量之間的映射。"cart:94233rhsYRIq3yi3qryrye"
(5)請求頁面緩存集合 "cache:wre9w3rieruerwe3" (wre9w3rieruerwe3表明請求ID)

(94233rhsYRIq3yi3qryrye假設爲某個用戶的令牌)
(6)數據行緩存字符串,數據列(column)的名字會被映射爲json字典的鍵,而數據行的值會被映射爲json字典的值,"inv:273" (其中273爲數據行id)。
(7)數據行緩存調度有序集合,成員爲數據行的ID,分值爲時間戳,記錄應該在什麼時候將制定的數據行緩存到Redis裏面,"schedule:"。
(8)數據行緩存延時有序集合,成員爲數據行的ID,而分值記錄指定數據行的緩存須要每隔多少秒更新一次,"delay:"。

(9)商品瀏覽次數有序集合,成員爲商品,分值爲瀏覽次數負值,方便保持在有序集合的較前的索引位置,"viewed"。

Redis實現

(1)使用散列來存儲登陸cookie令牌與已登陸用戶以前的映射。根據給定的令牌查找與之相應的用戶,檢查用戶是否登陸,並返回該用戶的ID。

"""
獲取並返回令牌對應的用戶

@param {object}
@param {string} token

@return {string} 用戶id
"""
def checkToken(conn, token):
    return conn.hget('login:', token)

(2)用戶每次瀏覽頁面的時候,須要更新「登陸令牌與用戶映射關係的散列」裏面的信息,
並將用戶的令牌和當前時間戳添加到 「記錄最近登陸用戶的有序集合」 裏面,
將瀏覽商品添加到記錄「記錄各個用戶最近瀏覽商品的有序集合」中,若是記錄的商品數量超過25個,對這個有序集合進行修剪。

"""
更新令牌時,須要更改用戶令牌信息,將用戶記錄到最近登陸用戶的有序集合中,
若是用戶瀏覽的是商品,則須要將瀏覽商品寫入該用戶瀏覽過商品的有序集合中,並保證該集合不超過25個

@param {object}
@param {string} token
@param {string} user
@param {string} item

"""
def updateToken(conn, token, user, item = None):
    timestamp = time.time()
    # 更新用戶令牌登陸對應的用戶信息
    conn.hset('login:', token, user)
    # 增長最近訪問的用戶到有序集合
    conn.zadd('recent:', token, timestamp)

    # 若是瀏覽產品,記錄該用戶最近訪問的25個產品
    if item:
        conn.zadd('viewed:' + token, item, timestamp)
        conn.zremrangebyrank('viewed:' + token, 0, -26)
        # 記錄每一個商品的瀏覽量
        conn.zincrby('viewed:', item, -1)

(3)存儲會話的內存會隨着時間的推移而不斷增長,須要按期清理會話數據,咱們決定只保留最新的1000萬個會話。

咱們能夠用 守護進程的方式來運行或者定義一個cron job每隔一段時間運行
檢查最近 「記錄最近登陸用戶的有序集合」 大小是否超過了限制,超過限制每秒從集合中刪除最舊的100個令牌,而且移除相應的「登陸令牌與用戶映射關係的散列」的信息和對應的「記錄各個用戶最近瀏覽商品的有序集合」,對應的」美國用戶的購物車散列「。

咱們也可使用EXPIRE命令,爲用戶令牌設記錄用戶商品瀏覽記錄的有序集合設置過時時間,讓Redis在一段時間以後自動刪除它們,這樣就不用使用有序集合來記錄最近出現的令牌了,可是這樣咱們就沒辦法將會話數限制在1000萬以內了。

"""
按期清理會話數據,只保留最新的1000萬個會話。

使用 *守護進程的方式來運行或者定義一個cron job每隔一段時間運行* ,
檢查最近 「記錄最近登陸用戶的有序集合」 大小是否超過了限制,超過限制每秒從集合中刪除最舊的100個令牌,
而且移除相應的「登陸令牌與用戶映射關係的散列」的信息和對應的「記錄各個用戶最近瀏覽商品的有序集合」。

@param {object}
"""

# 循環判斷,若是是cron job能夠不用循環
QUIT = False
# 限制保留的最大會話數據
LIMIT = 10000000

def cleanFullSession(conn):
    # 循環判斷,若是是cron job能夠不用循環
    while not QUIT:
        # 查詢最近登陸用戶會話數
        size = conn.zcard('recent:')
        # 沒有超過限制,休眠1秒再繼續執行
        if size <= LIMIT:
            time.sleep(1)
            continue
        
        # 查詢最舊登陸的最多100個令牌範圍
        end_index = min(size - LIMIT, 100)
        tokens = conn.zrange('recent:', 0, end_index - 1)
        
        # 將要刪除的key都推入到數組中,要時候一塊兒刪除
        session_keys = []
        for token in tokens:
            session_keys.append('viewed:' + token)
            session_keys.append('cart:' + token)
        
        # 批量刪除相應的用戶最近瀏覽商品有序集合,用戶的購物車,登陸令牌與用戶映射關係的散列和記錄最近登陸用戶的有序集合
        conn.delete(*session_keys)
        conn.hdel('login:', *tokens)
        conn.zrem('recent:', *tokens)

(4)對購物車進行更新,若是用戶訂購某件商品數量大於0,將商品信息添加到 「用戶的購物車散列」中,若是購買商品已經存在,那麼更新購買數量。

"""
對購物車進行更新,若是用戶訂購某件商品數量大於0,將商品信息添加到 「用戶的購物車散列」中,若是購買商品已經存在,那麼更新購買數量

@param {object}
@param {string} session
@param {string} item
@param {float}  count

"""
def addToCart(conn, session, item, count):
    if count <= 0:
        # 從購物車移除指定商品
        conn.hrem('cart:' + session, item)
    else:
        # 將指定商品添加到對應的購物車中
        conn.hset('cart:' + session, item, count)

(5)在用戶請求頁面時,對於不能被緩存的請求,直接生成並返回頁面,對於能夠被緩存的請求,先從緩存取出緩存頁面,若是緩存頁面不存在,那麼會生成頁面並將其緩存在Redis,最後將頁面返回給函數調用者。

"""
在用戶請求頁面時,對於不能被緩存的請求,直接生成並返回頁面,
對於能夠被緩存的請求,先從緩存取出緩存頁面,若是緩存頁面不存在,那麼會生成頁面並將其緩存在Redis,最後將頁面返回給函數調用者。

@param {object} conn
@param {string} request
@param {callback}

@return 
"""
def cacheRequest(conn, request, callback):
    # 判斷請求是否能被緩存,不能的話直接調用回調函數
    if not canCache(conn, request):
        return callback(request)
    
    # 將請求轉換爲一個簡單的字符串健,方便以後進行查找
    page_key = 'cache:' + hashRequest(request)
    content = conn.get(page_key)
    
    # 沒有緩存的頁面,調用回調函數生成頁面,並緩存到redis中
    if not content:
        content = callback(request)
        conn.setex(page_key, content, 300)

    return content

"""
判斷頁面是否能被緩存,檢查商品是否被緩存以及頁面是否爲商品頁面,根據商品排名來判斷是否須要緩存

@param {object} conn
@param {string} request

@return {boolean}
"""
def canCache(conn, request):
    # 根據請求的URL,獲得商品ID
    item_id = extractItemId(request)
    # 檢查這個頁面可否被緩存以及這個頁面是否爲商品頁面
    if not item_id or isDynamic(request):
        return False

    # 商品的瀏覽排名
    rank = conn.zrank('viewed:', item_id)
    return rank is not None and rank < 10000

"""
解析請求的URL,取得query中的item id

@param {string} request

@return {string}
"""
def extractItemId(request):
    parsed = urlparse.urlparse(request)
    # 返回query字典
    query  = urlparse.parse_qs(parsed.query)
    return (query.get('item') or [None])[0]

"""
判斷請求的頁面是否動態頁面

@param {string} request

@return {boolean}
"""
def isDynamic(request):
    parsed = urlparse.urlparse(request)
    query = urlparse.parse_qs(parsed.query)
    return '_' in query

"""
將請求轉換爲一個簡單的字符串健,方便以後進行查找
@param {string} request

@return {string}
"""
def hashRequest(request):
    return str(hash(request))

(6)爲了讓緩存函數按期地緩存數據行,首先須要將行ID和給定的延遲值添加到延遲有序集合中,再將行ID和當前時間的時間戳添加到調度有序集合中。若是某個數據行的延遲值不存在,那麼程序將取消對這個數據行的調度。若是咱們想要移除某個數據行已有的緩存而且再也不緩存那個數據行,只須要把那個數據行的延遲值設置爲小於或等於0便可。

"""
設置數據行緩存的延遲值和調度時間

@param {object} conn
@param {int}    row id
@param {int}    delay

"""
def scheduleRowCache(conn, row_id, delay):
    conn.zadd('delay:', row_id, delay)
    conn.zadd('schedule:', row_id, time.time())

(7)嘗試讀取」數據行緩存調度有序集合「的第一個元素以及該元素的分支,若是」數據行緩存調度有序集合「沒有包含任何元素,或者分值存儲的時間戳所指定的時間還沒有來臨,那麼函數先休眠50毫秒,而後再從新進行檢查。

當發現一個須要當即進行更新的數據行時,若是數據行的延遲值小於或者等於0,會從」數據行緩存延時有序集合「和」數據行緩存調度有序集合「移除這個數據行的ID,並從緩存裏面刪除這個數據行已有的緩存,再從新進行檢查。

對於延遲值大於0的數據行來講,從數據庫裏面取出這些行,將他們編碼爲json格式並存儲到Redis裏面,而後更新這些行的調度時間。

"""
守護進程,根據調度時間有序集合和延遲值緩存數據行

@param {object} conn

"""
def cacheRow(conn):
    while not QUIT:
        # 須要讀取」數據行緩存調度有序集合「的第一個元素,若是沒有包含任何元素,或者分值存儲的時間戳所指定的時間還沒有來臨,那麼函數先休眠50毫秒,而後再從新進行檢查
        next = conn.zrange('schedule:', 0, 0, withscores=True)
        now = time.time()
        if not next or next[0][1] > now:
            time.sleep(.05)
            continue
        
        row_id = next[0][0]
        # 取出延遲值
        delay = conn.zscore('delay:', row_id)
        # 若是延遲值小於等於0,則再也不緩存該數據行
        if delay <= 0:
            conn.zrem('schedule:', row_id)
            conn.zrem('delay:', row_id)
            conn.delete('inv:' + row_id) 
            continue;

        # 須要緩存的,更新緩存調度的有序集合,並緩存該數據行
        row = Inventory.get(row_id)
        conn.zadd('schedule:', row_id, now + delay)
        conn.set('inv:' + row_id, json.dumps(row.toDict()))
        
"""
庫存類,庫存的商品信息
"""
class Inventory(object):
    def __init__(self, id):
        self.id = id

    @classmethod
    def get(cls, id):
        return Inventory(id)
    
    def toDict(self):
        return {'id':self.id, 'data':'data to cache...','cached':time.time()}

(8)咱們須要在用戶瀏覽頁面時,「商品瀏覽次數有序集合」對應的商品中須要減一,使得保持在有序集合較前的索引位置。

同時咱們須要開啓一個守護進程,每隔5分鐘,刪除全部排名在20000名以後的商品瀏覽數,並使用ZINTERSTORE將刪除以後剩餘的全部商品的瀏覽次數減半。

而判斷頁面是否須要緩存,咱們須要經過ZRANK取出商品的瀏覽次數排名,若是排名在10000內,那麼說明該頁面須要緩存。

"""
守護進程,刪除全部排名在20000名以後的商品,並將刪除以後剩餘的全部商品瀏覽次數減半,5分鐘執行一次

@param {object} conn

"""
def rescaleViewed(conn):
    while not QUIT:
        conn.zremrangebyrank('viewed:', 20000, -1)
        conn.zinterstore('viewed:', {'viewed:', .5})
        time.sleep(300)

測試代碼

"""
測試
"""
import time
import urlparse
import uuid
import threading
import unittest
import json

class TestShoppingWebsite(unittest.TestCase):
    def setUp(self):
        import redis
        self.conn = redis.Redis(db=15)
    
    def tearDown(self):
        conn = self.conn
        to_del = (
            conn.keys('login:*') + conn.keys('recent:*') + conn.keys('viewed:*') +
            conn.keys('cart:*') + conn.keys('cache:*') + conn.keys('delay:*') + 
            conn.keys('schedule:*') + conn.keys('inv:*'))

        if to_del:
            conn.delete(*to_del)

        del self.conn

        global QUIT, LIMIT
        QUIT = False
        LIMIT = 10000000
        print
        print

    def testLoginCookies(self):
        conn = self.conn
        global LIMIT, QUIT
        token = str(uuid.uuid4())

        updateToken(conn, token, 'username', 'itemX')
        print "We just logged-in/updated token:", token
        print "For user:", 'username'
        print

        print "What username do we get when we look-up that tokan?"
        r = checkToken(conn, token)
        print r
        print
        self.assertTrue(r)

        print "Let's drop the maximun number of cookies to 0 to clear them out"
        print "We will start a thread to do the cleaning, while we stop it later"

        LIMIT = 0
        t = threading.Thread(target = cleanFullSession, args = (conn,))
        t.setDaemon(1)
        t.start()
        time.sleep(1)
        QUIT = True
        time.sleep(2)
        if t.isAlive():
            raise Exception("The clean sessions thread is still slive?!?")

        s = conn.hlen('login:')
        print "The current number of session still available is:", s
        self.assertFalse(s)

    def testShoppingCartCookies(self):
        conn = self.conn
        global LIMIT, QUIT
        token = str(uuid.uuid4())

        print "We'll refresh our session..."
        updateToken(conn, token, 'username', 'itemX')
        print "And add an item to the shopping cart"
        addToCart(conn, token, "itemY", 3)
        r = conn.hgetall('cart:' + token)
        print "Our Shopping cart currently has:", r
        print

        self.assertTrue(len(r) >= 1)

        print "Let's clean out our sessions an carts"
        LIMIT = 0
        t = threading.Thread(target=cleanFullSession, args=(conn,))
        t.setDaemon(1)
        t.start()
        time.sleep(1)
        QUIT = True
        time.sleep(2)
        if t.isAlive():
            raise Exception("The clean sessions thread is still alive?!?")

        r = conn.hgetall('cart:' + token)
        print "Our shopping cart now contains:", r

        self.assertFalse(r)

    def testCacheRequest(self):
        conn = self.conn
        token = str(uuid.uuid4())

        def callback(request):
            return "content for " + request

        updateToken(conn, token, 'username', 'itemX')
        url = 'http://test.com/?item=itemX'
        print "We are going to cache a simple request against", url
        result = cacheRequest(conn, url, callback)
        print "We got initial content:", repr(result)
        print

        self.assertTrue(result)

        print "To test that we've cached the request, we'll pass a bad callback"
        result2 = cacheRequest(conn, url, None)
        print "We ended up getting the same response!", repr(result2)

        self.assertEquals(result, result2)

        self.assertFalse(canCache(conn, 'http://test.com/'))
        self.assertFalse(canCache(conn, 'http://test.com/?item=itemX&_=1234567'))

    def testCacheRows(self):
        import pprint
        conn = self.conn
        global  QUIT

        print "First, let's schedule caching of itemX every 5 seconds"
        scheduleRowCache(conn, 'itemX', 5)
        print "Our schedule looks like:"
        s = conn.zrange('schedule:', 0, -1, withscores = True)
        pprint.pprint(s)
        self.assertTrue(s)

        print "We'll start a caching thread that will cache the data..."
        t = threading.Thread(target=cacheRow, args=(conn,))
        t.setDaemon(1)
        t.start()
        time.sleep(1)
        print "Our cached data looks like:"
        r = conn.get('inv:itemX')
        print repr(r)
        self.assertTrue(r)
        print
        print "We'll check again in 5 seconds..."
        time.sleep(5)
        print "Notice that the data has changed..."
        r2 = conn.get('inv:itemX')
        print repr(r2)
        print
        self.assertTrue(r2)
        self.assertTrue(r != r2)

        print "Let's force un-caching"
        scheduleRowCache(conn, 'itemX', -1)
        time.sleep(1)
        r = conn.get('inv:itemX')
        print "The cache was cleared?", not r
        print
        self.assertFalse(r)

        QUIT = True
        time.sleep(2)
        if t.isAlive():
            raise Exception("The database caching thread is still alive?!?")


if __name__ == '__main__':
    unittest.main()

完整示例代碼地址:https://github.com/NancyLin/r...

相關文章
相關標籤/搜索