因爲數據監管的要求,公司的數據必須放在不一樣的國家,蛋疼的是第一次感覺到了光速的「慢」。數據存儲於印度、美國等地,每一次查詢都有 200ms 的延遲。並且這延遲是因爲光速的限制,是物理限制,買再大的帶寬也沒有用。
只好自己寫了一個主動緩存的方案。
挺有意思的,僅供參考~
/**
 * Message listener that forwards user login / logout events to
 * {@link RegisterUserService}.
 *
 * <p>The message body is deserialized into a {@code UserMessage}; the message tag
 * ({@code "LOGIN"} or {@code "LOGOUT"}) selects which service method is called with
 * the message's client id. Any other tag is acknowledged without further action.
 */
@Slf4j
@Component
public class UserLoginListener implements MessageListener {

    @Autowired
    private RegisterUserService registerUserService;

    /**
     * Handles one incoming message.
     *
     * @param message the raw message; its body is deserialized and its tag selects the action
     * @param context the consume context, only used for logging here
     * @return {@code Action.CommitMessage} when the message was handled or carries no usable
     *         client id; {@code Action.ReconsumeLater} when any exception was raised
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        DefaultJDKMessageSeri serializer = new DefaultJDKMessageSeri();
        log.info("Receive:{}, ConsumeContext:{}", message, context);
        try {
            UserMessage payload = (UserMessage) serializer.deserialize(message.getBody());
            log.info("Receive Message:{}, ConsumeContext:{}", payload, context);

            // Messages without a payload or client id are acknowledged and dropped.
            boolean hasClientId = payload != null && !StringUtils.isEmpty(payload.getClientId());
            if (hasClientId) {
                dispatch(message.getTag(), payload.getClientId());
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("消費失敗,message:{}, context:{}", message, context, e);
            return Action.ReconsumeLater;
        }
    }

    /** Routes the event to the login or logout handler according to the message tag. */
    private void dispatch(String tag, String clientId) {
        if ("LOGIN".equals(tag)) {
            registerUserService.login(clientId);
        } else if ("LOGOUT".equals(tag)) {
            registerUserService.loginOut(clientId);
        }
    }
}
@Slf4j
@Component
public class RegisterUserService {
@Autowired
private CacheActivityHandlerService activityHandlerService;
private static Map<String, Long> clientIdLoginTimeMap = Maps.newConcurrentMap();
private Long tokenExpireTime = 60 * 60 * 2L;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory
.Builder()
.namingPattern("register-User-Scheduled-%d")
.build()
);
// 每次調度會有一堆的任務被塞進來,並且都是IO類型的任務
private static ExecutorService activityCacheParallelExecutor = new ThreadPoolExecutor(
16,
32,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(512), new BasicThreadFactory.Builder().namingPattern("activity-Cache-Thread-%d").build());
@PostConstruct
public void init() {
scheduledExecutorService.scheduleAtFixedRate(() -> {
log.info("cacheInfo, clientIdLoginTimeMap:{}", clientIdLoginTimeMap);
Iterator<String> iterator = clientIdLoginTimeMap.keySet().iterator();
while (iterator.hasNext()) {
String clientId = iterator.next();
if (StringUtils.isEmpty(clientId)) {
return;
}
Long time = clientIdLoginTimeMap.get(clientId);
if (time == null) {
this.loginOut(clientId);
continue;
}
if ((System.currentTimeMillis() - time) / 1000 > tokenExpireTime) {
this.loginOut(clientId);
continue;
}
try {
activityCacheParallelExecutor.submit(() -> {
ActivityRequest activityRequest = new ActivityRequest();
activityRequest.setClientId(clientId);
activityRequest.setPageSize(10);
activityRequest.setPageNo(1);
activityHandlerService.putCacheActivity(activityRequest);
activityRequest.setPageNo(2);
activityHandlerService.putCacheActivity(activityRequest);
ActivityRequest dashboardActivityRequest = new ActivityRequest();
dashboardActivityRequest.setClientId(clientId);
dashboardActivityRequest.setPageSize(5);
dashboardActivityRequest.setPageNo(1);
activityHandlerService.putCacheActivity(dashboardActivityRequest);
});
} catch (Exception e) {
// 可能隊列滿了,拋出錯誤
log.error("error happen when submit job, clientIdLoginTimeMap:{}", clientIdLoginTimeMap, e);
}
}
}, 0, 15, TimeUnit.SECONDS);
}
public void login(String clientId) {
clientIdLoginTimeMap.put(clientId, System.currentTimeMillis());
ActivityRequest dashboardActivityRequest = new ActivityRequest();
dashboardActivityRequest.setClientId(clientId);
dashboardActivityRequest.setPageSize(5);
dashboardActivityRequest.setPageNo(1);
activityHandlerService.putCacheActivity(dashboardActivityRequest);
ActivityRequest activityRequest = new ActivityRequest();
activityRequest.setClientId(clientId);
activityRequest.setPageSize(10);
activityRequest.setPageNo(1);
activityHandlerService.putCacheActivity(activityRequest);
activityRequest.setPageNo(2);
activityHandlerService.putCacheActivity(activityRequest);
}
public void loginOut(String clientId) {
ActivityRequest activityRequest = new ActivityRequest();
activityRequest.setClientId(clientId);
activityRequest.setPageSize(10);
activityRequest.setPageNo(1);
activityHandlerService.deleteCacheActivity(activityRequest);
activityRequest.setPageNo(2);
activityHandlerService.deleteCacheActivity(activityRequest);
ActivityRequest dashboardActivityRequest = new ActivityRequest();
dashboardActivityRequest.setClientId(clientId);
dashboardActivityRequest.setPageSize(5);
dashboardActivityRequest.setPageNo(1);
activityHandlerService.deleteCacheActivity(dashboardActivityRequest);
clientIdLoginTimeMap.remove(clientId);
}
}
/**
 * Returns the activity page described by the request, going through the
 * {@code "activity"} cache for the first two pages.
 *
 * <p>The cache key is {@code clientId_pageSize_pageNo}. Requests with a page number
 * above 2 bypass the cache (see the {@code condition}) and always run the query.
 *
 * @param activityRequest client id, page size and page number to look up
 * @return the result of {@code queryActivity} for this request, possibly served from the cache
 */
@Cacheable(
        cacheNames = "activity",
        key = "#activityRequest.clientId + '_' + #activityRequest.pageSize + '_' + #activityRequest.pageNo",
        condition = "#activityRequest.pageNo <= 2")
public List<UserActivityDTO> cacheActivity(ActivityRequest activityRequest) {
    log.info("cacheActivity, activityRequest:{}", activityRequest);
    List<UserActivityDTO> activities = this.queryActivity(activityRequest);
    return activities;
}
/**
 * Runs the activity query unconditionally and stores the result in the
 * {@code "activity"} cache under {@code clientId_pageSize_pageNo}.
 *
 * <p>Only requests with a page number of at most 2 are written to the cache
 * (see the {@code condition}); the query itself is executed for every call.
 *
 * @param activityRequest client id, page size and page number to refresh
 * @return the freshly queried result of {@code queryActivity} for this request
 */
@CachePut(
        cacheNames = "activity",
        key = "#activityRequest.clientId + '_' + #activityRequest.pageSize + '_' + #activityRequest.pageNo",
        condition = "#activityRequest.pageNo <= 2")
public List<UserActivityDTO> putCacheActivity(ActivityRequest activityRequest) {
    log.info("putCacheActivity, activityRequest:{}", activityRequest);
    List<UserActivityDTO> refreshed = this.queryActivity(activityRequest);
    return refreshed;
}
/**
 * Evicts the {@code "activity"} cache entry stored under
 * {@code clientId_pageSize_pageNo} for the given request.
 *
 * <p>Eviction is performed by the {@code @CacheEvict} annotation before the method body
 * runs ({@code beforeInvocation = true}) and only for page numbers of at most 2.
 * The body deliberately does not query the activity data: an eviction has no use for
 * the result, and the query is a slow remote call that could also fail.
 *
 * @param activityRequest client id, page size and page number whose cache entry is removed
 * @return always an empty list; the return type is kept for compatibility with existing callers
 */
@CacheEvict(cacheNames = "activity"
        , key = "#activityRequest.clientId + '_' + #activityRequest.pageSize + '_' + #activityRequest.pageNo"
        , condition = "#activityRequest.pageNo <= 2",
        beforeInvocation = true)
public List<UserActivityDTO> deleteCacheActivity(ActivityRequest activityRequest) {
    log.info("deleteCacheActivity, activityRequest:{}", activityRequest);
    // Fully qualified so that no additional import is required in this file.
    return java.util.Collections.emptyList();
}
複製代碼