多線程(項目性能優化實戰)

public class Consts {
    public static final int PROBLEM_BANK_COUNT = 2000;//題庫大小

    //取得本地機器cpu數量
    public final static int THREAD_COUNT_BASE
            = Runtime.getRuntime().availableProcessors();
}
/**
 * 用sleep模擬實際的業務操做
 */
public class BusiMock {

    public static void buisness(int sleepTime){
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
public class MakeSrcDoc {

    /**
     * 造成待處理文檔
     * @param docCount 生成的文檔數量
     * @return 待處理文檔列表
     */
    public static List<PendingDocVo> makeDoc(int docCount){
        Random r = new Random();
        Random rProblemCount = new Random();
        List<PendingDocVo> docList = new LinkedList<>();//文檔列表
        for(int i=0;i<docCount;i++){
            List<Integer> problemList = new LinkedList<Integer>();//文檔中題目列表
            int docProblemCount = rProblemCount.nextInt(60)+60;
            for(int j=0;j< docProblemCount;j++){
                int problemId = r.nextInt(Consts.PROBLEM_BANK_COUNT);
                problemList.add(problemId);
            }
            PendingDocVo pendingDocVo = new PendingDocVo("pending_"+i,
                    problemList);
            docList.add(pendingDocVo);
        }
        return docList;
    }

}
public class ProblemBank {

    //題庫數據存儲
    private static ConcurrentHashMap<Integer,ProblemDBVo> problemBankMap
            = new ConcurrentHashMap<>();
    //定時任務池,負責定時更新題庫數據
    private static ScheduledExecutorService updateProblemBank
            = new ScheduledThreadPoolExecutor(1);

    //初始化題庫
    public static void initBank(){
        for(int i=0;i<Consts.PROBLEM_BANK_COUNT;i++){
            String problemContent = getRandomString(700);
            problemBankMap.put(i,new ProblemDBVo(i,
                    problemContent,EncryptTools.EncryptBySHA1(problemContent)));
        }
        updateProblemTimer();
    }

    //生成隨機字符串
    //length表示生成字符串的長度
    private static String getRandomString(int length) {
        String base = "abcdefghijklmnopqrstuvwxyz0123456789";
        Random random = new Random();
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < length; i++) {
            int number = random.nextInt(base.length());
            sb.append(base.charAt(number));
        }
        return sb.toString();
    }

    //得到題目,咱們假設一次數據庫的讀耗時在通常在20ms左右,因此休眠20ms
    public static ProblemDBVo getProblem(int i) {
        BusiMock.buisness(20);
        return problemBankMap.get(i);
    }

    public static String getProblemSha(int i){
        BusiMock.buisness(10);
        return problemBankMap.get(i).getSha();
    }


    //更新題庫的定時任務
    private static class UpdateProblem implements Runnable{

        @Override
        public void run() {
            Random random = new Random();
            int problemId = random.nextInt(Consts.PROBLEM_BANK_COUNT);
            String problemContent = getRandomString(700);
            problemBankMap.put(problemId,new ProblemDBVo(problemId,
                    problemContent,EncryptTools.EncryptBySHA1(problemContent)));
            //System.out.println("題目【"+problemId+"】被更新!!");
        }
    }

    //按期更新題庫數據
    private static void updateProblemTimer(){
        System.out.println("開始定時更新題庫..........................");
        updateProblemBank.scheduleAtFixedRate(new UpdateProblem(),
                15,5, TimeUnit.SECONDS);
    }
}
/**
 * 文檔處理服務
 */
public class DocService {

    /**
     * 上傳文檔到網絡
     * @param docFileName 實際文檔在本地的存儲位置
     * @return 上傳後的網絡存儲地址
     */
    public static String upLoadDoc(String docFileName){
        Random r = new Random();
        BusiMock.buisness(5000+r.nextInt(400));
        return "http://www.xxxx.com/file/upload/"+docFileName;
    }

    /**
     * 將待處理文檔處理爲本地實際文檔
     * @param pendingDocVo 待處理文檔
     * @return 實際文檔在本地的存儲位置
     */
    public static String makeDoc(PendingDocVo pendingDocVo){
        System.out.println("開始處理文檔:"+ pendingDocVo.getDocName());
        StringBuffer sb = new StringBuffer();
        for(Integer problemId: pendingDocVo.getProblemVoList()){
            sb.append(ProblemService.makeProblem(problemId));
        }
        return "complete_"+System.currentTimeMillis()+"_"
                +pendingDocVo.getDocName()+".pdf";
    }

    /**
     * 異步並行處理文檔中的題目
     * @param pendingDocVo
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static String makeAsyn(PendingDocVo pendingDocVo) throws ExecutionException, InterruptedException {
        System.out.println("開始處理文檔:"+ pendingDocVo.getDocName());

        //對題目處理結果的緩存
        Map<Integer,MultiProblemVo> multiProblemVoMap = new HashMap<>();
        //並行處理文檔中的每一個題目
        for(Integer problemId:pendingDocVo.getProblemVoList()){
            multiProblemVoMap.put(problemId,
                    ProblemMultiService.makeProblem(problemId));
        }

        //獲取題目的結果
        StringBuffer sb = new StringBuffer();
        for(Integer problemId:pendingDocVo.getProblemVoList()){
            MultiProblemVo multiProblemVo = multiProblemVoMap.get(problemId);
            sb.append(
             multiProblemVo.getProblemText()==null
                     ? multiProblemVo.getProbleFuture().get().getProcessedContent()
                     : multiProblemVo.getProblemText());
        }
        return "complete_"+System.currentTimeMillis()+"_"
                +pendingDocVo.getDocName()+".pdf";

    }

}
/**
 * 題目處理的基礎服務,模擬解析題目文本,下載圖片等操做,
 * 返回解析後的文本
 */
public class BaseProblemService {
    /**
     * 對題目進行處理,如解析文本,下載圖片等等工做
     * @param problemId 題目id
     * @return 題目解析後的文本
     */
    public static String makeProblem(Integer problemId,String problemSrc){
        Random r = new Random();
        BusiMock.buisness(450+r.nextInt(100));
        return "CompleteProblem[id="+problemId
                +" content=:"+ problemSrc+"]";
    }
}
/**
 * 並行異步的處理題目
 */
public class ProblemMultiService {

    //存放處理過題目內容的緩存
    private static ConcurrentHashMap<Integer,ProblemCacheVo> problemCache
            = new ConcurrentHashMap<>();

    //存放正在處理的題目的緩存,防止多個線程同時處理一個題目
    private static ConcurrentHashMap<Integer,Future<ProblemCacheVo>>
      processingProblemCache = new ConcurrentHashMap<>();

    //處理的題目的線程池
    private static ExecutorService makeProblemExec =
            Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE*2);

    //供調用者使用,返回題目的內容或者任務
    public static MultiProblemVo makeProblem(Integer problemId){
        //檢查緩存中是否存在
        ProblemCacheVo problemCacheVo = problemCache.get(problemId);
        if(null==problemCacheVo){
            System.out.println("題目【"+problemId+"】在緩存中不存在,須要新啓任務");
            return new MultiProblemVo(getProblemFuture(problemId));
        }else{
            //拿摘要,一篇文檔中的全部題目的摘要其實能夠一次性取得,以減小對數據庫的訪問
            String problemSha = ProblemBank.getProblemSha(problemId);
            if(problemCacheVo.getProblemSha().equals(problemSha)){
                System.out.println("題目【"+problemId+"】在緩存中存在且沒有修改過,能夠直接使用。");
                return  new MultiProblemVo(problemCacheVo.getProcessedContent());
            }
            else{
                System.out.println("題目【"+problemId+"】的摘要發生了變化,啓動任務更新緩存。");
                return new MultiProblemVo(getProblemFuture(problemId));
            }
        }

    }

    //返回題目的工做任務
    private static Future<ProblemCacheVo> getProblemFuture(Integer problemid){
        Future<ProblemCacheVo> problemFuture = processingProblemCache.get(problemid);
        if (problemFuture==null){
            ProblemDBVo problemDBVo = ProblemBank.getProblem(problemid);
            ProblemTask problemTask = new ProblemTask(problemDBVo,problemid);
            //當前線程新啓了一個任務
            FutureTask<ProblemCacheVo> ft =
                    new FutureTask<ProblemCacheVo>(problemTask);
            problemFuture = processingProblemCache.putIfAbsent(problemid,ft);
            if (problemFuture==null){
                //表示沒有別的線程正在處理當前題目
                problemFuture = ft;
                makeProblemExec.execute(ft);
                System.out.println("題目【"+problemid+"】計算任務啓動,請等待完成>>>>>>>>>>>>>。");
            }else{
                System.out.println("剛剛有其餘線程啓動了題目【"+problemid+"】的計算任務,任務沒必要開啓");
            }
        }else{
            System.out.println("當前已經有了題目【"+problemid+"】的計算任務,沒必要從新開啓");
        }
        return problemFuture;
    }

    //處理題目的任務
    private static class ProblemTask implements Callable<ProblemCacheVo>{

        private ProblemDBVo problemDBVo;
        private Integer problemId;

        public ProblemTask(ProblemDBVo problemDBVo, Integer problemId) {
            this.problemDBVo = problemDBVo;
            this.problemId = problemId;
        }

        @Override
        public ProblemCacheVo call() throws Exception {
            try {
                ProblemCacheVo problemCacheVo = new ProblemCacheVo();
                problemCacheVo.setProcessedContent(
                        BaseProblemService.makeProblem(problemId,problemDBVo.getContent()));
                problemCacheVo.setProblemSha(problemDBVo.getSha());
                problemCache.put(problemId,problemCacheVo);
                return problemCacheVo;
            } finally {
                //不管正常仍是異常,都須要將生成的題目的任務從緩存移除
                processingProblemCache.remove(problemId);
            }
        }
    }

}
public class ProblemService {

    /**
     * 普通對題目進行處理
     * @param problemId 題目id
     * @return 題目解析後的文本
     */
    public static String makeProblem(Integer problemId){
        return BaseProblemService.makeProblem(problemId,
                ProblemBank.getProblem(problemId).getContent());
    }

}
/**
 * 併發題目處理時,返回處理的題目結果
 */
public class MultiProblemVo {

    private final String problemText;//要麼就是題目處理後的文本;
    private final Future<ProblemCacheVo> probleFuture;//處理題目的任務

    public MultiProblemVo(String problemText) {
        this.problemText = problemText;
        this.probleFuture = null;
    }

    public MultiProblemVo(Future<ProblemCacheVo> probleFuture) {
        this.probleFuture = probleFuture;
        this.problemText = null;
    }

    public String getProblemText() {
        return problemText;
    }

    public Future<ProblemCacheVo> getProbleFuture() {
        return probleFuture;
    }
}
/**
 * 待處理文檔實體類
 */
public class PendingDocVo {
    //待處理文檔名稱
    private final String docName;
    //待處理文檔中題目id列表
    private final List<Integer> problemVoList;

    public PendingDocVo(String docName, List<Integer> problemVoList) {
        this.docName = docName;
        this.problemVoList = problemVoList;
    }

    public String getDocName() {
        return docName;
    }

    public List<Integer> getProblemVoList() {
        return problemVoList;
    }
}
/**
 * 題目保存在緩存中的實體類
 */
public class ProblemCacheVo implements Serializable{

    private String processedContent;
    private String problemSha;

    public ProblemCacheVo() {
    }

    public ProblemCacheVo(String processedContent, String problemSha) {
        this.processedContent = processedContent;
        this.problemSha = problemSha;
    }

    public String getProcessedContent() {
        return processedContent;
    }

    public void setProcessedContent(String processedContent) {
        this.processedContent = processedContent;
    }

    public String getProblemSha() {
        return problemSha;
    }

    public void setProblemSha(String problemSha) {
        this.problemSha = problemSha;
    }
}
/**
 * 題目在數據庫中存放實體類
 */
public class ProblemDBVo {
    //題目id
    private final int problemId;
    //題目內容,平均長度700字節
    private final String content;
    //題目的sha串
    private final String sha;

    public ProblemDBVo(int problemId, String content, String sha) {
        this.problemId = problemId;
        this.content = content;
        this.sha = sha;
    }

    public int getProblemId() {
        return problemId;
    }

    public String getContent() {
        return content;
    }

    public String getSha() {
        return sha;
    }
}
public class SingleWeb {
    public static void main(String[] args) {

        System.out.println("題庫開始初始化...........");
        ProblemBank.initBank();
        System.out.println("題庫初始化完成。");

        List<PendingDocVo> docList = MakeSrcDoc.makeDoc(2);
        long startTotal = System.currentTimeMillis();
        for(PendingDocVo doc:docList){
            System.out.println("開始處理文檔:"+doc.getDocName()+".......");
            long start = System.currentTimeMillis();
            String localName = DocService.makeDoc(doc);
            System.out.println("文檔"+localName+"生成耗時:"
                    +(System.currentTimeMillis()-start)+"ms");
            start = System.currentTimeMillis();
            String remoteUrl = DocService.upLoadDoc(localName);
            System.out.println("已上傳至["+remoteUrl+"]耗時:"
                    +(System.currentTimeMillis()-start)+"ms");
        }
        System.out.println("共耗時:"+(System.currentTimeMillis()-startTotal)+"ms");
    }
}

運行結果
======================
題庫開始初始化...........
開始定時更新題庫..........................
題庫初始化完成。
開始處理文檔:pending_0.......
開始處理文檔:pending_0
文檔complete_1531484294630_pending_0.pdf生成耗時:42965ms
已上傳至[http://www.xxxx.com/file/upload/complete_1531484294630_pending_0.pdf]耗時:5223ms
開始處理文檔:pending_1.......
開始處理文檔:pending_1
文檔complete_1531484341109_pending_1.pdf生成耗時:41256ms
已上傳至[http://www.xxxx.com/file/upload/complete_1531484341109_pending_1.pdf]耗時:5036ms
共耗時:94480ms
/**
 * 服務的拆分,rpc服務
 */
public class RpcMode {

    //生成文檔的線程池
    private static ExecutorService docMakeService
            = Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE*2);
    //上傳文檔的線程池
    private static ExecutorService docUploadService
            = Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE*2);
    private static CompletionService docCompletionService
            = new ExecutorCompletionService(docMakeService);
    private static CompletionService uploadCompletionService
            = new ExecutorCompletionService(docUploadService);


    private static class MakeDocTask implements Callable<String>{
        private PendingDocVo pendingDocVo;

        public MakeDocTask(PendingDocVo pendingDocVo) {
            this.pendingDocVo = pendingDocVo;
        }

        @Override
        public String call() throws Exception {
            long start = System.currentTimeMillis();
            String localName = DocService.makeAsyn(pendingDocVo);
            System.out.println("文檔"+localName+"生成耗時:"
                    +(System.currentTimeMillis()-start)+"ms");
            return localName;
        }
    }


    private static class UploadDocTask implements Callable<String>{
        private String localName;

        public UploadDocTask(String localName) {
            this.localName = localName;
        }

        @Override
        public String call() throws Exception {
            long  start = System.currentTimeMillis();
            String remoteUrl = DocService.upLoadDoc(localName);
            System.out.println("已上傳至["+remoteUrl+"]耗時:"
                    +(System.currentTimeMillis()-start)+"ms");
            return remoteUrl;
        }
    }

    public static void main(String[] args)
            throws InterruptedException, ExecutionException {
        System.out.println("題庫開始初始化...........");
        ProblemBank.initBank();
        System.out.println("題庫初始化完成。");

        List<PendingDocVo> docList = MakeSrcDoc.makeDoc(60);

        long startTotal = System.currentTimeMillis();

        for(PendingDocVo doc:docList){
            docCompletionService.submit(new MakeDocTask(doc));
        }

        for(PendingDocVo doc:docList){
            Future<String> futureLocalName = docCompletionService.take();
            uploadCompletionService.submit(new UploadDocTask(futureLocalName.get()));
        }

        for(PendingDocVo doc:docList){
            //把上傳後的網絡存儲地址拿到
            uploadCompletionService.take().get();
        }
        System.out.println("共耗時:"+(System.currentTimeMillis()-startTotal)+"ms");
    }
}

運行結果 
==========
題庫開始初始化...........
開始定時更新題庫..........................
題庫初始化完成。
開始處理文檔:pending_5
開始處理文檔:pending_3
開始處理文檔:pending_4
開始處理文檔:pending_7
開始處理文檔:pending_6
開始處理文檔:pending_10
開始處理文檔:pending_0
開始處理文檔:pending_1
開始處理文檔:pending_2
開始處理文檔:pending_9
開始處理文檔:pending_8
開始處理文檔:pending_11
開始處理文檔:pending_12
開始處理文檔:pending_13
開始處理文檔:pending_14
題目【780】在緩存中不存在,須要新啓任務
題目【1433】在緩存中不存在,須要新啓任務
題目【847】在緩存中不存在,須要新啓任務
題目【688】在緩存中不存在,須要新啓任務
題目【1695】在緩存中不存在,須要新啓任務
題目【547】在緩存中不存在,須要新啓任務
題目【446】在緩存中不存在,須要新啓任務
開始處理文檔:pending_15
題目【1439】在緩存中不存在,須要新啓任務
題目【1245】在緩存中不存在,須要新啓任務
題目【895】在緩存中不存在,須要新啓任務
題目【1416】在緩存中不存在,須要新啓任務

......
......
......

已上傳至[http://www.xxxx.com/file/upload/complete_1531485132232_pending_58.pdf]耗時:5319ms
已上傳至[http://www.xxxx.com/file/upload/complete_1531485132409_pending_59.pdf]耗時:5341ms
共耗時:65280ms
相關文章
相關標籤/搜索