[Java]計算單機TPS、並對任務限流

類結構
java

說明:服務器

SpeedControlConstant 定義的一些與限流相關的常量
SpeedControlHelper 單機TPS限流類,提供當前任務TPS的計算功能
TaskDO 任務DO,定義了任務類型、任務的執行邏輯。實現了Runnable接口
TaskStateDO 任務狀態DO,定義了任務的TPS、執行次數、執行時間等
TaskStateSingleton 統計全部任務狀態的單例類
TpsTest 測試類


直接上代碼併發

package com.taobao.tps;

/**
 * 限流常量類
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class SpeedControlConstant {
	// 單機tps設置50
	public static int serverTps = 50;
}
package com.taobao.tps;

/**
 * 單機TPS限流功能類
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class SpeedControlHelper {

	/**
	 * 判斷是否被限流
	 * 
	 * @param taskType
	 * @return false限流
	 */
	public static boolean speedControl(String taskType) {
		int tpsThreshod = SpeedControlConstant.serverTps;// tps閥值
		int currentTps = currentTps(taskType);// 當前tps
		if (tpsThreshod <= currentTps) {
			return false;
		}
		return true;
	}

	/**
	 * 獲取當前任務類型tps
	 * 
	 * @param taskType
	 * @return
	 */
	public static int currentTps(String taskType) {
		TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance();
		TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType);
		if (currentTask == null) {
			// 這裏有併發問題,可是統計tps不須要特別精準。添加了併發控制反而會影響性能
			currentTask = new TaskStateDO();
			taskStateSingleton.putTaskStateDO(taskType, currentTask);
		}
		return currentTask.calcuTps();
	}

	/**
	 * 設置當前任務類型執行時間
	 * 
	 * @param taskType
	 * @param time
	 */
	public static void setExecTime(String taskType, long time) {
		TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance();
		TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType);
		if (currentTask == null) {
			// logger..
			return;
		}
		currentTask.state(time);
	}
}
package com.taobao.tps;

/**
 * 任務
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskDO implements Runnable {
	private String taskType;

	public TaskDO(String taskType) {
		this.taskType = taskType;
	}

	public String getTaskType() {
		return taskType;
	}

	public void run() {
		try {
			long startTime = System.currentTimeMillis();
			Thread.sleep(15);
			long endTime = System.currentTimeMillis();
			SpeedControlHelper.setExecTime(taskType, endTime - startTime);
		} catch (InterruptedException e) {
			// logger..
		}
	}

}
package com.taobao.tps;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 任務狀態
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskStateDO {
	/**
	 * 任務類型
	 */
	private String taskType;

	/**
	 * 一個統計時間區間內的總執行次數
	 */
	private int execCount;

	/**
	 * 一個統計時間區間內的總執行耗時
	 */
	private long totalTime;

	/**
	 * 一個統計時間區間內的平均執行耗時
	 */
	private int averageTime;

	/**
	 * 當前1秒內執行的次數
	 */
	private AtomicInteger secondExecCount = new AtomicInteger(0);

	/**
	 * 當前秒
	 */
	private long currentSecond = 0;

	public TaskStateDO() {
		reset();
	}

	/**
	 * 獲取taskType
	 * 
	 * @return taskType
	 */
	public String getTaskType() {
		return taskType;
	}

	/**
	 * 設置taskType
	 * 
	 * @param taskType 要設置的taskType
	 */
	public void setTaskType(String taskType) {
		this.taskType = taskType;
	}

	/**
	 * 獲取totalTime
	 * 
	 * @return totalTime
	 */
	public long getTotalTime() {
		return totalTime;
	}

	/**
	 * 獲取execCount
	 * 
	 * @return execCount
	 */
	public int getExecCount() {
		return execCount;
	}

	/**
	 * 獲取averageTime
	 * 
	 * @return averageTime
	 */
	public int getAverageTime() {
		return averageTime;
	}

	/**
	 * 統計任務的秒級執行次數
	 * 
	 * @return
	 */
	public int calcuTps() {
		if (currentSecond == System.currentTimeMillis() / 1000) {
			return secondExecCount.incrementAndGet();// 1s的執行次數
		} else {
			currentSecond = System.currentTimeMillis() / 1000;
			secondExecCount.set(1);
			return 1;
		}
	}

	/**
	 * 統計任務執行時間
	 * 
	 * @param time
	 */
	public void state(long time) {
		try {
			execCount++;
			totalTime += time;
			averageTime = (int) (totalTime / execCount);
		} catch (Exception e) {
			reset();
		}
	}

	/**
	 * 重置方法,沒有設置爲零,防止除法拋異常
	 */
	public void reset() {
		execCount = 1;
		totalTime = 100;
	}

}
package com.taobao.tps;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 統計全量任務單例
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskStateSingleton {
	private Map<String, TaskStateDO> taskStateMap = new HashMap<String, TaskStateDO>();

	private ReadWriteLock lock = new ReentrantReadWriteLock();

	private Lock read = lock.readLock();

	private Lock write = lock.writeLock();

	private TaskStateSingleton() {
	}

	// 獲取實例
	public static TaskStateSingleton getInstance() {
		return SingletonHolder.taskStateSingleton;
	}

	private static class SingletonHolder {
		private static final TaskStateSingleton taskStateSingleton = new TaskStateSingleton();
	}

	public Map<String, TaskStateDO> getTaskStateMap() {
		try {
			read.lock();
			return taskStateMap;
		} finally {
			read.unlock();
		}
	}

	/**
	 * 新增一個任務統計信息
	 */
	public void putTaskStateDO(String taskType, TaskStateDO taskStateDO) {
		try {
			write.lock();
			taskStateMap.put(taskType, taskStateDO);
		} finally {
			write.unlock();
		}
	}

	/**
	 * 查詢一個任務統計信息
	 */
	public TaskStateDO getTaskStateDO(String taskType) {
		try {
			read.lock();
			return taskStateMap.get(taskType);
		} finally {
			read.unlock();
		}
	}

}
package com.taobao.tps;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 測試類
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TpsTest {
	public static ThreadPoolExecutor executer = new ThreadPoolExecutor(4, 4, 4, TimeUnit.SECONDS,
			new LinkedBlockingQueue<Runnable>(500));

	public static void main(String[] args) throws Exception {
		while (true) {
			TaskDO task = new TaskDO("calcu_item_value");
			if (!SpeedControlHelper.speedControl(task.getTaskType())) {
				print("被限流..");
				continue;
			}
			executer.submit(task);
			print("添加任務成功");
			Thread.sleep(10);
		}
	}

	public static void print(Object obj) {
		System.out.print(obj.toString());
	}
}


建議用服務器測試代碼,工做PC測試能夠比較卡。。性能

相關文章
相關標籤/搜索