多線程訪問限流接口: html
/** * 多線程同步代碼 * */ protected Integer syncXXX(List<String> erps, Date startTime, Date endTime) throws Exception { int total = 0; if(CollectionUtils.isEmpty(erps)) { return total; } //TPS(Transaction Per Second) 每秒鐘系統可以處理的交易或事務的數量 int tps = SystemConfigs.getInt(SysConfigKeys.XXX_TPS); int tSize = (int) Math.ceil(tps / 5.0);// 預計一個線程每秒5次請求,計算線程數,向上取整 int slen = (int) Math.ceil(erps.size() / (tSize + 0.0));//單個線程計算的erp個數,向上取整 int stps = (int) Math.floor(tps / (tSize + 0.0)); // 預計單個線程的訪問頻率,向下取整 ExecutorService executorService = Executors.newFixedThreadPool(tSize);//創建ExecutorService線程池 List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); int sindex = 0; while(sindex < erps.size()) {//將erps分配給每一個線程 int limit = sindex + slen; List<String> serps = erps.subList(sindex, limit < erps.size() ? limit : erps.size());//單個線程計算的ERP list Future<Integer> future = executorService.submit(new XXXeSyncTask(serps, startTime, endTime, stps)); futureList.add(future); sindex = limit; } for(Future<Integer> future : futureList) { total += future.get(); } executorService.shutdown(); return total; } class XXXSyncTask implements Callable<Integer> { // erp子級,代碼行彙總 private List<String> erps; private Date start; private Date end; // 容許最大訪問頻率 private int tps; public XXXSyncTask(List<String> erps, Date start, Date end, int tps) { this.erps = erps; this.start = start; this.end = end; this.tps = tps; } @Override public Integer call() throws Exception { long startMillis = System.currentTimeMillis(); long duration = 0; int count = 0; List<XXX> xxxList = new ArrayList<XXX>(); for (int i = 0; i < erps.size(); i++) { String erp = erps.get(i); while (start.before(end)) { // 預計耗時(ms) long predict = i * 1000 / tps; if (predict > duration) { // 實際耗時小於預計耗時,sleep long waitting = predict - duration; Thread.sleep(waitting); LOG.debug("代碼行同步休眠{}ms. ", waitting); } duration = System.currentTimeMillis() - startMillis; List<XXX> tempList = queryData(erp, start); xxxList.addAll(tempList); if (xxxList.size() > 100) { xxxDao.batch(xxxList, SQL_OPTION.INSERT); count += xxxList.size(); xxxList.clear(); } start = DateUtils.addDay(start, 1); } } if (!xxxList.isEmpty()) { xxxDao.batch(codeColumnList, SQL_OPTION.INSERT); count += codeColumnList.size(); xxxList.clear(); } return count; } }
相關文章: java
http://www.oschina.net/translate/java-util-concurrent-future-basics 多線程
http://www.cnblogs.com/whgw/archive/2011/09/28/2194760.html ide