主動緩存

因爲數據監管的要求,公司的數據必須放在不一樣的國家,蛋疼的是第一次感覺到了光速的「慢」。數據存儲於,印度,美國等,每一次查詢都有200ms的延遲。並且這延遲是因爲光速的限制,是物理限制,買再大的帶寬也沒有用。java

只好本身寫了一個主動緩存的方案。緩存

挺有意思的,僅供參考~app

@Slf4j
@Component
public class UserLoginListener implements MessageListener {

    @Autowired
    private RegisterUserService registerUserService;

    @Override
    public Action consume(Message message, ConsumeContext context) {
        DefaultJDKMessageSeri defaultJDKMessageSeri = new DefaultJDKMessageSeri();
        log.info("Receive:{}, ConsumeContext:{}", message, context);
        try {
            UserMessage userMessage = (UserMessage) defaultJDKMessageSeri.deserialize(message.getBody());
            log.info("Receive Message:{}, ConsumeContext:{}", userMessage, context);

            if (userMessage == null || StringUtils.isEmpty(userMessage.getClientId())) {
                return Action.CommitMessage;
            }

            if ("LOGIN".equals(message.getTag())) {
                registerUserService.login(userMessage.getClientId());
            }

            if ("LOGOUT".equals(message.getTag())) {
                registerUserService.loginOut(userMessage.getClientId());
            }

            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("消費失敗,message:{}, context:{}", message, context, e);
            return Action.ReconsumeLater;
        }
    }
}

@Slf4j
@Component
public class RegisterUserService {

    @Autowired
    private CacheActivityHandlerService activityHandlerService;

    private static Map<String, Long> clientIdLoginTimeMap = Maps.newConcurrentMap();

    private Long tokenExpireTime = 60 * 60 * 2L;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new BasicThreadFactory
            .Builder()
            .namingPattern("register-User-Scheduled-%d")
            .build()
    );

    // 每次調度會有一堆的任務被塞進來,並且都是IO類型的任務
    private static ExecutorService activityCacheParallelExecutor = new ThreadPoolExecutor(
        16,
        32,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(512), new BasicThreadFactory.Builder().namingPattern("activity-Cache-Thread-%d").build());

    @PostConstruct
    public void init() {

        scheduledExecutorService.scheduleAtFixedRate(() -> {

            log.info("cacheInfo, clientIdLoginTimeMap:{}", clientIdLoginTimeMap);
            Iterator<String> iterator = clientIdLoginTimeMap.keySet().iterator();
            while (iterator.hasNext()) {
                String clientId = iterator.next();
                if (StringUtils.isEmpty(clientId)) {
                    return;
                }
                Long time = clientIdLoginTimeMap.get(clientId);
                if (time == null) {
                    this.loginOut(clientId);
                    continue;
                }

                if ((System.currentTimeMillis() - time) / 1000 > tokenExpireTime) {
                    this.loginOut(clientId);
                    continue;
                }

                try {
                    activityCacheParallelExecutor.submit(() -> {
                        ActivityRequest activityRequest = new ActivityRequest();
                        activityRequest.setClientId(clientId);
                        activityRequest.setPageSize(10);
                        activityRequest.setPageNo(1);
                        activityHandlerService.putCacheActivity(activityRequest);

                        activityRequest.setPageNo(2);
                        activityHandlerService.putCacheActivity(activityRequest);

                        ActivityRequest dashboardActivityRequest = new ActivityRequest();
                        dashboardActivityRequest.setClientId(clientId);
                        dashboardActivityRequest.setPageSize(5);
                        dashboardActivityRequest.setPageNo(1);
                        activityHandlerService.putCacheActivity(dashboardActivityRequest);
                    });
                } catch (Exception e) {
                    // 可能隊列滿了,拋出錯誤
                    log.error("error happen when submit job, clientIdLoginTimeMap:{}", clientIdLoginTimeMap, e);
                }
            }

        }, 0, 15, TimeUnit.SECONDS);
    }

    public void login(String clientId) {
        clientIdLoginTimeMap.put(clientId, System.currentTimeMillis());
        ActivityRequest dashboardActivityRequest = new ActivityRequest();
        dashboardActivityRequest.setClientId(clientId);
        dashboardActivityRequest.setPageSize(5);
        dashboardActivityRequest.setPageNo(1);
        activityHandlerService.putCacheActivity(dashboardActivityRequest);

        ActivityRequest activityRequest = new ActivityRequest();
        activityRequest.setClientId(clientId);
        activityRequest.setPageSize(10);
        activityRequest.setPageNo(1);
        activityHandlerService.putCacheActivity(activityRequest);

        activityRequest.setPageNo(2);
        activityHandlerService.putCacheActivity(activityRequest);
    }

    public void loginOut(String clientId) {

        ActivityRequest activityRequest = new ActivityRequest();
        activityRequest.setClientId(clientId);
        activityRequest.setPageSize(10);
        activityRequest.setPageNo(1);
        activityHandlerService.deleteCacheActivity(activityRequest);

        activityRequest.setPageNo(2);
        activityHandlerService.deleteCacheActivity(activityRequest);

        ActivityRequest dashboardActivityRequest = new ActivityRequest();
        dashboardActivityRequest.setClientId(clientId);
        dashboardActivityRequest.setPageSize(5);
        dashboardActivityRequest.setPageNo(1);
        activityHandlerService.deleteCacheActivity(dashboardActivityRequest);
        clientIdLoginTimeMap.remove(clientId);
    }
}

@Cacheable(cacheNames = "activity"
    , key = "#activityRequest.clientId + '_' + #activityRequest.pageSize + '_' + #activityRequest.pageNo"
    , condition = "#activityRequest.pageNo <= 2")
public List<UserActivityDTO> cacheActivity(ActivityRequest activityRequest) {
    log.info("cacheActivity, activityRequest:{}", activityRequest);
    return this.queryActivity(activityRequest);
}

@CachePut(cacheNames = "activity"
    , key = "#activityRequest.clientId + '_' + #activityRequest.pageSize + '_' + #activityRequest.pageNo"
    , condition = "#activityRequest.pageNo <= 2")
public List<UserActivityDTO> putCacheActivity(ActivityRequest activityRequest) {
    log.info("putCacheActivity, activityRequest:{}", activityRequest);
    return this.queryActivity(activityRequest);
}


@CacheEvict(cacheNames = "activity"
    , key = "#activityRequest.clientId + '_' + #activityRequest.pageSize + '_' + #activityRequest.pageNo"
    , condition = "#activityRequest.pageNo <= 2",
    beforeInvocation = true)
public List<UserActivityDTO> deleteCacheActivity(ActivityRequest activityRequest) {
    log.info("deleteCacheActivity, activityRequest:{}", activityRequest);
    return this.queryActivity(activityRequest);
}

複製代碼
相關文章
相關標籤/搜索