使用springevent事件驅動模型(觀察者模式)結合redis bitmap 運用 實現每日數據統計

觀察者模式
當對象間存在一對多關係時,則使用觀察者模式(Observer Pattern)。好比,當一個對象被修改時,則會自動通知它的依賴對象。觀察者模式屬於行爲型模式。redis

主要解決:一個對象狀態改變給其餘對象通知的問題,並且要考慮到易用和低耦合,保證高度的協做。算法

什麼時候使用:一個對象(目標對象)的狀態發生改變,全部的依賴對象(觀察者對象)都將獲得通知,進行廣播通知。數據庫

如何解決:使用面向對象技術,能夠將這種依賴關係弱化。設計模式

優勢: 一、觀察者和被觀察者是抽象耦合的。 二、創建一套觸發機制。緩存

缺點: 一、若是一個被觀察者對象有不少的直接和間接的觀察者的話,將全部的觀察者都通知到會花費不少時間。 二、若是在觀察者和觀察目標之間有循環依賴的話,觀察目標會觸發它們之間進行循環調用,可能致使系統崩潰。 三、觀察者模式沒有相應的機制讓觀察者知道所觀察的目標對象是怎麼發生變化的,而僅僅只是知道觀察目標發生了變化。異步

Spring Boot 之事件(Event)
Spring的事件通知機制是一項頗有用的功能,使用事件機制咱們能夠將相互耦合的代碼解耦,從而方便功能的修改與添加。本文我來學習並分析一下Spring中事件的原理。ide

舉個例子,假設有一個添加評論的方法,在評論添加成功以後須要進行修改redis緩存、給用戶添加積分等等操做。固然能夠在添加評論的代碼後面假設這些操做,可是這樣的代碼違反了設計模式的多項原則:單一職責原則、迪米特法則、開閉原則。一句話說就是耦合性太大了,好比未來評論添加成功以後還須要有另一個操做,這時候咱們就須要去修改咱們的添加評論代碼了。學習

在之前的代碼中,我使用觀察者模式來解決這個問題。不過Spring中已經存在了一個升級版觀察者模式的機制,這就是監聽者模式。經過該機制咱們就能夠發送接收任意的事件並處理。ui

Spring 官方文檔翻譯以下 :翻譯

ApplicationContext 經過 ApplicationEvent 類和 ApplicationListener 接口進行事件處理。 若是將實現 ApplicationListener 接口的 bean 注入到上下文中,則每次使用 ApplicationContext 發佈 ApplicationEvent 時,都會通知該 bean。 本質上,這是標準的觀察者設計模式。

Spring的事件(Application Event)其實就是一個觀察者設計模式,一個 Bean 處理完成任務後但願通知其它 Bean 或者說 一個Bean 想觀察監聽另外一個Bean的行爲。

Spring 事件只須要幾步:

自定義事件,繼承 ApplicationEvent
定義監聽器,實現 ApplicationListener 或者經過 @EventListener 註解到方法上
定義發佈者,經過 ApplicationEventPublisher
實際代碼:
建立event文件夾

並建立event object類和handle類,一個handle類能夠對應多個object類。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EverydayStatisticEventObject {
 
  private Integer id;
 
  private String os;
 
  private String proxy;
 
  private StatisticEventType statisticEventType;
 
}

建立枚舉類 處理不一樣的事件類型,運用觀察者模式

public enum StatisticEventType {
   
  //註冊數統計
  REGISTER_COUNTER,
  //活躍數統計
  ACTIVE_COUNTER,
  //裂變數統計
  FISSION_COUNTER,
  //播放數統計
  PLAYED_COUNTER,
  //廣告點擊數統計
  ADCLICK_COUNTER;
 
  private StatisticEventType() {
  }
}

在事務service類中注入

@Autowired
private ApplicationEventPublisher publisher;
處理完相應的業務邏輯後,調取publish操做,將事務發佈出去

其一

public LoginLog increaseLoginLog(String ip, int uid, String username) {
    User user = mixinsService.getUser(uid);
    LoginLog loginLog = new LoginLog();
    loginLog.setLoginIp(ip);
    loginLog.setLoginTime(new Date());
    loginLog.setUid(uid);
    loginLog.setUsername(username);
    loginLog.setProxy(user.getProxy());
    loginLog.setChannel(user.getChannel());
    loginLog.setUserType(user.getUserType());
    loginLog.setOs(user.getOs());
    LoginLog log = loginLogRepository.save(loginLog);
    
    //發佈事件
    publisher.publishEvent(new EverydayStatisticEventObject(log.getUid(), log.getOs(), log.getProxy(),StatisticEventType.ACTIVE_COUNTER));
    ChannelDailyDataManager.fireEvent(new UserActiveEvent(user.getChannel()));
    return log;
  }

Google Guava Cache緩存
Google Guava Cache是一種很是優秀本地緩存解決方案,提供了基於容量,時間和引用的緩存回收方式。基於容量的方式內部實現採用LRU算法,基於引用回收很好的利用了Java虛擬機的垃圾回收機制。其中的緩存構造器CacheBuilder採用構建者模式提供了設置好各類參數的緩存對象,緩存核心類LocalCache裏面的內部類Segment與jdk1.7及之前的ConcurrentHashMap很是類似,都繼承於ReetrantLock,還有六個隊列,以實現豐富的本地緩存方案。

Guava Cache與ConcurrentMap的區別
Guava Cache與ConcurrentMap很類似,但也不徹底同樣。最基本的區別是ConcurrentMap會一直保存全部添加的元素,直到顯式地移除。相對地,Guava Cache爲了限制內存佔用,一般都設定爲自動回收元素。在某些場景下,儘管LoadingCache 不回收元素,它也是頗有用的,由於它會自動加載緩存。

//bitmap的偏移量offset生產,offset越大,佔用內存越多,因此以每日第一個id做爲minid,做爲被減數
//使用guava cache緩存機制獲取最小id,設置過時時間爲每一天,天天清空一次

private LoadingCache<String, Integer> minId = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.DAYS).build(new CacheLoader<String, Integer>() {
    @Override
    public Integer load(String s) throws Exception {
      Date date = LocalDate.parse(StringUtils.substringAfter(s, "@")).toDate();
      if (ACTIVE_COUNTER.startsWith(s)) {
        LoginLog loginLog = loginLogRepository.getTopByLoginTimeBeforeOrderByIdDesc(date);
        if (loginLog != null) {
          return loginLog.getId();
        }
      } else if (PLAYED_COUNTER.startsWith(s)) {
        ViewHistory viewHistory = viewHistoryRepository.getTopByViewtimeBeforeOrderByIdDesc(date);
        if (viewHistory != null) {
          return viewHistory.getId();
        }
      } else if (ADCLICK_COUNTER.startsWith(s)) {
        AdvClickHistory advClickHistory = advClickHistoryRepository.getTopByCreateTimeBeforeOrderByIdDesc(date);
        if (advClickHistory != null) {
          return advClickHistory.getId();
        }
      }
      return 0;
    }
  });

用Redis bitmap統計活躍用戶、留存
對於個int型的數來講,若用來記錄id,則只能記錄一個,而若轉換爲二進制存儲,則能夠表示32個,空間的利用率提高了32倍.對於海量數據的處理,這樣的存儲方式會節省不少內存空間.對於未登錄的用戶,可使用Hash算法,把對應的用戶標識哈希爲一個數字id.對於一億個數據來講,咱們也只須要1000000000/8/1024/1024大約12M空間左右.

而Redis已經爲咱們提供了SETBIT的方法,使用起來很是的方便,咱們在item頁面能夠不停地使用SETBIT命令,設置用戶已經訪問了該頁面,也可使用GETBIT的方法查詢某個用戶是否訪問。最後經過BITCOUNT統計該網頁天天的訪問數量。

優勢: 佔用內存更小,查詢方便,能夠指定查詢某個用戶,對於非登錄的用戶,可能不一樣的key映射到同一個id,不然須要維護一個非登錄用戶的映射,有額外的開銷。

//使用觀察者模式,根據不一樣的type來判斷不一樣的事務

public String progressChanged(EverydayStatisticEventObject registerEventObject) {
    String Type = "";
    StatisticEventType eventType = registerEventObject.getStatisticEventType();
    switch (eventType) {
      case REGISTER_COUNTER:
        Type = REGISTER_COUNTER;
        break;
      case ACTIVE_COUNTER:
        Type = ACTIVE_COUNTER;
        break;
      case FISSION_COUNTER:
        Type = FISSION_COUNTER;
        break;
      case PLAYED_COUNTER:
        Type = PLAYED_COUNTER;
        break;
      case ADCLICK_COUNTER:
        Type = ADCLICK_COUNTER;
        break;
      default:
        break;
    }
    return Type;
  }
//事件監聽器
  //異步
  @EventListener
  @Async
  public void registerCountEvent(EverydayStatisticEventObject registerEventObject) {
 
 
    String date = LocalDate.now().toString(STATISTIC_DATE_FMT);
    String type = progressChanged(registerEventObject);
    
    //數據庫主鍵id 減去當天第一個id 這樣天天的偏移量都是從一開始能夠有效減小偏移量對內存的佔用。
    int offset = registerEventObject.getId() + 1 - minId.getUnchecked(StringUtils.join(type, "@", date));
 
    String key = StringUtils.join(STATISTIC_CACHE_KEY_PREFIX, type,
      date, ":", registerEventObject.getOs());
 
 
    setBitmap(offset, key);
 
    String proxyKey = StringUtils.join(STATISTIC_CACHE_KEY_PREFIX, type,
      date, ":", registerEventObject.getProxy(), ":", registerEventObject.getOs());
 
    setBitmap(offset, proxyKey);
 
        
       /* redisTemplate.execute((RedisCallback) connection -> {
            Long count = connection.bitCount(key.getBytes());
            log.info("key={},count = {},offset={}",key,count,offset);
            return true;
        });
        redisTemplate.execute((RedisCallback) connection -> {
            Long count = connection.bitCount(proxyKey.getBytes());
            log.info("proxyKey={},count = {},offset={}",proxyKey,count,offset);
            return true;
        });*/
  }
 
private void setBitmap(int offset, String key) {
 
    byte[] bitKey = key.getBytes();
 
    redisTemplate.execute((RedisCallback) connection -> {
      boolean exists = connection.getBit(bitKey, offset);
      if (!exists) {
        connection.setBit(bitKey, offset, true);
        //設置過時時間 天天的數據統計 只保留2天
        connection.expire(bitKey, 60L * 60 * 24 * 2);  //2 days
        return true;
      }
      return false;
    });
  }
相關文章
相關標籤/搜索