類結構
java
| 類 | 說明 |
| --- | --- |
| SpeedControlConstant | 定義的一些與限流相關的常量 |
| SpeedControlHelper | 單機TPS限流類,提供當前任務TPS的計算功能 |
| TaskDO | 任務DO,定義了任務類型、任務的執行邏輯。實現了Runnable接口 |
| TaskStateDO | 任務狀態DO,定義了任務的TPS、執行次數、執行時間等 |
| TaskStateSingleton | 統計全部任務狀態的單例類 |
| TpsTest | 測試類 |
直接上代碼:
package com.taobao.tps; /** * 限流常量類 * * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a> * @version 1.0 * @since 2015年12月29日 */ public class SpeedControlConstant { // 單機tps設置50 public static int serverTps = 50; }
package com.taobao.tps; /** * 單機TPS限流功能類 * * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a> * @version 1.0 * @since 2015年12月29日 */ public class SpeedControlHelper { /** * 判斷是否被限流 * * @param taskType * @return false限流 */ public static boolean speedControl(String taskType) { int tpsThreshod = SpeedControlConstant.serverTps;// tps閥值 int currentTps = currentTps(taskType);// 當前tps if (tpsThreshod <= currentTps) { return false; } return true; } /** * 獲取當前任務類型tps * * @param taskType * @return */ public static int currentTps(String taskType) { TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance(); TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType); if (currentTask == null) { // 這裏有併發問題,可是統計tps不須要特別精準。添加了併發控制反而會影響性能 currentTask = new TaskStateDO(); taskStateSingleton.putTaskStateDO(taskType, currentTask); } return currentTask.calcuTps(); } /** * 設置當前任務類型執行時間 * * @param taskType * @param time */ public static void setExecTime(String taskType, long time) { TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance(); TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType); if (currentTask == null) { // logger.. return; } currentTask.state(time); } }
package com.taobao.tps; /** * 任務 * * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a> * @version 1.0 * @since 2015年12月29日 */ public class TaskDO implements Runnable { private String taskType; public TaskDO(String taskType) { this.taskType = taskType; } public String getTaskType() { return taskType; } public void run() { try { long startTime = System.currentTimeMillis(); Thread.sleep(15); long endTime = System.currentTimeMillis(); SpeedControlHelper.setExecTime(taskType, endTime - startTime); } catch (InterruptedException e) { // logger.. } } }
package com.taobao.tps;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Running statistics for one task type: per-second request count plus
 * execution count, total execution time and average execution time.
 *
 * <p>The statistics methods are synchronised on this instance, so one
 * object can be updated from several worker threads without losing updates.
 *
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015-12-29
 */
public class TaskStateDO {

    /** Task type. */
    private String taskType;

    /** Number of executions in the current statistics interval. */
    private int execCount;

    /** Total execution time in the current statistics interval. */
    private long totalTime;

    /** Average execution time in the current statistics interval. */
    private int averageTime;

    /** Number of requests counted in the current second. */
    private AtomicInteger secondExecCount = new AtomicInteger(0);

    /** The second (epoch seconds) that {@link #secondExecCount} refers to. */
    private long currentSecond = 0;

    /** Creates a statistics object seeded by {@link #reset()}. */
    public TaskStateDO() {
        reset();
    }

    /**
     * @return the task type
     */
    public String getTaskType() {
        return taskType;
    }

    /**
     * @param taskType the task type to set
     */
    public void setTaskType(String taskType) {
        this.taskType = taskType;
    }

    /**
     * @return total execution time accumulated so far
     */
    public synchronized long getTotalTime() {
        return totalTime;
    }

    /**
     * @return number of executions accumulated so far
     */
    public synchronized int getExecCount() {
        return execCount;
    }

    /**
     * @return average execution time as of the last {@link #state(long)} call
     */
    public synchronized int getAverageTime() {
        return averageTime;
    }

    /**
     * Counts one request and returns how many requests have been counted in
     * the current wall-clock second.
     *
     * @return request count for the current second, including this call
     */
    public synchronized int calcuTps() {
        // Read the clock once so the comparison and the stored second cannot
        // disagree when the call straddles a second boundary.
        long nowSecond = System.currentTimeMillis() / 1000;
        if (currentSecond == nowSecond) {
            return secondExecCount.incrementAndGet();
        }
        // New second: restart the window with this request as the first one.
        currentSecond = nowSecond;
        secondExecCount.set(1);
        return 1;
    }

    /**
     * Records one execution that took {@code time}.
     *
     * <p>If adding the sample would overflow the execution counter or the
     * total time, the statistics are reset and the sample is dropped.
     *
     * @param time execution time of the finished task
     */
    public synchronized void state(long time) {
        boolean countWouldOverflow = execCount == Integer.MAX_VALUE;
        boolean totalWouldOverflow = time > 0
                ? totalTime > Long.MAX_VALUE - time
                : totalTime < Long.MIN_VALUE - time;
        if (countWouldOverflow || totalWouldOverflow) {
            reset();
            return;
        }
        execCount++;
        totalTime += time;
        // execCount is at least 2 here (seeded with 1, then incremented),
        // so the division cannot fail.
        averageTime = (int) (totalTime / execCount);
    }

    /**
     * Resets the counters to their seed values (1 execution, total time 100)
     * rather than zero; the original intent was to keep the divisor non-zero.
     * The stored average is left untouched until the next {@link #state(long)}.
     */
    public synchronized void reset() {
        execCount = 1;
        totalTime = 100;
    }
}
package com.taobao.tps;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Process-wide registry of per-task-type statistics.
 *
 * <p>The backing {@link HashMap} is guarded by a read/write lock: lookups
 * take the read lock, insertions take the write lock.
 *
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015-12-29
 */
public class TaskStateSingleton {

    /** Task type to statistics; only accessed while holding {@link #read} or {@link #write}. */
    private final Map<String, TaskStateDO> taskStateMap = new HashMap<String, TaskStateDO>();

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock read = lock.readLock();

    private final Lock write = lock.writeLock();

    private TaskStateSingleton() {
    }

    /**
     * @return the single shared instance
     */
    public static TaskStateSingleton getInstance() {
        return SingletonHolder.taskStateSingleton;
    }

    /** Holds the instance so it is created on first use of {@link #getInstance()}. */
    private static class SingletonHolder {
        private static final TaskStateSingleton taskStateSingleton = new TaskStateSingleton();
    }

    /**
     * Returns a snapshot of all task statistics.
     *
     * <p>The returned map is a copy taken under the read lock, so callers can
     * iterate it safely without holding the lock. Changes to the returned map
     * do not affect the registry; the {@link TaskStateDO} values are shared.
     *
     * @return a copy of the task type to statistics mapping
     */
    public Map<String, TaskStateDO> getTaskStateMap() {
        // Acquire outside try: if lock() fails, finally must not unlock a
        // lock that was never taken.
        read.lock();
        try {
            return new HashMap<String, TaskStateDO>(taskStateMap);
        } finally {
            read.unlock();
        }
    }

    /**
     * Registers (or replaces) the statistics object for a task type.
     *
     * @param taskType    task type key
     * @param taskStateDO statistics object to store
     */
    public void putTaskStateDO(String taskType, TaskStateDO taskStateDO) {
        write.lock();
        try {
            taskStateMap.put(taskType, taskStateDO);
        } finally {
            write.unlock();
        }
    }

    /**
     * Looks up the statistics object for a task type.
     *
     * @param taskType task type key
     * @return the stored statistics, or {@code null} if none is registered
     */
    public TaskStateDO getTaskStateDO(String taskType) {
        read.lock();
        try {
            return taskStateMap.get(taskType);
        } finally {
            read.unlock();
        }
    }
}
package com.taobao.tps;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Manual driver for the rate limiter: keeps creating tasks forever, asks the
 * limiter whether each one may run, and submits the admitted ones to a pool.
 *
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015-12-29
 */
public class TpsTest {

    /** Pool of 4 threads (4 s keep-alive) with a bounded queue of 500 tasks. */
    public static ThreadPoolExecutor executer = new ThreadPoolExecutor(
            4, 4, 4, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(500));

    /**
     * Runs the endless submit loop. An admitted task is submitted and followed
     * by a 10 ms pause; a throttled task is only reported, with no pause.
     *
     * @param args unused
     * @throws Exception if the pause between submissions is interrupted
     */
    public static void main(String[] args) throws Exception {
        for (;;) {
            TaskDO job = new TaskDO("calcu_item_value");
            boolean admitted = SpeedControlHelper.speedControl(job.getTaskType());
            if (admitted) {
                executer.submit(job);
                print("添加任務成功");
                Thread.sleep(10);
            } else {
                print("被限流..");
            }
        }
    }

    /**
     * Writes the object's string form to standard output without a line break.
     *
     * @param obj object to print; must not be {@code null}
     */
    public static void print(Object obj) {
        String text = obj.toString();
        System.out.print(text);
    }
}
建議在服務器上測試代碼,在工作PC上測試可能會比較卡。