首先,某頭條的文章量、用戶量都是很大的,點擊量那就更恐怖了。 請問,若是實時展示熱門文章,好比近8小時點擊量最大的文章前100名。 若是是你來開發這個功能,你怎麼作?node
這個好辦啊,redis一個sortedset搞定啊,score計數,key是文章ID,不就ok了麼?redis
回答的不錯,你能夠走了!算法
要聽清題目,說好的8小時動態時間窗口,計數是會過時的。還有,頭條的量有這麼小麼,一個redis就搞定了?同窗啊,我告訴你,文章的量你起碼得估計個幾十萬,用戶你得估計幾個億,點擊量你至少得估計個1M/s吧。數據庫
1M/s的點擊併發量,確定是須要分佈式了。客戶端可能會爲了減輕服務器的壓力而選擇延遲合併點擊請求進行批量發送。簡單起見,這裏就使用HTTP協議吧。咱們先不考慮惡意用戶刷點擊的行爲。安全
服務器確定會有多臺機器多進程部署來接受點擊請求,接收到的請求在進行參數解析後,被髮送到存儲單元。爲了減輕存儲的壓力,每一個進程可能會使用小窗口聚合數據,每隔一小段時間將窗口內的數據聚合起來一塊兒發給存儲單元。bash
點擊數據是很重要的數據,用戶的興趣偏好就靠它了。這麼大的點擊數據若是所有用內存裝的話,成本過高。因此別期望徹底使用redis了。服務器
拿kafka存是一個好辦法,ZeroCopy機制併發量很高,數據持久化在磁盤裏成本低。不過kafka的數據通常是有過時時間的,若是想徹底記住用戶的點擊以便作長期的數據分析,少不了要使用hdfs了。多線程
可是由於要作準實時統計,hdfs可不適合幹這個,hdfs適合作離線統計的數據源。因此還得靠kafka接數據,而後消費者一邊入hdfs,一邊作實時統計。併發
實時統計可使用spark stream、storm接受kafka的輸入,也能夠本身手寫。機器學習
用戶太多,用戶表按用戶ID哈希分紅了1024張子表。用戶表裏有一個字段score,表示這個用戶的積分數。如今咱們要計算前100名積分最多的用戶以及積分數,該怎麼查詢?
若是是單個表,一個SQL也就搞定了
select id, score from user order by score desc limit 100
複製代碼
若是是多個子表,你得在每一個子表上都進行一次TopN查詢,而後聚合結果再作一次TopN查詢。下面是僞代碼
candidates = []
for k in range(1024):
# 每一個表都取topn
rows = select id, score from user_${k} order by score desc limit 100
# 聚合結果
candidates.extend(rows)
# 根據score倒排
candidates = sorted(candidates, key=lambda t: t[1], reverse=True)
# 再取topn
candidates[:100]
複製代碼
子表查詢能夠多線程並行,提升聚合效率。
8小時的滑動窗口,意味着新的數據源源不斷的進來,舊的數據時時刻刻在淘汰。嚴格來講,精準的8小時滑動窗口要求每條數據要嚴格的過時,差了1秒都不行,到點了就當即被淘汰。
精準的代價是咱們要爲每條點擊記錄都設置過時時間,過時時間自己也是須要存儲的,並且過時策略還須要定時掃描時間堆來確認哪些記錄過時了。量大的時候這些都是不容小噓的負擔。
可是在業務上來說,排行版沒有必要作到如此的精準,誤差個幾分鐘這都不是事。
業務上的折中給服務的資源優化帶來了機遇。咱們對時間片進行了切分,一分鐘一個槽來進行計數。下面是僞代碼
class HitSlot {
long timestamp; # earlies timestamp
map[int]int hits; # post_id => hits
void onHit(int postId, int hits) {
this.hits[postId] += hits;
}
}
class WindowSlots {
HitSlot currentSlot; # current active slots
LinkedList<HitSlot> historySlots; # history unactive slots
map[int]int topHits; # topn posts
void onHit(int postId, int hits) { # 由於上游有合併點擊,因此有了hits參數
long ts = System.currentTimeMillis();
if(this.currentSlot == null) { # 建立第一個槽
this.currentSlot == new HitSlot(ts);
} elif(ts - this.currentSlot.timestamp > 60 * 1000) { # 建立下一個槽,一分鐘一個槽
this.historySlots.add(this.currentSlot);
this.currentSlot = new HitSlot(ts);
}
this.currentSlot.onHit(postId, hits);
}
void onBeat() { # 維護窗口,移除過時的槽,而後統計topn,30s~60s調用一次
if(historySlots.isEmpty()) {
return;
}
HitSlot slot = historySlots[0];
long ts = System.currentTimeMillis();
if(ts - slot.timestamp > 8 * 60 * 60 * 1000) { # 過時了8小時,移掉第一個
historySlots.remove(0);
topHits = topn(aggregateSlots(historySlots)); # 計算topn的帖子
}
}
}
複製代碼
上面的代碼表明着每一個分佈式子節點的邏輯,由於是僞代碼,因此加鎖問題就不細寫了。 它的目標就是定時維持一個8小時的統計窗口,並匯聚topn的熱帖放在內存裏。 這個topn的數據並非特別實時,有一個大約1分鐘的短暫的時間窗口。
每一個子節點都會有一個定時任務去負責維持統計窗口,過時失效的統計數據,計算局部的topn熱帖。
如今每一個子節點都有了各自的局部topn熱帖,那麼還須要一個主節點去彙總這些局部熱點,而後計算去全局熱帖。
主節點也不必特別實時,按期從子節點拉取topn數據便可,也可讓字節點主動彙報。
class HotPostsAggregator {
map[int]map[int]int localTopnPosts; # nodeId => topn posts
map[int]int globalTopnPosts;
void onBeat() {
// do aggregate
// save globalTopnPosts to redis
}
void onLocalReport(int nodeId, map[int]int topnPosts) {
// 子節點上報局部熱帖
}
}
複製代碼
按照頭條的文章至少幾十萬篇,若是每一個子節點都要對全部的文章統計點擊數,彷佛也會佔用很多內存,聚合和排序熱帖也會有很多計算量。最好的想法是每一個子節點只負責一部分文章的統計,這樣能夠明顯節省計算資源。
咱們將kafka的分區數設置爲字節點的數量,這樣每一個節點負責消費一個分區的數據。在kafka生產端,對點擊記錄的帖子ID進行散列,保證相同文章ID的點擊流進入相同的分區,最終流向同一個統計子節點。
當機器增多時,節點掛掉的機率也會增大。硬件可能損壞,電源可能掉電,人爲操做失誤。若是沒有作任何防範措施,當一個字節點掛掉時,該節點上8個小時時間窗口的統計數據將會丟失。該節點所管理的局部熱點文章就喪失了進入全局熱帖的機會。
這可能不會對產品和體驗上帶來很大的傷害,節點重啓8小時以後也就徹底恢復了。並且這8小時以內,喪失了部分文章的熱點投票權也不會對總體業務帶來巨大影響。
可是咱們都但願系統能夠更加完美一點不是麼?當節點掛掉時,咱們但願能夠快速恢復狀態,這也是能夠作到的,難度也不是很大,不過是定時作一下checkpoint,將當前的狀態持久化到本地文件或者數據庫中。由於每一個子節點管理的文章不會太多,因此須要序列化的內容也不會太大。當節點重啓時,從持久化的checkpoint中將以前的狀態恢復出來,而後繼續進行消費和統計。
若是你使用的是spark-stream,它內置的checkpoint功能會讓你實現備份和恢復會更加簡單,更加安全。
若是你不想作checkpoint,辦法仍是有的,就是可能耗時舊一點。那就是對hdfs中的存儲的全部的點擊流數據進行一次mapreduce,將8小時窗口內的點擊流的點擊量統計出來,而後想辦法導入到字節點進程中去。
這要求hdfs的數據也是散列存儲的,和kafka對應,這樣能夠快速圈出須要統計的數據範圍。也許會由於mapreduce自己會耗時一點時間,最終致使恢復的數據沒有那麼準確,不過這關係也不大,咱們用這樣粗糙的方法,能對得起那9.5成的數據已經作的很不錯了。
上面講了一堆堆,代碼敲了很多圖畫了很多,彷佛頗有道理。可是還有個重要的沒提到,那就是點擊去重。若是一個用戶反覆點擊了不少次,那該如何計數比較合理。
一篇好的文章若是它不是過短的話,通常會吸引讀者反覆閱讀不少次。這個計數若是徹底去重了記爲一次彷佛也不太合理。可是若是是故意被人反覆點擊而被記了太屢次明顯也很差。那該如何選擇呢?
首先要從客戶端下手,客戶端自己能夠過濾一部分無效點擊。同一篇文章在過短的時間內被當前用戶反覆點擊,這個模式仍是很好發現的。若是間隔時間比較長,那就是讀者的回味點擊,屬於文章的正向反饋,應該記錄下來。
客戶端作好了,而後再從服務器端下手,服務器端下手就比較困難了。要探測用戶的行爲模式意味着要對用戶的行爲狀態化,這樣就會大量加劇服務器的存儲負擔。
服務器還須要防止用戶的防刷行爲。若是缺失防刷控制,一個頭條號能夠經過這種漏洞來使得本身的文章非法得到大量點擊,進入熱門文章列表,打上熱門標籤,被海量的用戶看到,就會得到較大的經濟效益,即便這篇文章內容自己吸引力並不足夠。
當用戶發現這樣差勁的內容也能上熱門榜單時,無疑會對產品產生必定的質疑。若是這種行爲氾濫開來,那就可能對產品形成比較致命的負面影響。
防刷是一門大型課題,本篇內容就不作詳細講解了,筆者在這方面也不是什麼專家。簡單點說放刷本質上就是提取惡意行爲的特徵。常見的策略就是同一篇文章被來自於同一個IP或者有限的幾個IP的頻繁點擊請求,這時就可使用封禁IP的招數來搞定。還可使用用戶反饋機制來識別非正常的熱門內容,而後人工干預等。業界還有一些更高級的如機器學習深度學習等方法來防刷,這些讀者均可以自行搜索研究。
閱讀相關文章,關注公衆號【碼洞】