使用多線程機制異步執行業務方法

    開發過程當中,常常會遇到一鍵操做這樣的功能,當數據量較少或者業務邏輯單一的時候沒什麼問題,可是當遇到數據量較大,並且業務邏輯較爲複雜的狀況,就比較棘手了,一鍵執行後,彷彿整個世界都在跟着轉圈圈,直到請求超時,更有甚者,服務器直接駕崩。java

    最近,在幫客戶作微信會員資料更新操做時就遇到這樣的狀況,最開始是最簡單的遍歷執行,發現行不通;因而又換用分頁批量執行的方法,結果發現換湯不換藥,仍是請求超時;最後沒辦發,只能經過使用多線程機制,經過開啓多線程,以增長系統開銷來節省請求時間,終於把問題解決了。也許會有更好的方法,可是目前實現功能要緊。話很少說,關門,放代碼。。。git

    對此,我寫了兩套方案,其一是經過實現Callable接口異步執行業務方法,最後返回執行結果;github

package com.web.demo.thread;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 異步執行業務方法,實現Callable接口,返回執行結果
 * 
 * @author jiangyf
 */
public class AsyncTask implements Callable<List<Map<String, Object>>> {
	// 執行任務名稱
	private String taskName;
	// 執行任務時間
	private long taskTime;
	// 線程同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待
	private CountDownLatch latch;

	// 任務執行結果
	List<Map<String, Object>> resultList;
	private Map<String, Object> resultMap;

	public AsyncTask(String taskName, long taskTime, CountDownLatch latch) {
		super();
		this.taskName = taskName;
		this.taskTime = taskTime;
		this.latch = latch;
	}

	@Override
	public List<Map<String, Object>> call() throws Exception {
		resultList = new ArrayList<Map<String, Object>>();
		resultMap = new HashMap<String, Object>();
		// 任務開始時間
		long begin = System.currentTimeMillis();

		System.out.println(taskName + " 任務開始....");
		
		// 執行具體業務
		Thread.sleep(taskTime * 1000);
		
		System.out.println(taskName + " 任務結束....");

		// 任務結束時間
		long end = System.currentTimeMillis();
		taskTime = (end - begin) / 1000;
		resultMap.put("taskName", taskName);
		resultMap.put("taskTime", taskTime);
		resultList.add(resultMap);
		System.out.println(taskName + "任務用時:" + taskTime + "秒");
		if (latch != null) {
			// 任務完成,計數器減一
			latch.countDown();
		}
		return resultList;
	}

	public static void main(String[] args) throws InterruptedException,
			ExecutionException {
		// 任務開始時間
		long begin = System.currentTimeMillis();

		// 初始化計數器
		CountDownLatch latch = new CountDownLatch(2);
		// 初始化線程池
		ExecutorService executorService = Executors.newFixedThreadPool(2);
		// 初始化線程
		Future<List<Map<String, Object>>> future = executorService
				.submit(new AsyncTask("running", 2, latch));
		Future<List<Map<String, Object>>> future2 = executorService
				.submit(new AsyncTask("walking", 5, latch));
		executorService.shutdown();
		// 所有任務執行完成前,會一直阻塞當前線程,直到計時器的值爲0
		latch.await();
		List<Map<String, Object>> result = future.get();
		List<Map<String, Object>> result2 = future2.get();
		result.addAll(result2);
		System.out.println(result.size());
		
		// 任務結束時間
		long end = System.currentTimeMillis();
		System.out.println("任務總用時:" + ((end - begin) / 1000) + "秒");
	}

}

其二是經過繼承Thread類異步執行業務方法,最後不返回執行結果。web

package com.web.demo.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/**
 * 異步執行業務方法,繼承Thread類,不返回執行結果
 * 
 * @author jiangyf
 */
public class AsyncJob extends Thread {
	// 執行任務名稱
	private String jobName;
	// 執行任務時間
	private long jobTime;
	// 線程同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待
	private CountDownLatch latch;

	public AsyncJob(String jobName, long jobTime, CountDownLatch latch) {
		super();
		this.jobName = jobName;
		this.jobTime = jobTime;
		this.latch = latch;
	}

	public void run() {
		// 任務開始時間
		long begin = System.currentTimeMillis();

		System.out.println(jobName + " 任務開始....");

		// 執行具體業務
		try {
			Thread.sleep(jobTime * 1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		System.out.println(jobName + " 任務結束....");

		// 任務結束時間
		long end = System.currentTimeMillis();
		jobTime = (end - begin) / 1000;
		System.out.println(jobName + "任務用時:" + jobTime + "秒");
		if (latch != null) {
			// 任務完成,計數器減一
			latch.countDown();
		}

	}

	public static void main(String[] args) throws InterruptedException,
			ExecutionException {
		// 任務開始時間
		long begin = System.currentTimeMillis();

		// 初始化計數器
		CountDownLatch latch = new CountDownLatch(2);
		// 初始化線程
		AsyncJob job = new AsyncJob("running", 5, latch);
		AsyncJob job2 = new AsyncJob("walking", 2, latch);
		job.start();
		job2.start();
		// 所有任務執行完成前,會一直阻塞當前線程,直到計時器的值爲0
		latch.await();
		
		// 任務結束時間
		long end = System.currentTimeMillis();
		System.out.println("任務總用時:" + ((end - begin) / 1000) + "秒");
	}

}

如下爲業務代碼實現示例:服務器

// 會員資料更新失敗的卡號
	private static StringBuffer cardNoStr = new StringBuffer();

	/**
	 * 同步微信會員信息
	 */
	public String syncVipInfo(String weixinId) {
		log.info("-------------同步微信會員信息開始");
		String msg = "";
		setWeixinInfo(weixinId);
		try {
			List<VipInfo> vipInfos = vipInfoDao.getByWeixinId(weixinId, null,
					null);
			int totalRows = vipInfos.size();
			msg = "須要同步的微信會員數:" + totalRows;
			log.info(msg);
			if (totalRows == 0) {
				return msg;
			}
			Map<String, String> map = PropertiesUtil
					.propertiesToMap("syncvipinfo.properties");
			if (map.get("max_num") == null || map.get("max_thread") == null) {
				msg = "同步微信會員信息配置文件錯誤";
				log.info(msg);
				return msg;
			}
			int offset = 0;
			int rows = Integer.parseInt(map.get("max_num"));
			int threadNum = Integer.parseInt(map.get("max_thread"));
			int count = totalRows / rows;
			if ((totalRows % rows) > 0) {
				count += 1;
			}
			if (count > threadNum) {
				if ((totalRows % threadNum) > 0) {
					count = threadNum - 1;
					rows = totalRows / count;
					if ((totalRows % count) > 0) {
						count += 1;
					}
				} else {
					count = threadNum;
					rows = totalRows / count;
				}
			}

			log.info("須要開啓線程數量:" + count);
			// 任務開始時間
			long begin = System.currentTimeMillis();

			// 初始化計數器
			CountDownLatch latch = new CountDownLatch(count);
			
			// 初始化線程池
			ExecutorService executorService = Executors
					.newFixedThreadPool(count);
			// 初始化線程
			for (int i = 0; i < count; i++) {
				Future<String> future = executorService.submit(new AsyncTask(
						latch, vipInfos, offset, rows));
				offset += rows;
			}
			executorService.shutdown();
			log.info("線程池是否已關閉:" + executorService.isShutdown());
			
			// 初始化線程
			/*
			for (int i = 0; i < count; i++) {
				AsyncJob job = new AsyncJob(latch, vipInfos, offset, rows);
				job.start();
				offset += rows;
			}
			*/
			if (count > 0) {
				// 所有任務執行完成前,會一直阻塞當前線程,直到計時器的值爲0
				latch.await();
			}

			if (!"".equals(cardNoStr.toString())) {
				msg = "同步微信會員信息失敗的會員卡號有:" + cardNoStr.toString().substring(0, cardNoStr.lastIndexOf(","));
			} else {
				msg = "同步微信會員信息成功";
			}
			// 任務結束時間
			long end = System.currentTimeMillis();
			log.info("同步微信會員信息任務用時:" + ((end - begin) / 1000) + "秒");
		} catch (Exception e) {
			msg = "同步微信會員信息出現異常";
			log.error(msg + e.getMessage());
		}
		log.info("執行結果-------" + msg + "-------");
		log.info("-------------同步微信會員信息結束");
		return msg;
	}

	/**
	 * 異步執行業務方法,實現Callable接口,返回執行結果
	 * 
	 * @author jiangyf
	 */
	static class AsyncTask implements Callable<String> {
		private Logger log = LoggerFactory.getLogger(WXWebService.class);
		// 線程同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待
		private CountDownLatch latch;
		private List<VipInfo> vipInfos;
		private int offset;
		private int rows;

		public AsyncTask(CountDownLatch latch, List<VipInfo> vipInfos,
				int offset, int rows) {
			super();
			this.latch = latch;
			this.vipInfos = vipInfos;
			this.offset = offset;
			this.rows = rows;
		}

		public String call() {
			// 任務開始時間
			long begin = System.currentTimeMillis();

			log.info("當前線程更新會員數量:------" + vipInfos.size());

			// 執行具體業務
			for (int i = 0; i < rows; i++) {
				int num = offset + i;
				if (num >= vipInfos.size()) {
					break;
				}
				VipInfo vipInfo = vipInfos.get(num);
				if (vipInfo != null) {
					String cardNo = vipInfo.getCardNo();
					WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo
							.getOpenId());
					try {
						updateVipInfo(vipInfo, userInfo);
						log.info("會員卡號爲" + cardNo + "的會員資料更新成功");
					} catch (SQLException e) {
						cardNoStr.append(cardNo + ",");
						log.info("會員卡號爲" + cardNo + "的會員資料更新失敗");
					}
				}
			}

			// 任務結束時間
			long end = System.currentTimeMillis();
			log.info("當前線程執行任務用時:" + ((end - begin) / 1000) + "秒");
			if (latch != null) {
				latch.countDown();// 任務完成,計數器減一
			}
			return cardNoStr.toString();
		}

	}

	static class AsyncJob extends Thread {
		private Logger log = LoggerFactory.getLogger(WXWebService.class);
		// 線程同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待
		private CountDownLatch latch;
		private List<VipInfo> vipInfos;
		private int offset;
		private int rows;

		public AsyncJob(CountDownLatch latch, List<VipInfo> vipInfos,
				int offset, int rows) {
			super();
			this.latch = latch;
			this.vipInfos = vipInfos;
			this.offset = offset;
			this.rows = rows;
		}

		public void run() {
			// 任務開始時間
			long begin = System.currentTimeMillis();

			log.info("當前線程更新會員數量:------" + vipInfos.size());

			// 執行具體業務
			for (int i = 0; i < rows; i++) {
				int num = offset + i;
				if (num >= vipInfos.size()) {
					break;
				}
				VipInfo vipInfo = vipInfos.get(num);
				if (vipInfo != null) {
					String cardNo = vipInfo.getCardNo();
					WeixinUserInfo userInfo = getWeixinUserInfo(vipInfo
							.getOpenId());
					try {
						updateVipInfo(vipInfo, userInfo);
						log.info("會員卡號爲" + cardNo + "的會員資料更新成功");
					} catch (SQLException e) {
						cardNoStr.append(cardNo + ",");
						log.info("會員卡號爲" + cardNo + "的會員資料更新失敗");
					}
				}
			}

			// 任務結束時間
			long end = System.currentTimeMillis();
			log.info("當前線程執行任務用時:" + ((end - begin) / 1000) + "秒");
			if (latch != null) {
				latch.countDown();// 任務完成,計數器減一
			}

		}
	}

 

代碼地址:https://github.com/github-jade/myweb/tree/develop/src/main/java/com/web/demo/thread微信

相關文章
相關標籤/搜索