最近接到一個需求,用一句話來講就是:展現關注人發佈的動態,這個涉及到 feed 流系統的設計。本文主要介紹一個通常企業可用的 Feed 流解決方案。java
下面先介紹一下關於 Feed 流的簡單概念。redis
Feed
:Feed 流中的每一條狀態或者消息都是 Feed,好比微博中的一條微博就是一個 Feed。Feed流
:持續更新並呈現給用戶內容的信息流。每一個人的朋友圈,微博關注頁等等都是一個 Feed 流。Feed 流常見的分類有兩種:數據庫
Timeline
:按發佈的時間順序排序,產品若是選擇 Timeline 類型,那麼就是認爲 Feed 流中的 Feed 很少,可是每一個 Feed 都很重要,都須要用戶看到。相似於微信朋友圈,微博等。Rank
:按某個非時間的因子排序,通常是按照用戶的喜愛度排序,通常用於新聞推薦類、商品推薦等。設計一個 Feed 流系統,兩個關鍵步驟,一個是 Feed 流的
初始化
,一個是推送
。關於 Feed 流的存儲其實也是一個核心的點,可是筆主持久化使用的仍是 MySQL,後續能夠考慮優化。緩存
Feed 流【關注頁 Feed 流】的初始化指的是,當用戶的 Feed 流還不存在的時候,爲該用戶建立一個屬於他本身的關注頁 Feed 流,具體怎麼作呢?其實很簡單,遍歷一遍關注列表,取出全部關注用戶的 feed,將 feedId 存放到 redis 的 sortSet
中便可。這裏面有幾個關鍵點:微信
通過上面的初始化,已經把 feed 流放在了 redis 緩存中了。接下來就是須要更新 feed 流了,在下面四種狀況須要進行更新:網絡
上面四步具體怎麼操做,會在下面的實現步驟中詳細描述,在這裏先咱們重點討論一下第1、二種狀況。由於在處理 大V 【千萬級別粉絲】的時候,咱們是須要對 大V 的全部粉絲的 feed 流進行處理的,這時候涉及到的量就會很是巨大,須要多加斟酌。關於推送,通常有兩種 推/拉。併發
推
:A用戶發佈新的動態時,要往 A用戶全部的粉絲 feed 流中推。拉
:A用戶發佈新的動態時,先不進行推送,而是等 粉絲進來的時候,才主動到 A用戶的我的頁TimeLine 拉取最新的 feed,而後進行一個 merge。若是關注了多個大V,能夠併發的向多個大V 我的頁TimeLine 中拉取。當用戶發佈一條新的 Feed 時,處理流程以下:app
當刷新本身的Feed流的時候,處理流程以下:異步
至此,使用推拉結合方式的發佈,讀取Feed流的流程都結束了。優化
若是隻是用推模式了,則會變的比較簡單:
推拉結合存在一個弊端,就是刷新本身的Feed流時,大V的我的頁Timeline 的讀壓力會很大。
如何解決:
筆主主要採用純推模式實現了一個普通企業基本可用的 Feed 流系統,下面介紹一下具體的實現代碼,主要包括3大個部分:
當用戶第一進來刷新Feed 流,且 Feed 流還不存在時,咱們須要進行初始化,初始化的具體代碼以下:核心思想就是從數據庫中load出 feed 信息,塞到 zSet 中,而後分頁返回。
/** * 獲取關注的人的信息流 */
public List<FeedDto> listFocusFeed(Long userId, Integer page, Integer size) {
String focusFeedKey = "focusFeedKey" + userId;
// 若是 zset 爲空,先初始化
if (!zSetRedisTemplate.exists(focusFeedKey)) {
initFocusIdeaSet(userId);
}
// 若是 zset 存在,可是存在 0 值
Double zscore = zSetRedisTemplate.zscore(focusFeedKey, "0");
if (zscore != null && zscore > 0) {
return null;
}
//分頁
int offset = (page - 1) * size;
long score = System.currentTimeMillis();
// 按 score 值從大到小從 zSet 中取出 FeedId 集合
List<String> list = zSetRedisTemplate.zrevrangeByScore(focusFeedKey, score, 0, offset, size);
List<FeedDto> result = new ArrayList<>();
if (QlchatUtil.isNotEmpty(list)) {
for (String s : list) {
// 根據 feedId 從緩存中 load 出 feed
FeedDto feedDto = this.loadCache(Long.valueOf(s));
if (feedDto != null) {
result.add(feedDto);
}
}
}
return result;
}
/** * 初始化關注的人的信息流 zSet */
private void initFocusFeedSet( Long userId) {
String focusFeedKey = "focusFeedKey" + userId;
zSetRedisTemplate.del(focusIdeaKey);
// 從數據庫中加載當前用戶關注的人發佈過的 Feed
List<Feed> list = this.feedMapper.listFocusFeed(userId);
if (QlchatUtil.isEmpty(list)) {
//保存0,避免空數據頻繁查庫
zSetRedisTemplate.zadd(focusFeedKey, 1, "0");
zSetRedisTemplate.expire(focusFeedKey, RedisKeyConstants.ONE_MINUTE * 5);
return;
}
// 遍歷 FeedList,把 FeedId 存到 zSet 中
for (Feed feed : list) {
zSetRedisTemplate.zadd(focusFeedKey, feed.getCreateTime().getTime(), feed.getId().toString());
}
zSetRedisTemplate.expire(focusFeedKey, 60 * 60 * 60);
}
複製代碼
每當用戶發佈/刪除新的 feed,咱們須要更新該用戶全部的粉絲的 Feed流,該步驟通常比較耗時,因此建議異步處理,爲了不一次性load出太多的粉絲數據,這裏採用循環分頁查詢。爲了不粉絲的 Feed流過大,咱們會限制 Feed 流的長度爲1000,當Feed流長度超過1000時,會移除最舊的 Feed。
/** * 新增/刪除 feed時,處理粉絲 feed 流 * * @param userId 新增/刪除 feed的用戶id * @param feedId 新增/刪除 的feedId * @param type feed_add = 新增feed feed_sub = 刪除feed */
public void handleFeed(Long userId, Long feedId, String type) {
Integer currentPage = 1;
Integer size = 1000;
List<FansDto> fansDtos;
while (true) {
Page page = new Page();
page.setSize(size);
page.setPage(currentPage);
fansDtos = this.fansService.listFans(userId, page);
for (FansDto fansDto : fansDtos) {
String focusFeedKey = "focusFeedKey" + userId;
// 若是粉絲 zSet 不存在,退出
if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
continue;
}
// 新增Feed
if ("feed_add".equals(type)) {
this.removeOldestZset(focusFeedKey);
zSetRedisTemplate.zadd(focusFeedKey, System.currentTimeMillis(), feedId);
}
// 刪除Feed
else if ("feed_sub".equals(type)) {
zSetRedisTemplate.zrem(focusFeedKey, feedId);
}
}
if (fansDtos.size() < size) {
break;
}
currentPage++;
}
}
/** * 刪除 zSet 中最舊的數據 */
private void removeOldestZset(String focusFeedKey){
// 若是當前 zSet 大於1000,刪除最舊的數據
if (this.zSetRedisTemplate.zcard(focusFeedKey) >= 1000) {
// 獲取當前 zSet 中 score 值最小的
List<String> zrevrange = this.zSetRedisTemplate.zrevrange(focusFeedKey, -1, -1, String.class);
if (QlchatUtil.isNotEmpty(zrevrange)) {
this.zSetRedisTemplate.zrem(focusFeedKey, zrevrange.get(0));
}
}
}
複製代碼
這裏比較簡單,新增/取消關注,把新關注的 Feed 往本身的 Feed流中增長/刪除 便可,可是一樣須要異步處理。
/** * 關注/取關 時,處理followerId的zSet * * @param followId 被關注的人 * @param followerId 當前用戶 * @param type focus = 關注 unfocus = 取關 */
public void handleFocus( Long followId, Long followerId, String type) {
String focusFeedKey = "focusFeedKey" + userId;
// 若是粉絲 zSet 不存在,退出
if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
return;
}
List<FeedDto> FeedDtos = this.listFeedByFollowId(source, followId);
for (FeedDto feedDto : FeedDtos) {
// 關注
if ("focus".equals(type)) {
this.removeOldestZset(focusFeedKey);
this.zSetRedisTemplate.zadd(focusFeedKey, feedDto.getCreateTime().getTime(), feedDto.getId());
}
// 取關
else if ("unfocus".equals(type)) {
this.zSetRedisTemplate.zrem(focusFeedKey, feedDto.getId());
}
}
}
複製代碼
上面展現的是核心代碼,僅僅是爲你們提供一個實現思路,並非直接可運行的代碼,畢竟真正實現還會涉及到不少其餘的無關要緊的類。
在這裏已經介紹完一個簡單可用的 Feed流系統,歡迎各路大神指出錯誤,多提意見!
參考文章: