時間輪是一種高效、低延遲的調度數據結構。其在Linux內核中普遍使用,是Linux內核定時器的實現方法和基礎之一。按使用場景,大體能夠分爲兩種時間輪:原始時間輪和分層時間輪。分層時間輪是原始時間輪的升級版本,來應對時間「槽」數量比較大的狀況,對內存和精度都有很高要求的狀況。延遲任務的場景通常只須要用到原始時間輪就能夠了。git
推薦使用Netty
提供的HashedWheelTimer
工具類來實現延遲任務。算法
引入依賴:spring
<dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> <version>4.1.23.Final</version> </dependency>
紅包過時隊列信息:數據結構
/** * 紅包過時隊列信息 */ public class RedPacketTimerTask implements TimerTask { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); /** * 紅包 ID */ private final long redPacketId; /** * 建立時間戳 */ private final long timestamp; public RedPacketTimerTask(long redPacketId) { this.redPacketId = redPacketId; this.timestamp = System.currentTimeMillis(); } @Override public void run(Timeout timeout) { //異步處理任務 System.out.println(String.format("任務執行時間:%s,紅包建立時間:%s,紅包ID:%s", LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), redPacketId)); } }
測試用例:異步
/** * 基於 netty 的時間輪算法 HashedWheelTimer 實現的延遲任務 */ public class RedPacketHashedWheelTimer { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); public static void main(String[] args) throws Exception { ThreadFactory factory = r -> { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("RedPacketHashedWheelTimerWorker"); return thread; }; /** * @param tickDuration - 每tick一次的時間間隔 * @param unit - tickDuration 的時間單位 * @param ticksPerWheel - 時間輪中的槽數 * @param leakDetection - 檢查內存溢出 */ Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 100,true); System.out.println(String.format("開始任務時間:%s",LocalDateTime.now().format(F))); for(int i=1;i<10;i++){ TimerTask timerTask = new RedPacketTimerTask(i); timer.newTimeout(timerTask, i, TimeUnit.SECONDS); } Thread.sleep(Integer.MAX_VALUE); } }
打印任務執行日誌:分佈式
開始任務時間:2020-02-12 15:22:23.404 任務執行時間:2020-02-12 15:22:25.410,紅包建立時間:2020-02-12 15:22:23.409,紅包ID:1 任務執行時間:2020-02-12 15:22:26.411,紅包建立時間:2020-02-12 15:22:23.414,紅包ID:2 任務執行時間:2020-02-12 15:22:27.424,紅包建立時間:2020-02-12 15:22:23.414,紅包ID:3 任務執行時間:2020-02-12 15:22:28.410,紅包建立時間:2020-02-12 15:22:23.414,紅包ID:4 任務執行時間:2020-02-12 15:22:29.411,紅包建立時間:2020-02-12 15:22:23.414,紅包ID:5 任務執行時間:2020-02-12 15:22:30.409,紅包建立時間:2020-02-12 15:22:23.414,紅包ID:6 任務執行時間:2020-02-12 15:22:31.411,紅包建立時間:2020-02-12 15:22:23.414,紅包ID:7 任務執行時間:2020-02-12 15:22:32.409,紅包建立時間:2020-02-12 15:22:23.414,紅包ID:8 任務執行時間:2020-02-12 15:22:33.411,紅包建立時間:2020-02-12 15:22:23.414,紅包ID:9
其核心是workerThread
線程,主要負責每過tickDuration
時間就累加一次tick
。同時也負責執行到期的timeout
任務以及添加timeout
任務到指定的wheel
中。ide
構造方法:spring-boot
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // Normalize ticksPerWheel to power of two and initialize the wheel. wheel = createWheel(ticksPerWheel); mask = wheel.length - 1; // Convert tickDuration to nanos. this.tickDuration = unit.toNanos(tickDuration); // Prevent overflow. if (this.tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } //這裏-爪窪筆記 workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; this.maxPendingTimeouts = maxPendingTimeouts; if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } }
新增任務,建立即啓動:工具
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } //這裏-爪窪筆記 start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
線程啓動:測試
/** * Starts the background thread explicitly. The background thread will * start automatically on demand even if you did not call this method. * * @throws IllegalStateException if this timer has been * {@linkplain #stop() stopped} already */ public void start() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
執行相關操做:
public void run() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); do { final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask); processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; transferTimeoutsToBuckets(); bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } processCancelledTasks(); }
以上方案並無實現持久化和分佈式,生產環境可根據實際業務需求選擇使用。
https://gitee.com/52itstyle/spring-boot-seckill