多線程讀寫多個文件02

/**
 * 除1002 1004 1009 1010的其餘流程
 */
@Component
public class AutoTestOtherQueryBL {

    private final Log logger = LogFactory.getLog(this.getClass());
    @Autowired
    private SendRequestService sendRequestService;


    public List<AutoMaticTestPojo> dealData(List<File> filesOtherQuery) {
        int size = filesOtherQuery.size();
        Map<String, File> filesOtherOut = new HashMap<>(16); //全部1004返回
        if (filesOtherQuery != null && size > 0) {
            for (File file : filesOtherQuery) {
                if (file.getName().contains("Out")) {
                    int i = file.getName().indexOf("-");
                    String serialNo = file.getName().substring(0, i);
                    filesOtherOut.put(serialNo, file);
                }
            }
        }

        logger.info(" 其餘查詢準備落庫數量 ::: " + ( size - filesOtherOut.size()) );
        //int queryNumber = size - filesOtherOut.size();
        List<AutoMaticTestPojo> automaticTestPojoList = new ArrayList<>();
        ThreadPoolExecutor threadPoolExecutor = null;
        if (filesOtherQuery != null && size > 0) {
            if(size <= 30){
                BlockingQueue blockingQueue = new ArrayBlockingQueue<>(16);
                threadPoolExecutor = new ThreadPoolExecutor(size, (size + 3), 15, TimeUnit.MINUTES, blockingQueue);
                OtherQueryThread otherQueryThread = new OtherQueryThread(filesOtherOut, filesOtherQuery, automaticTestPojoList,sendRequestService,0, size);
                threadPoolExecutor.execute(otherQueryThread);
            }else{
                BlockingQueue blockingQueue = new ArrayBlockingQueue<>(16);
                threadPoolExecutor = new ThreadPoolExecutor(30, (30 + 3), (size/30), TimeUnit.MINUTES, blockingQueue);
                //取餘,把餘數給最後一個線程
                int m = size%30;
                //每一個線程分配多少個任務
                int s = (size-m)/30;
                for (int i = 0 ; i < 29; i++){
                    OtherQueryThread otherQueryThread = new OtherQueryThread(filesOtherOut, filesOtherQuery, automaticTestPojoList,sendRequestService,s*i, s*(i+1));
                    threadPoolExecutor.execute(otherQueryThread);
                }
                //建立第30個線程
                OtherQueryThread otherQueryThread = new OtherQueryThread(filesOtherOut, filesOtherQuery, automaticTestPojoList,sendRequestService,s*29, s*30 + m);
                threadPoolExecutor.execute(otherQueryThread);;
            }

            threadPoolExecutor.shutdown();//不會觸發中斷
            boolean flag = true;
            while (flag) {
                if (threadPoolExecutor.isTerminated()) {
                    flag = false;
                    logger.info(" 其餘查詢落庫數量 ::: " + automaticTestPojoList.size());
                    return automaticTestPojoList;
                }
            }
        }

        return automaticTestPojoList;
    }
}
-----------------------------------------------------------------------------------------------------------------
技術交流羣:816227112
public class OtherQueryThread implements Runnable {

    private final Log logger = LogFactory.getLog(this.getClass());
    private Map<String, File> filesOtherOut;
    private List<File> filesOtherQuery;
    private List<AutoMaticTestPojo> automaticTestPojoList;
    private SendRequestService sendRequestService;
    private int start;
    private int end;


    public OtherQueryThread(Map<String, File> filesOtherOut, List<File> filesOtherQuery, List<AutoMaticTestPojo> automaticTestPojoList, SendRequestService sendRequestService, int start, int end) {
        this.filesOtherOut = filesOtherOut;
        this.filesOtherQuery = filesOtherQuery;
        this.automaticTestPojoList = automaticTestPojoList;
        this.sendRequestService = sendRequestService;
        this.start = start;
        this.end = end;
    }

    @Override
    public void run() {

        for (int i = this.start; i < this.end; i++) {
            SAXReader saxReader = new SAXReader();
            Document docOtherReq = null;
            AutoMaticTestPojo automaticTestPojo = new AutoMaticTestPojo();
            File file = filesOtherQuery.get(i);
            if (file != null && file.getName().contains("In")) {
                try {
                    docOtherReq = saxReader.read(file);
                } catch (DocumentException e) {
                    e.printStackTrace();
                }

                Element rootOtherReq = docOtherReq.getRootElement();
                String serialNo = rootOtherReq.element("Header").element("SerialNo").getTextTrim();//其餘老核心請求流水
                //String policyNo = rootOtherReq.element("App").element("Req").element("PolicyNo").getTextTrim();//其餘老核心對應1004保單號
                String flag = rootOtherReq.element("Header").element("TransCode").getTextTrim();

                automaticTestPojo.setTrialTransNo(serialNo);
                //automaticTestPojo.setTrialPolicyNo( policyNo );
                automaticTestPojo.setTemp3(flag);
                automaticTestPojo.setSystem("04");
                automaticTestPojo.setMakeDate(PubFun.getCurrentDate());
                automaticTestPojo.setMakeTime(PubFun.getCurrentTime());

                // 讀取對應的返回報文
                if (StringUtils.isNotEmpty(serialNo)) {
                    //查今天對應的報文
                    File fileout = this.filesOtherOut.get(serialNo);
                    if (fileout != null && fileout.exists() && fileout.isFile()) {
                        Document docOtherReturn = null;
                        try {
                            docOtherReturn = saxReader.read(fileout);
                        } catch (DocumentException e) {
                            e.printStackTrace();
                        }

                        Element root1004Return = docOtherReturn.getRootElement();
                        String insuredmessage = root1004Return.element("Header").element("RetMsg").getTextTrim();//老核心交易信息
                        automaticTestPojo.setTrialMessage(insuredmessage);

                    } else {
                        automaticTestPojo.setTrialMessage("沒有對應的返回報文。");
                    }
                }

                String xml = docOtherReq.asXML();
                logger.info("other請求 xml:" + xml);
                AutoMaticTestPojo returnPojo = send(xml, flag);
                automaticTestPojo.setInsuredMessage(returnPojo.getInsuredMessage());
                automaticTestPojo.setInsuredTransNo(returnPojo.getInsuredTransNo());
                //automaticTestPojo.setInsuredPolicyNo(returnPojo.getInsuredPolicyNo());
                this.automaticTestPojoList.add(automaticTestPojo);
            }
        }


    }

    public AutoMaticTestPojo send(String xml, String flag) {
        AutoMaticTestPojo automaticTestPojo = new AutoMaticTestPojo();
        if (!"1002".equals(flag.trim()) && !"1004".equals(flag.trim()) && !"1009".equals(flag.trim()) && !"1010".equals(flag.trim())) {
            String resultRevoke = "";
            logger.info("sendRequestService:  " + sendRequestService);
            resultRevoke = sendRequestService.dealData(xml);
            logger.info("other返回 xml:" + resultRevoke);

            if (resultRevoke.contains("<RetMsg>")) {
                automaticTestPojo.setInsuredMessage(resultRevoke.substring(resultRevoke.indexOf("<RetMsg>") + 8, resultRevoke.indexOf("</RetMsg>")));
            } else {
                automaticTestPojo.setInsuredMessage("返回報文未找到RetMsg");
            }
            if (resultRevoke.contains("<SerialNo>")) {
                automaticTestPojo.setInsuredTransNo(resultRevoke.substring(resultRevoke.indexOf("<SerialNo>") + 10, resultRevoke.indexOf("</SerialNo>")));
            }

        }
        return automaticTestPojo;
    }


}
相關文章
相關標籤/搜索