public class Consts { public static final int PROBLEM_BANK_COUNT = 2000;//題庫大小 //取得本地機器cpu數量 public final static int THREAD_COUNT_BASE = Runtime.getRuntime().availableProcessors(); }
/** * 用sleep模擬實際的業務操做 */ public class BusiMock { public static void buisness(int sleepTime){ try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } }
public class MakeSrcDoc { /** * 造成待處理文檔 * @param docCount 生成的文檔數量 * @return 待處理文檔列表 */ public static List<PendingDocVo> makeDoc(int docCount){ Random r = new Random(); Random rProblemCount = new Random(); List<PendingDocVo> docList = new LinkedList<>();//文檔列表 for(int i=0;i<docCount;i++){ List<Integer> problemList = new LinkedList<Integer>();//文檔中題目列表 int docProblemCount = rProblemCount.nextInt(60)+60; for(int j=0;j< docProblemCount;j++){ int problemId = r.nextInt(Consts.PROBLEM_BANK_COUNT); problemList.add(problemId); } PendingDocVo pendingDocVo = new PendingDocVo("pending_"+i, problemList); docList.add(pendingDocVo); } return docList; } }
public class ProblemBank { //題庫數據存儲 private static ConcurrentHashMap<Integer,ProblemDBVo> problemBankMap = new ConcurrentHashMap<>(); //定時任務池,負責定時更新題庫數據 private static ScheduledExecutorService updateProblemBank = new ScheduledThreadPoolExecutor(1); //初始化題庫 public static void initBank(){ for(int i=0;i<Consts.PROBLEM_BANK_COUNT;i++){ String problemContent = getRandomString(700); problemBankMap.put(i,new ProblemDBVo(i, problemContent,EncryptTools.EncryptBySHA1(problemContent))); } updateProblemTimer(); } //生成隨機字符串 //length表示生成字符串的長度 private static String getRandomString(int length) { String base = "abcdefghijklmnopqrstuvwxyz0123456789"; Random random = new Random(); StringBuffer sb = new StringBuffer(); for (int i = 0; i < length; i++) { int number = random.nextInt(base.length()); sb.append(base.charAt(number)); } return sb.toString(); } //得到題目,咱們假設一次數據庫的讀耗時在通常在20ms左右,因此休眠20ms public static ProblemDBVo getProblem(int i) { BusiMock.buisness(20); return problemBankMap.get(i); } public static String getProblemSha(int i){ BusiMock.buisness(10); return problemBankMap.get(i).getSha(); } //更新題庫的定時任務 private static class UpdateProblem implements Runnable{ @Override public void run() { Random random = new Random(); int problemId = random.nextInt(Consts.PROBLEM_BANK_COUNT); String problemContent = getRandomString(700); problemBankMap.put(problemId,new ProblemDBVo(problemId, problemContent,EncryptTools.EncryptBySHA1(problemContent))); //System.out.println("題目【"+problemId+"】被更新!!"); } } //按期更新題庫數據 private static void updateProblemTimer(){ System.out.println("開始定時更新題庫.........................."); updateProblemBank.scheduleAtFixedRate(new UpdateProblem(), 15,5, TimeUnit.SECONDS); } }
/** * 文檔處理服務 */ public class DocService { /** * 上傳文檔到網絡 * @param docFileName 實際文檔在本地的存儲位置 * @return 上傳後的網絡存儲地址 */ public static String upLoadDoc(String docFileName){ Random r = new Random(); BusiMock.buisness(5000+r.nextInt(400)); return "http://www.xxxx.com/file/upload/"+docFileName; } /** * 將待處理文檔處理爲本地實際文檔 * @param pendingDocVo 待處理文檔 * @return 實際文檔在本地的存儲位置 */ public static String makeDoc(PendingDocVo pendingDocVo){ System.out.println("開始處理文檔:"+ pendingDocVo.getDocName()); StringBuffer sb = new StringBuffer(); for(Integer problemId: pendingDocVo.getProblemVoList()){ sb.append(ProblemService.makeProblem(problemId)); } return "complete_"+System.currentTimeMillis()+"_" +pendingDocVo.getDocName()+".pdf"; } /** * 異步並行處理文檔中的題目 * @param pendingDocVo * @return * @throws ExecutionException * @throws InterruptedException */ public static String makeAsyn(PendingDocVo pendingDocVo) throws ExecutionException, InterruptedException { System.out.println("開始處理文檔:"+ pendingDocVo.getDocName()); //對題目處理結果的緩存 Map<Integer,MultiProblemVo> multiProblemVoMap = new HashMap<>(); //並行處理文檔中的每一個題目 for(Integer problemId:pendingDocVo.getProblemVoList()){ multiProblemVoMap.put(problemId, ProblemMultiService.makeProblem(problemId)); } //獲取題目的結果 StringBuffer sb = new StringBuffer(); for(Integer problemId:pendingDocVo.getProblemVoList()){ MultiProblemVo multiProblemVo = multiProblemVoMap.get(problemId); sb.append( multiProblemVo.getProblemText()==null ? multiProblemVo.getProbleFuture().get().getProcessedContent() : multiProblemVo.getProblemText()); } return "complete_"+System.currentTimeMillis()+"_" +pendingDocVo.getDocName()+".pdf"; } }
/** * 題目處理的基礎服務,模擬解析題目文本,下載圖片等操做, * 返回解析後的文本 */ public class BaseProblemService { /** * 對題目進行處理,如解析文本,下載圖片等等工做 * @param problemId 題目id * @return 題目解析後的文本 */ public static String makeProblem(Integer problemId,String problemSrc){ Random r = new Random(); BusiMock.buisness(450+r.nextInt(100)); return "CompleteProblem[id="+problemId +" content=:"+ problemSrc+"]"; } }
/** * 並行異步的處理題目 */ public class ProblemMultiService { //存放處理過題目內容的緩存 private static ConcurrentHashMap<Integer,ProblemCacheVo> problemCache = new ConcurrentHashMap<>(); //存放正在處理的題目的緩存,防止多個線程同時處理一個題目 private static ConcurrentHashMap<Integer,Future<ProblemCacheVo>> processingProblemCache = new ConcurrentHashMap<>(); //處理的題目的線程池 private static ExecutorService makeProblemExec = Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE*2); //供調用者使用,返回題目的內容或者任務 public static MultiProblemVo makeProblem(Integer problemId){ //檢查緩存中是否存在 ProblemCacheVo problemCacheVo = problemCache.get(problemId); if(null==problemCacheVo){ System.out.println("題目【"+problemId+"】在緩存中不存在,須要新啓任務"); return new MultiProblemVo(getProblemFuture(problemId)); }else{ //拿摘要,一篇文檔中的全部題目的摘要其實能夠一次性取得,以減小對數據庫的訪問 String problemSha = ProblemBank.getProblemSha(problemId); if(problemCacheVo.getProblemSha().equals(problemSha)){ System.out.println("題目【"+problemId+"】在緩存中存在且沒有修改過,能夠直接使用。"); return new MultiProblemVo(problemCacheVo.getProcessedContent()); } else{ System.out.println("題目【"+problemId+"】的摘要發生了變化,啓動任務更新緩存。"); return new MultiProblemVo(getProblemFuture(problemId)); } } } //返回題目的工做任務 private static Future<ProblemCacheVo> getProblemFuture(Integer problemid){ Future<ProblemCacheVo> problemFuture = processingProblemCache.get(problemid); if (problemFuture==null){ ProblemDBVo problemDBVo = ProblemBank.getProblem(problemid); ProblemTask problemTask = new ProblemTask(problemDBVo,problemid); //當前線程新啓了一個任務 FutureTask<ProblemCacheVo> ft = new FutureTask<ProblemCacheVo>(problemTask); problemFuture = processingProblemCache.putIfAbsent(problemid,ft); if (problemFuture==null){ //表示沒有別的線程正在處理當前題目 problemFuture = ft; makeProblemExec.execute(ft); System.out.println("題目【"+problemid+"】計算任務啓動,請等待完成>>>>>>>>>>>>>。"); }else{ System.out.println("剛剛有其餘線程啓動了題目【"+problemid+"】的計算任務,任務沒必要開啓"); } }else{ System.out.println("當前已經有了題目【"+problemid+"】的計算任務,沒必要從新開啓"); } return problemFuture; } //處理題目的任務 private static class ProblemTask implements Callable<ProblemCacheVo>{ private ProblemDBVo problemDBVo; private Integer problemId; public ProblemTask(ProblemDBVo problemDBVo, Integer problemId) { this.problemDBVo = problemDBVo; this.problemId = problemId; } @Override public ProblemCacheVo call() throws Exception { try { ProblemCacheVo problemCacheVo = new ProblemCacheVo(); problemCacheVo.setProcessedContent( BaseProblemService.makeProblem(problemId,problemDBVo.getContent())); problemCacheVo.setProblemSha(problemDBVo.getSha()); problemCache.put(problemId,problemCacheVo); return problemCacheVo; } finally { //不管正常仍是異常,都須要將生成的題目的任務從緩存移除 processingProblemCache.remove(problemId); } } } }
public class ProblemService { /** * 普通對題目進行處理 * @param problemId 題目id * @return 題目解析後的文本 */ public static String makeProblem(Integer problemId){ return BaseProblemService.makeProblem(problemId, ProblemBank.getProblem(problemId).getContent()); } }
/** * 併發題目處理時,返回處理的題目結果 */ public class MultiProblemVo { private final String problemText;//要麼就是題目處理後的文本; private final Future<ProblemCacheVo> probleFuture;//處理題目的任務 public MultiProblemVo(String problemText) { this.problemText = problemText; this.probleFuture = null; } public MultiProblemVo(Future<ProblemCacheVo> probleFuture) { this.probleFuture = probleFuture; this.problemText = null; } public String getProblemText() { return problemText; } public Future<ProblemCacheVo> getProbleFuture() { return probleFuture; } }
/** * 待處理文檔實體類 */ public class PendingDocVo { //待處理文檔名稱 private final String docName; //待處理文檔中題目id列表 private final List<Integer> problemVoList; public PendingDocVo(String docName, List<Integer> problemVoList) { this.docName = docName; this.problemVoList = problemVoList; } public String getDocName() { return docName; } public List<Integer> getProblemVoList() { return problemVoList; } }
/** * 題目保存在緩存中的實體類 */ public class ProblemCacheVo implements Serializable{ private String processedContent; private String problemSha; public ProblemCacheVo() { } public ProblemCacheVo(String processedContent, String problemSha) { this.processedContent = processedContent; this.problemSha = problemSha; } public String getProcessedContent() { return processedContent; } public void setProcessedContent(String processedContent) { this.processedContent = processedContent; } public String getProblemSha() { return problemSha; } public void setProblemSha(String problemSha) { this.problemSha = problemSha; } }
/** * 題目在數據庫中存放實體類 */ public class ProblemDBVo { //題目id private final int problemId; //題目內容,平均長度700字節 private final String content; //題目的sha串 private final String sha; public ProblemDBVo(int problemId, String content, String sha) { this.problemId = problemId; this.content = content; this.sha = sha; } public int getProblemId() { return problemId; } public String getContent() { return content; } public String getSha() { return sha; } }
public class SingleWeb { public static void main(String[] args) { System.out.println("題庫開始初始化..........."); ProblemBank.initBank(); System.out.println("題庫初始化完成。"); List<PendingDocVo> docList = MakeSrcDoc.makeDoc(2); long startTotal = System.currentTimeMillis(); for(PendingDocVo doc:docList){ System.out.println("開始處理文檔:"+doc.getDocName()+"......."); long start = System.currentTimeMillis(); String localName = DocService.makeDoc(doc); System.out.println("文檔"+localName+"生成耗時:" +(System.currentTimeMillis()-start)+"ms"); start = System.currentTimeMillis(); String remoteUrl = DocService.upLoadDoc(localName); System.out.println("已上傳至["+remoteUrl+"]耗時:" +(System.currentTimeMillis()-start)+"ms"); } System.out.println("共耗時:"+(System.currentTimeMillis()-startTotal)+"ms"); } } 運行結果 ====================== 題庫開始初始化........... 開始定時更新題庫.......................... 題庫初始化完成。 開始處理文檔:pending_0....... 開始處理文檔:pending_0 文檔complete_1531484294630_pending_0.pdf生成耗時:42965ms 已上傳至[http://www.xxxx.com/file/upload/complete_1531484294630_pending_0.pdf]耗時:5223ms 開始處理文檔:pending_1....... 開始處理文檔:pending_1 文檔complete_1531484341109_pending_1.pdf生成耗時:41256ms 已上傳至[http://www.xxxx.com/file/upload/complete_1531484341109_pending_1.pdf]耗時:5036ms 共耗時:94480ms
/** * 服務的拆分,rpc服務 */ public class RpcMode { //生成文檔的線程池 private static ExecutorService docMakeService = Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE*2); //上傳文檔的線程池 private static ExecutorService docUploadService = Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE*2); private static CompletionService docCompletionService = new ExecutorCompletionService(docMakeService); private static CompletionService uploadCompletionService = new ExecutorCompletionService(docUploadService); private static class MakeDocTask implements Callable<String>{ private PendingDocVo pendingDocVo; public MakeDocTask(PendingDocVo pendingDocVo) { this.pendingDocVo = pendingDocVo; } @Override public String call() throws Exception { long start = System.currentTimeMillis(); String localName = DocService.makeAsyn(pendingDocVo); System.out.println("文檔"+localName+"生成耗時:" +(System.currentTimeMillis()-start)+"ms"); return localName; } } private static class UploadDocTask implements Callable<String>{ private String localName; public UploadDocTask(String localName) { this.localName = localName; } @Override public String call() throws Exception { long start = System.currentTimeMillis(); String remoteUrl = DocService.upLoadDoc(localName); System.out.println("已上傳至["+remoteUrl+"]耗時:" +(System.currentTimeMillis()-start)+"ms"); return remoteUrl; } } public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("題庫開始初始化..........."); ProblemBank.initBank(); System.out.println("題庫初始化完成。"); List<PendingDocVo> docList = MakeSrcDoc.makeDoc(60); long startTotal = System.currentTimeMillis(); for(PendingDocVo doc:docList){ docCompletionService.submit(new MakeDocTask(doc)); } for(PendingDocVo doc:docList){ Future<String> futureLocalName = docCompletionService.take(); uploadCompletionService.submit(new UploadDocTask(futureLocalName.get())); } for(PendingDocVo doc:docList){ //把上傳後的網絡存儲地址拿到 uploadCompletionService.take().get(); } System.out.println("共耗時:"+(System.currentTimeMillis()-startTotal)+"ms"); } } 運行結果 ========== 題庫開始初始化........... 開始定時更新題庫.......................... 題庫初始化完成。 開始處理文檔:pending_5 開始處理文檔:pending_3 開始處理文檔:pending_4 開始處理文檔:pending_7 開始處理文檔:pending_6 開始處理文檔:pending_10 開始處理文檔:pending_0 開始處理文檔:pending_1 開始處理文檔:pending_2 開始處理文檔:pending_9 開始處理文檔:pending_8 開始處理文檔:pending_11 開始處理文檔:pending_12 開始處理文檔:pending_13 開始處理文檔:pending_14 題目【780】在緩存中不存在,須要新啓任務 題目【1433】在緩存中不存在,須要新啓任務 題目【847】在緩存中不存在,須要新啓任務 題目【688】在緩存中不存在,須要新啓任務 題目【1695】在緩存中不存在,須要新啓任務 題目【547】在緩存中不存在,須要新啓任務 題目【446】在緩存中不存在,須要新啓任務 開始處理文檔:pending_15 題目【1439】在緩存中不存在,須要新啓任務 題目【1245】在緩存中不存在,須要新啓任務 題目【895】在緩存中不存在,須要新啓任務 題目【1416】在緩存中不存在,須要新啓任務 ...... ...... ...... 已上傳至[http://www.xxxx.com/file/upload/complete_1531485132232_pending_58.pdf]耗時:5319ms 已上傳至[http://www.xxxx.com/file/upload/complete_1531485132409_pending_59.pdf]耗時:5341ms 共耗時:65280ms