開發過程當中,常常會遇到一鍵操做這樣的功能,當數據量較少或者業務邏輯單一的時候沒什麼問題,可是當遇到數據量較大,並且業務邏輯較爲複雜的狀況,就比較棘手了,一鍵執行後,彷彿整個世界都在跟着轉圈圈,直到請求超時,更有甚者,服務器直接駕崩。java
最近,在幫客戶作微信會員資料更新操做時就遇到這樣的狀況,最開始是最簡單的遍歷執行,發現行不通;因而又換用分頁批量執行的方法,結果發現換湯不換藥,仍是請求超時;最後沒辦發,只能經過使用多線程機制,經過開啓多線程,以增長系統開銷來節省請求時間,終於把問題解決了。也許會有更好的方法,可是目前實現功能要緊。話很少說,關門,放代碼。。。git
對此,我寫了兩套方案,其一是經過實現Callable接口異步執行業務方法,最後返回執行結果;github
package com.web.demo.thread; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 異步執行業務方法,實現Callable接口,返回執行結果 * * @author jiangyf */ public class AsyncTask implements Callable<List<Map<String, Object>>> { // 執行任務名稱 private String taskName; // 執行任務時間 private long taskTime; // 線程同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待 private CountDownLatch latch; // 任務執行結果 List<Map<String, Object>> resultList; private Map<String, Object> resultMap; public AsyncTask(String taskName, long taskTime, CountDownLatch latch) { super(); this.taskName = taskName; this.taskTime = taskTime; this.latch = latch; } @Override public List<Map<String, Object>> call() throws Exception { resultList = new ArrayList<Map<String, Object>>(); resultMap = new HashMap<String, Object>(); // 任務開始時間 long begin = System.currentTimeMillis(); System.out.println(taskName + " 任務開始...."); // 執行具體業務 Thread.sleep(taskTime * 1000); System.out.println(taskName + " 任務結束...."); // 任務結束時間 long end = System.currentTimeMillis(); taskTime = (end - begin) / 1000; resultMap.put("taskName", taskName); resultMap.put("taskTime", taskTime); resultList.add(resultMap); System.out.println(taskName + "任務用時:" + taskTime + "秒"); if (latch != null) { // 任務完成,計數器減一 latch.countDown(); } return resultList; } public static void main(String[] args) throws InterruptedException, ExecutionException { // 任務開始時間 long begin = System.currentTimeMillis(); // 初始化計數器 CountDownLatch latch = new CountDownLatch(2); // 初始化線程池 ExecutorService executorService = Executors.newFixedThreadPool(2); // 初始化線程 Future<List<Map<String, Object>>> future = executorService .submit(new AsyncTask("running", 2, latch)); Future<List<Map<String, Object>>> future2 = executorService .submit(new AsyncTask("walking", 5, latch)); executorService.shutdown(); // 所有任務執行完成前,會一直阻塞當前線程,直到計時器的值爲0 latch.await(); List<Map<String, Object>> result = future.get(); List<Map<String, Object>> result2 = future2.get(); result.addAll(result2); System.out.println(result.size()); // 任務結束時間 long end = System.currentTimeMillis(); System.out.println("任務總用時:" + ((end - begin) / 1000) + "秒"); } }
其二是經過繼承Thread類異步執行業務方法,最後不返回執行結果。web
package com.web.demo.thread; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; /** * 異步執行業務方法,繼承Thread類,不返回執行結果 * * @author jiangyf */ public class AsyncJob extends Thread { // 執行任務名稱 private String jobName; // 執行任務時間 private long jobTime; // 線程同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待 private CountDownLatch latch; public AsyncJob(String jobName, long jobTime, CountDownLatch latch) { super(); this.jobName = jobName; this.jobTime = jobTime; this.latch = latch; } public void run() { // 任務開始時間 long begin = System.currentTimeMillis(); System.out.println(jobName + " 任務開始...."); // 執行具體業務 try { Thread.sleep(jobTime * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(jobName + " 任務結束...."); // 任務結束時間 long end = System.currentTimeMillis(); jobTime = (end - begin) / 1000; System.out.println(jobName + "任務用時:" + jobTime + "秒"); if (latch != null) { // 任務完成,計數器減一 latch.countDown(); } } public static void main(String[] args) throws InterruptedException, ExecutionException { // 任務開始時間 long begin = System.currentTimeMillis(); // 初始化計數器 CountDownLatch latch = new CountDownLatch(2); // 初始化線程 AsyncJob job = new AsyncJob("running", 5, latch); AsyncJob job2 = new AsyncJob("walking", 2, latch); job.start(); job2.start(); // 所有任務執行完成前,會一直阻塞當前線程,直到計時器的值爲0 latch.await(); // 任務結束時間 long end = System.currentTimeMillis(); System.out.println("任務總用時:" + ((end - begin) / 1000) + "秒"); } }
如下爲業務代碼實現示例:服務器
// 會員資料更新失敗的卡號 private static StringBuffer cardNoStr = new StringBuffer(); /** * 同步微信會員信息 */ public String syncVipInfo(String weixinId) { log.info("-------------同步微信會員信息開始"); String msg = ""; setWeixinInfo(weixinId); try { List<VipInfo> vipInfos = vipInfoDao.getByWeixinId(weixinId, null, null); int totalRows = vipInfos.size(); msg = "須要同步的微信會員數:" + totalRows; log.info(msg); if (totalRows == 0) { return msg; } Map<String, String> map = PropertiesUtil .propertiesToMap("syncvipinfo.properties"); if (map.get("max_num") == null || map.get("max_thread") == null) { msg = "同步微信會員信息配置文件錯誤"; log.info(msg); return msg; } int offset = 0; int rows = Integer.parseInt(map.get("max_num")); int threadNum = Integer.parseInt(map.get("max_thread")); int count = totalRows / rows; if ((totalRows % rows) > 0) { count += 1; } if (count > threadNum) { if ((totalRows % threadNum) > 0) { count = threadNum - 1; rows = totalRows / count; if ((totalRows % count) > 0) { count += 1; } } else { count = threadNum; rows = totalRows / count; } } log.info("須要開啓線程數量:" + count); // 任務開始時間 long begin = System.currentTimeMillis(); // 初始化計數器 CountDownLatch latch = new CountDownLatch(count); // 初始化線程池 ExecutorService executorService = Executors .newFixedThreadPool(count); // 初始化線程 for (int i = 0; i < count; i++) { Future<String> future = executorService.submit(new AsyncTask( latch, vipInfos, offset, rows)); offset += rows; } executorService.shutdown(); log.info("線程池是否已關閉:" + executorService.isShutdown()); // 初始化線程 /* for (int i = 0; i < count; i++) { AsyncJob job = new AsyncJob(latch, vipInfos, offset, rows); job.start(); offset += rows; } */ if (count > 0) { // 所有任務執行完成前,會一直阻塞當前線程,直到計時器的值爲0 latch.await(); } if (!"".equals(cardNoStr.toString())) { msg = "同步微信會員信息失敗的會員卡號有:" + cardNoStr.toString().substring(0, cardNoStr.lastIndexOf(",")); } else { msg = "同步微信會員信息成功"; } // 任務結束時間 long end = System.currentTimeMillis(); log.info("同步微信會員信息任務用時:" + ((end - begin) / 1000) + "秒"); } catch (Exception e) { msg = "同步微信會員信息出現異常"; log.error(msg + e.getMessage()); } log.info("執行結果-------" + msg + "-------"); log.info("-------------同步微信會員信息結束"); return msg; } /** * 異步執行業務方法,實現Callable接口,返回執行結果 * * @author jiangyf */ static class AsyncTask implements Callable<String> { private Logger log = LoggerFactory.getLogger(WXWebService.class); // 線程同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待 private CountDownLatch latch; private List<VipInfo> vipInfos; private int offset; private int rows; public AsyncTask(CountDownLatch latch, List<VipInfo> vipInfos, int offset, int rows) { super(); this.latch = latch; this.vipInfos = vipInfos; this.offset = offset; this.rows = rows; } public String call() { // 任務開始時間 long begin = System.currentTimeMillis(); log.info("當前線程更新會員數量:------" + vipInfos.size()); // 執行具體業務 for (int i = 0; i < rows; i++) { int num = offset + i; if (num >= vipInfos.size()) { break; } VipInfo vipInfo = vipInfos.get(num); if (vipInfo != null) { String cardNo = vipInfo.getCardNo(); WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo .getOpenId()); try { updateVipInfo(vipInfo, userInfo); log.info("會員卡號爲" + cardNo + "的會員資料更新成功"); } catch (SQLException e) { cardNoStr.append(cardNo + ","); log.info("會員卡號爲" + cardNo + "的會員資料更新失敗"); } } } // 任務結束時間 long end = System.currentTimeMillis(); log.info("當前線程執行任務用時:" + ((end - begin) / 1000) + "秒"); if (latch != null) { latch.countDown();// 任務完成,計數器減一 } return cardNoStr.toString(); } } static class AsyncJob extends Thread { private Logger log = LoggerFactory.getLogger(WXWebService.class); // 線程同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待 private CountDownLatch latch; private List<VipInfo> vipInfos; private int offset; private int rows; public AsyncJob(CountDownLatch latch, List<VipInfo> vipInfos, int offset, int rows) { super(); this.latch = latch; this.vipInfos = vipInfos; this.offset = offset; this.rows = rows; } public void run() { // 任務開始時間 long begin = System.currentTimeMillis(); log.info("當前線程更新會員數量:------" + vipInfos.size()); // 執行具體業務 for (int i = 0; i < rows; i++) { int num = offset + i; if (num >= vipInfos.size()) { break; } VipInfo vipInfo = vipInfos.get(num); if (vipInfo != null) { String cardNo = vipInfo.getCardNo(); WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo .getOpenId()); try { updateVipInfo(vipInfo, userInfo); log.info("會員卡號爲" + cardNo + "的會員資料更新成功"); } catch (SQLException e) { cardNoStr.append(cardNo + ","); log.info("會員卡號爲" + cardNo + "的會員資料更新失敗"); } } } // 任務結束時間 long end = System.currentTimeMillis(); log.info("當前線程執行任務用時:" + ((end - begin) / 1000) + "秒"); if (latch != null) { latch.countDown();// 任務完成,計數器減一 } } }
代碼地址:https://github.com/github-jade/myweb/tree/develop/src/main/java/com/web/demo/thread微信