相信你在平常的開發中確定遇到過這種問題: 須要對實體類的狀態信息進行管理,好比必定時間後修改它爲XXX狀態.html
舉個例子: 訂單服務,當用戶提交了訂單後,若是在30分鐘內沒有支付,自動取消訂單,這就是一個對狀態的管理;java
再舉一個我實際開發的例子: 消息管道的例子,用戶來拉取消息後,若是在30s內沒有提交,那麼修改他的訂閱狀態爲:未訂閱,這樣其餘的實例能夠創建鏈接繼續讀取.緩存
整理設計圖:ide
核心就是: 一個Thread + 一個Queue;Thread不斷從隊列中取出數據, 若是隊列中爲空或者裏邊的任務沒到期,則線程卡住wait(timeOut).測試
先是簡單的有狀態的實體類:ConsumerInfoState,這個類的核心是狀態(訂閱到期時間),因此得有對狀態的查詢設置,查詢距到期還要多久等等....this
import java.io.Serializable; public class ConsumerInfoState implements Serializable { /** * 序列化ID */ private static final long serialVersionUID = 1L; /** * 過時時間20s */ protected long expiration; private String topic; private String userId; private boolean isSubscribed = false; private long CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT = 5000; public ConsumerInfoState(String userId) { this.userId = userId; this.expiration = System.currentTimeMillis() + CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT; } public ConsumerInfoState(String topic, String userId) { super(); this.topic = topic; this.userId = userId; this.expiration = System.currentTimeMillis() + CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT; } /** *是否過時 */ public boolean expired(long nowMs) { return expiration <= nowMs; } /** * <p> * 更新訂閱過時時間 * </p> */ public void updateExpiration() { this.expiration = System.currentTimeMillis() + CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT; } /** * <p> * 到指定時間還有多久 * </p> */ public long untilExpiration(long nowMs) { return this.expiration - nowMs; } public String getUserId() { return userId; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public void setSubscribed(boolean isSubscribed) { this.isSubscribed = isSubscribed; } public boolean hasSubscribed() { return isSubscribed; } }
這個類仍是很清晰的..atom
核心類: ConsumerInfoManagerspa
import java.util.Comparator; import java.util.PriorityQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ConsumerInfoManager { Logger logger = LoggerFactory.getLogger(ConsumerInfoManager.class); //任務隊列 private final PriorityQueue<ConsumerInfoState> consumersByExpiration = new PriorityQueue<ConsumerInfoState>( new Comparator<ConsumerInfoState>() { //小的在前 public int compare(ConsumerInfoState o1, ConsumerInfoState o2) { if (o1.expiration < o2.expiration) { return -1; } else if (o1.expiration == o2.expiration) { return 0; } else { return 1; } } }); private ExpirationThread expirationThread; public ConsumerInfoManager() { //啓動線程 this.expirationThread = new ExpirationThread(); this.expirationThread.start(); } //加入任務隊列 public synchronized void addConsumerInfoSate(ConsumerInfoState consumerInfoSate) { consumersByExpiration.add(consumerInfoSate); this.notifyAll(); } @SuppressWarnings("unused") public synchronized void updateExpiration(ConsumerInfoState state) { // 先刪除在放裏邊,從新排序 consumersByExpiration.remove(state); state.updateExpiration(); consumersByExpiration.add(state); this.notifyAll(); } public void shutdown() { logger.debug("Shutting down consumers"); expirationThread.shutdown(); synchronized (this) { consumersByExpiration.clear(); } } /** * <p> * 檢查consumerInfo的過時時間,過時就從緩存中刪除 * </p> * @author jiangyuechao 2018年1月13日 下午2:04:30 */ @SuppressWarnings("unused") private class ExpirationThread extends Thread { AtomicBoolean isRunning = new AtomicBoolean(true); CountDownLatch shutdownLatch = new CountDownLatch(1); public ExpirationThread() { super("Consumer Expiration Thread"); setDaemon(true); } @Override public void run() { synchronized (ConsumerInfoManager.this) { try { while (isRunning.get()) { long now = System.currentTimeMillis(); //隊列空和最近一個任務是否到期的判斷 while (!consumersByExpiration.isEmpty() && consumersByExpiration.peek().expired(now)) { final ConsumerInfoState state = consumersByExpiration.remove(); //{你本身的業務處理} state.setSubscribed(false); logger.info("任務已到期,topic:{}, userID:{},subscribed:{}",state.getTopic(),state.getUserId(),state.hasSubscribed()); } //須要等待的時間 long timeout = consumersByExpiration.isEmpty() ? Long.MAX_VALUE : consumersByExpiration.peek().untilExpiration(now); ConsumerInfoManager.this.wait(timeout); } } catch (InterruptedException e) { // Interrupted by other thread, do nothing to allow this thread to exit logger.error("ExpirationThread線程中斷", e); } } shutdownLatch.countDown(); } public void shutdown() { try { isRunning.set(false); this.interrupt(); shutdownLatch.await(); } catch (InterruptedException e) { throw new Error("Interrupted when shutting down consumer worker thread."); } } } public void join(){ try { expirationThread.join(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
代碼就這些,我進行了刪減,刪除了不重要的部分, 通常ConsumerInfoManager還須要一個緩存Cache,Cache中是存儲全部的實體類,queue中是Cache中的一部分,通常queue中的任務到期,須要從Cache中刪除或取出執行一些操做.線程
固然加Cache是複雜點的,核心思想就這些,額外的代碼就刪除了..debug
public class ManagerTest { static ConsumerInfoManager consumerInfoManager; static String userId = "dhsajkdsajkdsjh1"; static Logger logger = LoggerFactory.getLogger(ManagerTest.class); public static void main(String[] args) throws InterruptedException { //實例化 setUp(); for(int i = 0;i<3;i++){ ConsumerInfoState consumerInfoState = new ConsumerInfoState("chao-"+i, userId); consumerInfoState.setSubscribed(true); consumerInfoManager.addConsumerInfoSate(consumerInfoState); logger.info("任務"+i+"加入隊列"); Thread.sleep(1000); } consumerInfoManager.join(); } public static void setUp(){ consumerInfoManager = new ConsumerInfoManager(); } }
輸出結果: 符合預期...
2018-01-17 10:07:27,450 [main] INFO ManagerTest - 任務0加入隊列
2018-01-17 10:07:28,451 [main] INFO ManagerTest - 任務1加入隊列
2018-01-17 10:07:29,451 [main] INFO ManagerTest - 任務2加入隊列
2018-01-17 10:07:32,451 [Consumer Expiration Thread] INFO ConsumerInfoManager - 任務已到期,topic:chao-0, userID:dhsajkdsajkdsjh1,subscribed:false
2018-01-17 10:07:33,485 [Consumer Expiration Thread] INFO ConsumerInfoManager - 任務已到期,topic:chao-1, userID:dhsajkdsajkdsjh1,subscribed:false
2018-01-17 10:07:34,452 [Consumer Expiration Thread] INFO ConsumerInfoManager - 任務已到期,topic:chao-2, userID:dhsajkdsajkdsjh1,subscribed:false