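Leaderboards are one of the most common features in games: guild activity rankings, rankings of players' hero combat power, and so on. Once the data reaches hundreds of millions of rows, ranking directly in the database is fatally slow, far beyond the response time users will accept, and it also puts very heavy load on the database. This article describes some design ideas for a ranking system serving tens of millions of users, covering direct database ranking as well as optimizing leaderboards with bucket sort and in-memory data.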
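Before getting into the design, some background is needed. The article first covers the categories of leaderboards, ranking rules, and rank distribution. It then uses Nagi, a simple ranking system I wrote earlier, to walk through direct database ranking, bucket sort, and in-memory caching.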
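Ranking rules. The rules discussed here are not the swap-based kind used in arenas, where a new user's rank is initialized simply by counting the current arena users, and a player who beats a higher-ranked player swaps ranks with them. Instead, think of guild activity, which might be the sum of the activity of all current guild members, or guild combat power, the sum of all members' combat power. Because the final rank is decided by a single attribute (guild activity, guild combat power), this kind is classified as simple ranking (single-attribute ranking).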
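You may be unsure how a guild's combat power would be computed, so consider a simpler game feature: a sign-in ranking. The rule is that each daily sign-in records the user's current streak of consecutive sign-in days. If the user misses a day, the streak restarts from zero unless the user makes up the missed sign-in. The longer the streak, the higher the rank. This is a simple ranking: a single attribute decides the player's rank. However, because most users may have signed in continuously since the game launched, many players end up with the same rank. To give every user a distinct rank, ties are broken by user id, with smaller ids ranking higher. A ranking decided by several attributes, here the streak combined with the user id, is a composite-attribute ranking.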
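The difference between the two rules can be sketched in a few lines of Python. This is only an illustration: the `SignIn` type and the function names are assumptions made for the example and are not part of Nagi.

```python
from typing import NamedTuple


class SignIn(NamedTuple):
    user_id: int
    streak: int  # consecutive sign-in days


def simple_ranks(entries):
    """Single-attribute ranking: equal streaks share a rank."""
    distinct = sorted({e.streak for e in entries}, reverse=True)
    rank_of = {streak: i + 1 for i, streak in enumerate(distinct)}
    return {e.user_id: rank_of[e.streak] for e in entries}


def composite_ranks(entries):
    """Composite ranking: streak descending, then user id ascending."""
    ordered = sorted(entries, key=lambda e: (-e.streak, e.user_id))
    return {e.user_id: i + 1 for i, e in enumerate(ordered)}


entries = [SignIn(7, 30), SignIn(3, 30), SignIn(9, 12), SignIn(1, 45)]
print(simple_ranks(entries))     # {7: 2, 3: 2, 9: 3, 1: 1}
print(composite_ranks(entries))  # {1: 1, 3: 2, 7: 3, 9: 4}
```

Users 3 and 7 share rank 2 under the simple rule, while the composite rule separates them by id.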
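When designing a ranking system, pay attention to how user ranks are distributed. The sign-in system above fits the 80/20 rule very well: most users' ranks will be very close or identical. The distribution may also resemble a normal distribution, with fewer users toward both ends and more in the middle. Either way, large numbers of users share the same rank. So where possible, design the game rules so that ranks are spread evenly.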
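The algorithm design is explained using Nagi, a personal project of mine. Nagi is an abstract leaderboard system: everything that needs ranking is abstracted into an entity (an entry) with a single score, and the system can host multiple leaderboards. The database is MySQL.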
    CREATE TABLE entries (
        eid INT(11) unsigned NOT NULL COMMENT 'The unique identifier for a entry in a leaderboards.',
        lid MEDIUMINT(8) unsigned NOT NULL,
        score INT(11) unsigned NOT NULL,
        data VARCHAR(1024) DEFAULT NULL COMMENT 'The custom entry data',
        created DATETIME NOT NULL DEFAULT NOW() COMMENT 'The DATETIME when the entry was created.',
        PRIMARY KEY (lid, eid),
        KEY user_entry (lid, score)
    ) ENGINE=InnoDB CHARSET=utf8;
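| Field | Description |
|---|---|
| eid | Unique identifier of the entry (the user id in the sign-in system) |
| score | Ranking score (the sign-in streak in the sign-in system) |
| data | Custom entry data, JSON-serialized |
| created | Creation time |
| lid | Unique identifier of the leaderboard; see the leaderboards table |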
    CREATE TABLE leaderboards (
        lid MEDIUMINT(8) unsigned NOT NULL AUTO_INCREMENT,
        name VARCHAR(124) NOT NULL,
        adapter VARCHAR(16),
        PRIMARY KEY (lid),
        UNIQUE KEY name (name)
    ) ENGINE=InnoDB CHARSET=utf8;
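| Field | Description |
|---|---|
| lid | Unique identifier of the leaderboard |
| name | Human-readable leaderboard name |
| adapter | Decides which algorithm is used to rank this leaderboard |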
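Direct database ranking is a fairly inefficient algorithm, but with small amounts of data it is still the simplest and most efficient choice.
The core SQL for getting a user's rank is as follows: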
    # Note: RANK is a reserved word from MySQL 8.0 on, where the alias needs backticks.
    RANK_SQL = """SELECT eo.*,
        (
            SELECT COUNT(%sei.score) %s
            FROM entries ei
            WHERE eo.lid=ei.lid AND %s
        ) AS rank
    FROM entries eo"""

    def rank_for_user(self, lid, eid, dense=False):
        sql = self._build_rank_sql(dense)
        sql += '\nWHERE lid=%s AND eid=%s'
        data = db.query_one(sql, (lid, eid))
        if data:
            return self._load(data)

    def _build_rank_sql(self, dense=False):
        if dense:
            # Composite rank: count entries with a higher score,
            # or the same score and an eid no larger than this one
            sql = self.RANK_SQL % ('', '', '(ei.score, eo.eid) >= (eo.score, ei.eid)')
        else:
            # Single-attribute rank: number of distinct higher scores, plus one
            sql = self.RANK_SQL % ('DISTINCT ', ' + 1', 'ei.score > eo.score')
        return sql
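The core is a single inefficient SQL statement that computes the current user's rank. In the code, dense=True selects the composite-attribute ranking, so no two users share a rank.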
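To see what the two variants of the query return, here is a small self-contained sketch that runs the same counting logic on an in-memory SQLite database. It is an assumption-laden stand-in rather than Nagi's code: SQLite replaces MySQL, the table is cut down to three columns, the row-value comparison is written out as an explicit OR, and the names `SIMPLE_SQL`, `COMPOSITE_SQL`, and the module-level `rank_for_user` are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE entries (lid INTEGER, eid INTEGER, score INTEGER, "
    "PRIMARY KEY (lid, eid))"
)
conn.executemany(
    "INSERT INTO entries VALUES (1, ?, ?)",
    [(1, 45), (3, 30), (7, 30), (9, 12)],
)

# Single-attribute rank: number of distinct higher scores, plus one.
SIMPLE_SQL = """
SELECT (SELECT COUNT(DISTINCT ei.score) + 1 FROM entries ei
        WHERE ei.lid = eo.lid AND ei.score > eo.score)
FROM entries eo WHERE eo.lid = ? AND eo.eid = ?"""

# Composite rank: entries sorting at or before this one by (score DESC, eid ASC).
COMPOSITE_SQL = """
SELECT (SELECT COUNT(*) FROM entries ei
        WHERE ei.lid = eo.lid
          AND (ei.score > eo.score
               OR (ei.score = eo.score AND ei.eid <= eo.eid)))
FROM entries eo WHERE eo.lid = ? AND eo.eid = ?"""


def rank_for_user(sql, lid, eid):
    """Return the rank of entry `eid` on leaderboard `lid`, or None if absent."""
    row = conn.execute(sql, (lid, eid)).fetchone()
    return row[0] if row else None


print(rank_for_user(SIMPLE_SQL, 1, 7))     # 2
print(rank_for_user(COMPOSITE_SQL, 1, 7))  # 3
```

Each lookup scans every entry that ranks ahead of the user, which is why the cost grows with the size of the leaderboard.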
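When fetching a page of ranked entries, the query gets slower and slower as offset grows, and the returned data also becomes less accurate.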
    def rank(self, leaderboard_id, limit=1000, offset=0, dense=False):
        sql = 'SELECT * FROM entries WHERE lid=%s '
        if dense:
            sql += 'ORDER BY score DESC, eid ASC'
        else:
            sql += 'GROUP BY score, eid ORDER BY score DESC'
        sql += ' LIMIT %s OFFSET %s'
        res = db.query(sql, (leaderboard_id, limit, offset))
        res = [self._load(data) for data in res]
        if res:
            if not dense:
                entry = self.rank_for_user(leaderboard_id, res[0].entry_id, dense)
                offset = entry.rank
            else:
                offset += 1
            self._rank_entries(res, dense, offset)
        return res

    def _rank_entries(self, entries, dense=False, rank=0):
        prev_entry = entries[0]
        prev_entry.rank = rank
        for e in entries[1:]:
            if dense:
                rank += 1
            elif e.score != prev_entry.score:
                rank += 1
            e.rank = rank
            prev_entry = e
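Likewise, the users are selected with an inefficient ORDER BY / GROUP BY query, the rank of the first one is looked up, and the remaining ranks are then assigned in application code.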
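Bucket ranking is a leaderboard algorithm that combines bucket sort with database features. When implementing it on a particular database, you need to understand that database's characteristics in order to design a good system.
Bucket ranking suits leaderboards that are refreshed periodically. A score update can shift ranks across the whole leaderboard, so between refreshes the result is, overall, an approximate ranking. The optimization principle is to keep the number of users in each bucket within a suitable range, so that response times stay acceptable to users.
For the sign-in system, a streak range of [0, 5000] is certainly enough (what game is maintained and updated for 13 years straight?). Use the streak as the bucket number, with each bucket counting the users whose streak equals that number. There are then at most 5001 buckets, each holding the number of users with that score. The bucket information can be obtained with a simple SQL statement: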
    SELECT score, COUNT(eid) AS size FROM entries GROUP BY score
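From this, the rank interval of each score can be computed. For example, suppose the highest streak is 5000 days and 1000 users have it. Under composite ranking (with the user id as tie-breaker), bucket 5000 covers ranks [1, 1000]. When ranking by score alone, bucket 5000 has rank 1.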
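The derivation of the rank intervals can be sketched in plain Python. This is a minimal in-memory illustration, assuming the scores are available as a list; the `ScoreBucket` fields here and the function name `build_score_buckets` are chosen for the example.

```python
from collections import Counter, namedtuple

ScoreBucket = namedtuple("ScoreBucket", "score size from_dense to_dense rank")


def build_score_buckets(scores):
    """One bucket per distinct score, highest score first."""
    counts = Counter(scores)
    buckets, dense = [], 0
    for rank, score in enumerate(sorted(counts, reverse=True), start=1):
        size = counts[score]
        # Composite ranks covered by this bucket: dense + 1 .. dense + size
        buckets.append(ScoreBucket(score, size, dense + 1, dense + size, rank))
        dense += size
    return buckets


scores = [5000] * 1000 + [4999] * 3 + [10] * 2
for bucket in build_score_buckets(scores):
    print(bucket)
```

With this data, bucket 5000 covers composite ranks 1 to 1000 and has single-attribute rank 1, bucket 4999 covers 1001 to 1003 with rank 2, and bucket 10 covers 1004 to 1005 with rank 3.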
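Because bucket ranking needs to record extra bucket information, an extra table is needed to store it.
The score bucket table is as follows: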
    CREATE TABLE score_buckets (
        lid MEDIUMINT(8) unsigned NOT NULL,
        score INT(11) unsigned NOT NULL,
        size INT(11) unsigned NOT NULL,
        from_dense INT(11) unsigned NOT NULL,
        to_dense INT(11) unsigned NOT NULL,
        rank INT(11) unsigned NOT NULL,
        PRIMARY KEY leaderboard_score (lid, score),
        KEY dense (from_dense, to_dense)
    ) ENGINE=InnoDB CHARSET=utf8;
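| Field | Description |
|---|---|
| lid | Unique identifier of the leaderboard |
| score | Bucket number of the score bucket, which is the score itself |
| size | Number of users in the bucket |
| from_dense | Highest (starting) rank of the bucket's users under composite ranking |
| to_dense | Lowest (ending) rank of the bucket's users under composite ranking |
| rank | Rank of the bucket under single-attribute ranking |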
    def sort(self, leaderboard_id, chunk_block=CHUNK_BLOCK):
        # Get the highest and lowest scores on the leaderboard
        res = db.query_one('SELECT max(score) as max_score, min(score) as min_score \
            FROM entries WHERE lid=%s', (leaderboard_id,))
        if not res or res[0] is None:
            return  # the leaderboard has no entries
        max_score, min_score = res
        rank, dense = 0, 0
        from_score = max_score
        # Clear any buckets above the current highest score
        # (the DELETE uses a strict "<", so pass max_score itself)
        self.clear_buckets_by_score_range(leaderboard_id, from_score, None)
        # Counting all buckets at once takes too long, so work in chunks:
        # clear the old bucket data of each chunk, then write the new data
        while from_score >= min_score:
            buckets, rank, dense = self._get_buckets(leaderboard_id, from_score - chunk_block, from_score, rank, dense)
            self.clear_buckets_by_score_range(leaderboard_id, from_score - chunk_block, from_score)
            self.save_buckets(buckets)
            from_score -= chunk_block
        # Clear the buckets below the current lowest score
        self.clear_buckets_by_score_range(leaderboard_id, None, min_score - 1)

    def _get_buckets(self, leaderboard_id, from_score, to_score, rank, dense):
        """Build the new buckets for the score range (from_score, to_score]."""
        res = db.query('SELECT score, COUNT(score) size FROM entries '
                       'WHERE lid=%s AND %s<score AND score<=%s '
                       'GROUP BY score ORDER BY score DESC',
                       (leaderboard_id, from_score, to_score))
        buckets = []
        for data in res:
            buckets.append(ScoreBucket(leaderboard_id, data[0], data[1], dense + 1, dense + data[1], rank + 1))
            dense += data[1]
            rank += 1
        return buckets, rank, dense

    def clear_buckets_by_score_range(self, leaderboard_id, from_score, to_score):
        """Delete the buckets in the score range (from_score, to_score]."""
        if to_score is None:
            return db.execute('DELETE FROM score_buckets WHERE lid=%s AND %s<score', (leaderboard_id, from_score))
        if from_score is None:
            return db.execute('DELETE FROM score_buckets WHERE lid=%s AND score<=%s', (leaderboard_id, to_score))
        return db.execute('DELETE FROM score_buckets WHERE lid=%s AND %s<score AND score<=%s', (leaderboard_id, from_score, to_score))

    def save_buckets(self, buckets):
        """Write the bucket data."""
        if not buckets:
            return
        sql = 'INSERT INTO score_buckets(score, size, lid, from_dense, to_dense, rank) VALUES '
        rows = []
        for bucket in buckets:
            rows.append('(%d, %d, %d, %d, %d, %d)' % (bucket.score, bucket.size,
                        bucket.leaderboard_id, bucket.from_dense, bucket.to_dense, bucket.rank))
        db.execute(sql + ','.join(rows))
Once the score has been looked up from the user id, the user's current rank is easy to obtain with the following API.
    def rank_for_user(self, leaderboard_id, entry_id, dense=False):
        entry = self.find(leaderboard_id, entry_id)
        if entry:
            if dense:
                # Starting composite rank of the bucket for this score
                data = db.query_one('SELECT from_dense FROM score_buckets WHERE lid=%s AND score=%s', (leaderboard_id, entry.score))
                from_rank = data[0]
                # Entries with the same score and a smaller eid rank ahead
                rank = db.query_one('SELECT COUNT(eid) as rank FROM entries WHERE lid=%s AND eid<%s AND score=%s', (leaderboard_id, entry_id, entry.score))[0]
                entry.rank = from_rank + rank
            else:
                data = db.query_one('SELECT rank FROM score_buckets WHERE lid=%s AND score=%s', (leaderboard_id, entry.score))
                entry.rank = data[0]
        return entry
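The same lookup can be sketched without a database. In this illustration both arguments are plain dictionaries, an assumption made for the example: `entries` maps entry id to score, and `buckets` maps score to a `(from_dense, rank)` pair taken from the bucket table.

```python
def rank_for_user(entries, buckets, entry_id, dense=False):
    """Look up a rank from precomputed score buckets.

    entries: {entry_id: score}; buckets: {score: (from_dense, rank)}.
    """
    score = entries.get(entry_id)
    if score is None:
        return None
    from_dense, rank = buckets[score]
    if not dense:
        return rank
    # Only same-score entries with a smaller id need to be counted.
    ahead = sum(1 for eid, s in entries.items() if s == score and eid < entry_id)
    return from_dense + ahead


entries = {1: 45, 3: 30, 7: 30, 9: 12}
buckets = {45: (1, 1), 30: (2, 2), 12: (4, 3)}
print(rank_for_user(entries, buckets, 7))              # 2
print(rank_for_user(entries, buckets, 7, dense=True))  # 3
```

The work per lookup is limited to the entries that share the user's score, which is the point of keeping bucket sizes in a suitable range.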
With score buckets, the rank algorithm is somewhat more complex:
    def rank(self, leaderboard_id, limit=1000, offset=0, dense=False):
        # Bucket holding position offset+1: its score is the upper bound of the page
        to_score, from_rank, to_rank = db.query_one(
            'SELECT score, from_dense, to_dense FROM score_buckets '
            'WHERE lid=%s AND from_dense<=%s AND %s<=to_dense',
            (leaderboard_id, offset+1, offset+1))
        if to_rank >= limit + offset + 1:
            from_score = to_score
        else:
            # Bucket holding position limit+offset+1: its score is the lower bound
            row = db.query_one(
                'SELECT score FROM score_buckets '
                'WHERE lid=%s AND from_dense<=%s AND %s<=to_dense',
                (leaderboard_id, limit+offset+1, limit+offset+1))
            # Past the last bucket: fall back to 0, the lowest unsigned score
            from_score = row[0] if row else 0
        sql = 'SELECT * FROM entries WHERE lid=%s AND %s<=score AND score<=%s '
        if dense:
            sql += 'ORDER BY score DESC, eid ASC'
        else:
            sql += 'GROUP BY score, eid ORDER BY score DESC'
        sql += ' LIMIT %s OFFSET %s'
        # The offset is now relative to the first bucket of the page
        res = db.query(sql, (leaderboard_id, from_score, to_score, limit, offset - from_rank + 1))
        res = [self._load(data) for data in res]
        if res:
            if not dense:
                entry = self.rank_for_user(leaderboard_id, res[0].entry_id, dense)
                offset = entry.rank
            else:
                offset += 1
            self._rank_entries(res, dense, offset)
        return res

    def _rank_entries(self, entries, dense=False, rank=0):
        prev_entry = entries[0]
        prev_entry.rank = rank
        for e in entries[1:]:
            if dense:
                rank += 1
            elif e.score != prev_entry.score:
                rank += 1
            e.rank = rank
            prev_entry = e
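The code flow is: locate the bucket containing position offset+1, which gives the upper score bound; locate the bucket containing position limit+offset+1, which gives the lower score bound; select only the entries within that score range, with the offset shifted so that it is relative to the first bucket; finally, assign ranks to the returned entries in application code.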
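For guild activity, scores may range over [0, 1000000000) and be spread quite thinly. Score buckets would take a long time to compute, and user rank queries would also become slower. In this case uniform interval buckets can be used. We divide the scores into consecutive, uniformly increasing intervals [0, 10000), [10000, 20000), ..., so that a bucket no longer corresponds to a single score but to a score interval: bucket 1 corresponds to [0, 10000), bucket 2 to [10000, 20000). This scheme, the interval bucket, is in fact the most common form of bucket sort.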
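A minimal in-memory sketch of uniform interval buckets follows. It makes several assumptions for the sake of illustration: the scores are held in a Python list, intervals are closed on both ends and walked from the highest score down, empty intervals are skipped, and the names `BlockBucket` and `build_block_buckets` are chosen for the example. Rescanning the list for every interval is fine for a demonstration but not for production data.

```python
from collections import namedtuple

BlockBucket = namedtuple(
    "BlockBucket", "from_score to_score from_rank to_rank from_dense to_dense"
)


def build_block_buckets(scores, width):
    """Cut [min, max] into intervals of `width` scores, highest interval first."""
    if not scores:
        return []
    lowest, highest = min(scores), max(scores)
    buckets, rank, dense = [], 1, 1
    to_score = highest
    while to_score >= lowest:
        from_score = max(lowest, to_score - width + 1)
        inside = [s for s in scores if from_score <= s <= to_score]
        if inside:  # assumption: empty intervals are not stored
            distinct = len(set(inside))
            buckets.append(BlockBucket(
                from_score, to_score,
                rank, rank + distinct - 1,       # single-attribute rank range
                dense, dense + len(inside) - 1,  # composite rank range
            ))
            rank += distinct
            dense += len(inside)
        to_score = from_score - 1
    return buckets


scores = [95, 95, 90, 72, 70, 41, 40, 40, 12]
for bucket in build_block_buckets(scores, width=20):
    print(bucket)
```

With a width of 20 the nine scores fall into four buckets, and the interval [16, 35] is skipped because it is empty.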
    CREATE TABLE block_buckets (
        lid MEDIUMINT(8) unsigned NOT NULL,
        from_score INT(11) unsigned NOT NULL,
        to_score INT(11) unsigned NOT NULL,
        from_rank INT(11) unsigned NOT NULL,
        to_rank INT(11) unsigned NOT NULL,
        from_dense INT(11) unsigned NOT NULL,
        to_dense INT(11) unsigned NOT NULL,
        PRIMARY KEY leaderboard_score (lid, from_score, to_score)
    ) ENGINE=InnoDB CHARSET=utf8;
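| Field | Description |
|---|---|
| lid | Unique identifier of the leaderboard |
| from_score | Lower end of the bucket's score interval |
| to_score | Upper end of the bucket's score interval |
| from_rank | Highest rank of the bucket's users under single-attribute ranking |
| to_rank | Lowest rank of the bucket's users under single-attribute ranking |
| from_dense | Highest (starting) rank of the bucket's users under composite ranking |
| to_dense | Lowest (ending) rank of the bucket's users under composite ranking |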
The bucket sort algorithm is as follows:
    from collections import namedtuple

    def sort(self, leaderboard_id, chunk_block=BUCKET_BLOCK):
        """Recompute and save the bucket information."""

        # Get the highest and lowest scores on the leaderboard
        res = db.query_one('SELECT max(score) as max_score, min(score) as min_score FROM entries WHERE lid=%s', (leaderboard_id,))
        if not res:
            return

        max_score, min_score = res
        if chunk_block is None and max_score > min_score:
            chunk_block = (max_score - min_score) / (self.total(leaderboard_id) / (max_score - min_score))
        elif max_score == min_score:
            chunk_block = BUCKET_BLOCK

        rank, dense = 1, 1
        buckets = []
        self.clear_buckets(leaderboard_id)
        to_score = max_score
        from_score = to_score - chunk_block
        from_score = max(min_score, from_score)

        # Cut the score range into intervals and save the bucket information
        while to_score >= min_score:
            dense_size = self._get_dense_size(leaderboard_id, from_score, to_score)
            rank_size = self._get_rank_size(leaderboard_id, from_score, to_score)
            buckets.append(BlockBucket(leaderboard_id, from_score, to_score, rank, rank + rank_size - 1, dense, dense + dense_size - 1))
            if len(buckets) == 500:
                self.save_buckets(buckets)
                buckets = []
            to_score = from_score - 1
            from_score = to_score - chunk_block
            from_score = max(min_score, from_score)
            dense += dense_size
            rank += rank_size

        self.save_buckets(buckets)

    def _get_dense_size(self, leaderboard_id, from_score, to_score):
        """Number of users in the interval under composite ranking."""
        return db.query_one('SELECT COUNT(score) size FROM entries WHERE lid=%s AND %s<=score AND score<=%s',
            (leaderboard_id, from_score, to_score))[0]

    def _get_rank_size(self, leaderboard_id, from_score, to_score):
        """Number of distinct ranks in the interval under single-attribute ranking."""
        return db.query_one('SELECT COUNT(DISTINCT(score)) size FROM entries WHERE lid=%s AND %s<=score AND score<=%s',
            (leaderboard_id, from_score, to_score))[0]

    def save_buckets(self, buckets):
        """Save the bucket data."""
        if not buckets:
            return

        sql = 'INSERT INTO block_buckets(lid, from_score, to_score, from_rank, to_rank, from_dense, to_dense) VALUES '
        rows = []
        for bucket in buckets:
            rows.append('(%d, %d, %d, %d, %d, %d, %d)' % (bucket.leaderboard_id, bucket.from_score,
                bucket.to_score, bucket.from_rank, bucket.to_rank, bucket.from_dense, bucket.to_dense))
        db.execute(sql + ','.join(rows))

    def clear_buckets(self, leaderboard_id):
        """Clear the leaderboard's bucket data."""
        return db.execute('DELETE FROM block_buckets WHERE lid=%s', (leaderboard_id,))

    BlockBucket = namedtuple('BlockBucket', ['leaderboard_id', 'from_score',
        'to_score', 'from_rank', 'to_rank', 'from_dense', 'to_dense'])
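The flow is: get the highest and lowest scores, clear the leaderboard's old buckets, then walk down from the highest score one interval at a time. For each interval, count its entries (composite ranking) and its distinct scores (single-attribute ranking) to derive the bucket's rank ranges, and save the buckets in batches of 500.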
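To rank a user, look the user up by entry_id and use the user's score to find the bucket it falls in. The bucket's rank range and score range then narrow the scope of the SQL count used to compute the user's rank.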
    def rank_for_user(self, leaderboard_id, entry_id, dense=False):
        entry = self.find(leaderboard_id, entry_id)
        if entry:
            if dense:
                data = db.query_one('SELECT from_dense, to_score FROM block_buckets WHERE lid=%s AND from_score<=%s AND %s<=to_score', (leaderboard_id, entry.score, entry.score))
                from_dense, to_score = data
                # Entries ahead inside the bucket: a higher score,
                # or the same score with a smaller eid
                rank = db.query_one('SELECT COUNT(eid) AS rank FROM entries '
                                    'WHERE lid=%s AND ((%s<score AND score<=%s) OR (score=%s AND eid<%s))',
                                    (leaderboard_id, entry.score, to_score, entry.score, entry.entry_id))
                entry.rank = from_dense + rank[0]
            else:
                data = db.query_one('SELECT from_rank, to_score FROM block_buckets WHERE lid=%s AND from_score<=%s AND %s<=to_score', (leaderboard_id, entry.score, entry.score))
                from_rank, to_score = data
                # Distinct higher scores inside the bucket
                rank = db.query_one('SELECT COUNT(DISTINCT(score)) AS rank FROM entries WHERE lid=%s AND %s<score AND score<=%s', (leaderboard_id, entry.score, to_score))[0]
                entry.rank = from_rank + rank
        return entry
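The counting inside one interval bucket can be sketched in memory as follows. The representation is an assumption made for the example: `entries` is a dictionary from entry id to score, and `bucket` is a dictionary holding the three bucket fields the lookup needs.

```python
def rank_in_bucket(entries, bucket, entry_id, dense=False):
    """Rank an entry using its bucket's starting ranks.

    entries: {entry_id: score}
    bucket:  {"to_score": ..., "from_rank": ..., "from_dense": ...}
    """
    score = entries[entry_id]
    # Scores inside the bucket that are strictly higher than this entry's score
    higher = [s for s in entries.values() if score < s <= bucket["to_score"]]
    if not dense:
        return bucket["from_rank"] + len(set(higher))
    # Ties are ordered by id, so same-score entries with a smaller id come first
    ties_ahead = sum(1 for eid, s in entries.items() if s == score and eid < entry_id)
    return bucket["from_dense"] + len(higher) + ties_ahead


entries = {1: 95, 2: 95, 3: 90, 4: 72}
top_bucket = {"to_score": 95, "from_rank": 1, "from_dense": 1}  # covers [76, 95]
print(rank_in_bucket(entries, top_bucket, 3))              # 2
print(rank_in_bucket(entries, top_bucket, 3, dense=True))  # 3
```

Only the entries between the user's score and the top of the bucket are examined, so the cost depends on the bucket size rather than on the whole leaderboard.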
The rank algorithm is relatively complex:
    def rank(self, leaderboard_id, limit=1000, offset=0, dense=False):
        # offset counts entries, so buckets are located by their composite (dense) ranges
        from_score, to_score, from_rank, to_rank = db.query_one(
            'SELECT from_score, to_score, from_dense, to_dense FROM block_buckets '
            'WHERE lid=%s AND from_dense<=%s AND %s<=to_dense',
            (leaderboard_id, offset+1, offset+1))
        if to_rank < limit + offset + 1:
            # The page spills into lower buckets: extend the lower score bound
            row = db.query_one(
                'SELECT from_score FROM block_buckets '
                'WHERE lid=%s AND from_dense<=%s AND %s<=to_dense',
                (leaderboard_id, limit+offset+1, limit+offset+1))
            # Past the last bucket: fall back to 0, the lowest unsigned score
            from_score = row[0] if row else 0
        sql = 'SELECT * FROM entries WHERE lid=%s AND %s<=score AND score<=%s '
        if dense:
            sql += 'ORDER BY score DESC, eid ASC'
        else:
            sql += 'GROUP BY score, eid ORDER BY score DESC'
        sql += ' LIMIT %s OFFSET %s'
        res = db.query(sql, (leaderboard_id, from_score, to_score, limit, offset - from_rank + 1))
        res = [self._load(data) for data in res]
        if res:
            if not dense:
                entry = self.rank_for_user(leaderboard_id, res[0].entry_id, dense)
                offset = entry.rank
            else:
                offset += 1
            self._rank_entries(res, dense, offset)
        return res
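The flow is much the same as for score-bucket ranking: locate the buckets covering the requested positions, query only the entries inside their score range, and assign ranks in application code.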
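Next, consider user activity. It may follow the 80/20 rule closely, or the number of users in some score interval may be especially large, in which case neither score buckets nor uniform interval buckets fit well. Adaptive buckets can be considered instead. The adaptive interval algorithm takes the current highest score, applies a reasonable threshold to obtain an interval, and counts the users in it. If the count falls in a range that sorts quickly, say [5000, 10000], the interval is used; if it is below 5000 the interval is widened, and if it is above 10000 the interval is narrowed. The adjustment can proceed by halving. For example, if the first attempt [low, high] holds too many users, low = low + (high - low) / 2 narrows the range. The range must keep high - low greater than or equal to zero: at zero the bucket has degenerated into a score bucket and cannot shrink any further. Conversely, low = low - (high - low) / 2 widens the range, and this repeats until a suitable interval is found. What interval size is suitable depends on the server's hardware performance.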
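One way to make the search for a suitable interval deterministic is plain bisection on the lower bound, a variant of the halving described above rather than the exact heuristic. The sketch below works under stated assumptions: the scores are held in a sorted in-memory list, only the upper limit `max_size` is enforced (small buckets are accepted when the next score would overflow the limit), empty intervals are skipped, and the name `adaptive_buckets` is chosen for the example.

```python
from bisect import bisect_left, bisect_right


def adaptive_buckets(scores, max_size):
    """Return (low, high, user_count) intervals, highest scores first.

    Each interval is the widest one ending at `high` that holds at most
    `max_size` users; a single score with more users than that becomes
    its own bucket, which is the score-bucket degenerate case.
    """
    ordered = sorted(scores)
    if not ordered:
        return []
    floor = ordered[0]

    def count(low, high):
        # Users with low <= score <= high
        return bisect_right(ordered, high) - bisect_left(ordered, low)

    buckets = []
    high = ordered[-1]
    while high >= floor:
        lo, hi = floor, high
        while lo < hi:
            mid = (lo + hi) // 2
            if count(mid, high) > max_size:
                lo = mid + 1  # too crowded: move the lower bound up
            else:
                hi = mid      # fits: try a wider interval
        size = count(lo, high)
        if size:
            buckets.append((lo, high, size))
        high = lo - 1
    return buckets


scores = [100] * 3 + [99] * 2 + [98, 90] + [50] * 4 + [49] * 4 + [10]
print(adaptive_buckets(scores, max_size=5))
# [(99, 100, 5), (51, 98, 2), (50, 50, 4), (10, 49, 5)]
```

Dense score regions get narrow intervals and sparse regions get wide ones, so no bucket holds more users than the limit unless a single score already does.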
Note
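Because adaptive interval buckets use the same storage structure as uniform interval buckets, it is not described again.
In terms of implementation, nothing changes except that sort gains the adaptive interval algorithm. Only the adaptive part is described here; for the other interfaces, refer to the uniform interval bucket implementation.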
    def sort(self, leaderboard_id, chunk_block=CHUNK_BLOCK):
        res = db.query_one('SELECT max(score) as max_score, min(score) as min_score FROM entries WHERE lid=%s', (leaderboard_id,))
        if not res:
            return
        max_score, min_score = res
        rank, dense = 1, 1
        buckets = []
        self.clear_buckets(leaderboard_id)
        to_score = max_score
        chunk = DEFAULT_SCORE_CHUNK
        from_score = to_score - chunk
        from_score = max(min_score, from_score)
        while to_score >= min_score:
            # Keep counting the users in the current interval until the width fits
            while True:
                dense_size = self._get_dense_size(leaderboard_id, from_score, to_score)
                too_small = dense_size <= chunk_block // 2
                too_big = dense_size > chunk_block
                # Stop when the size fits, when the interval already reaches the
                # lowest score and cannot grow, or when it cannot shrink further
                if not (too_small or too_big) or (too_small and from_score <= min_score) or chunk == 1:
                    break
                chunk += (chunk // 2) if too_small else -(chunk // 2)
                from_score = max(min_score, to_score - chunk)
            rank_size = self._get_rank_size(leaderboard_id, from_score, to_score)
            buckets.append(ChunkBucket(leaderboard_id, from_score, to_score, rank, rank + rank_size - 1, dense, dense + dense_size - 1))
            if len(buckets) == 500:
                self.save_buckets(buckets)
                buckets = []
            to_score = from_score - 1
            from_score = to_score - chunk
            from_score = max(min_score, from_score)
            dense += dense_size
            rank += rank_size
        self.save_buckets(buckets)
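Because bucket ranking requires extra calls to the sort method to refresh the leaderboard, a refresh mechanism is needed. In Nagi the refresh mechanism is built on MySQL, and it basically supports timed refresh, periodic refresh, and crontab-rule refresh. The implementation is fairly simple; see cron.py for details.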
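Attentive readers will notice that uniform interval buckets and adaptive buckets clear a leaderboard's bucket data all at once, whereas score buckets clear the old data segment by segment and then write the new bucket information. The former really should be optimized to update in segments as well, which avoids the leaderboard being unavailable, or wildly inaccurate, for a while during a re-sort. When a user's score is updated, the leaderboard can reflect some of the change in that user's rank even before it is re-sorted (something that is impossible when another sorting method stores fixed ranks), although score buckets may fail to reflect such a change.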
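When using the rank API, many games care most about the top of the leaderboard, for example the hundred most active guilds, and may want the top ranks to be real-time. The approximate ranking produced by bucket sort can be unsatisfactory here, so in-memory caching can be used to help provide an up-to-date leaderboard. For example, Redis can hold the top 5000 active users, so that each time a user updates data you only need to check whether the cache needs updating. An in-memory database is not mandatory, though: if the service does not need to run as a distributed cluster, a max-heap or a red-black tree can serve as the implementation, optionally wrapped in a network interface as a top-leaderboard service. Ranking the top entries directly in the database is sometimes feasible as well. Note that with a relational database such as MySQL, the rank API fetches data more slowly as offset grows, and accuracy also decreases.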
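A single-process version of such a top cache can be sketched as follows. For brevity it uses a plain dictionary capped at a fixed capacity instead of a heap or a red-black tree, and the class name `TopBoard` and its methods are assumptions made for the example. One known limitation of any capped cache like this: when a cached entry's score drops, an entry outside the cache may deserve its place, so a periodic refill from the database is still needed.

```python
class TopBoard:
    """Keeps at most `capacity` of the best entries in memory."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.scores = {}  # entry_id -> score

    def update(self, entry_id, score):
        """Record a new score and evict the worst entry if over capacity."""
        self.scores[entry_id] = score
        if len(self.scores) > self.capacity:
            # Worst entry: lowest score; among ties the larger id loses
            worst = min(self.scores, key=lambda eid: (self.scores[eid], -eid))
            del self.scores[worst]

    def top(self, n):
        """Return (rank, entry_id, score) for the best n entries."""
        ordered = sorted(self.scores.items(), key=lambda kv: (-kv[1], kv[0]))
        return [(rank, eid, score)
                for rank, (eid, score) in enumerate(ordered[:n], start=1)]


board = TopBoard(capacity=3)
for entry_id, score in [(1, 50), (2, 70), (3, 60), (4, 65), (3, 80), (5, 10)]:
    board.update(entry_id, score)
print(board.top(2))  # [(1, 3, 80), (2, 2, 70)]
```

Each update costs a scan of at most `capacity` items, which is acceptable for a few thousand entries; a heap or balanced tree would reduce that for larger caches.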