異步併發利器:實際項目中使用CompletionService提高系統性能的一次實踐

場景

隨着互聯網應用的深刻,不少傳統行業也都須要接入到互聯網。咱們公司也是這樣,保險核心須要和不少保險中介對接,好比阿里、京東等等。這些公司對於接口服務的性能有些比較高的要求,傳統的核心沒法知足要求,因此信息技術部領導高瞻遠矚,決定開發互聯網接入服務,知足來自性能的需求。java

概念

CompletionServiceExecutorBlockingQueue的功能融合在一塊兒,將Callable任務提交給CompletionService來執行,而後使用相似於隊列操做的take和poll等方法來得到已完成的結果,而這些結果會在完成時被封裝爲Future。對於更多的概念,請參閱其餘網絡文檔。git

線程池的設計,阿里開發手冊說過不要使用Java Executors 提供的默認線程池,所以須要更接近實際的狀況來自定義一個線程池,根據屢次壓測,採用的線程池以下:github

  public ExecutorService getThreadPool(){
          return new ThreadPoolExecutor(75,
                  125,
                  180000,
                  TimeUnit.MILLISECONDS,
                  new LinkedBlockingDeque<>(450),
                  new ThreadPoolExecutor.CallerRunsPolicy());
      }

說明:公司的業務爲低頻交易,對於單次調用性能要求高,可是併發壓力根本不大,因此 阻塞隊列已滿且線程數達到最大值時所採起的飽和策略爲調用者執行。json

實現

業務

投保業務主要涉及這幾個大的方面:投保校驗、覈保校驗、保費試算網絡

  • 投保校驗:最主要的是要查詢客戶黑名單和風險等級,都是千萬級的表。並且投保人和被保人都須要校驗多線程

  • 覈保校驗:除了常規的核保規則校驗,查詢千萬級的大表,還須要調用外部智能覈保接口得到用戶的風險等級,投保人和被保人都須要校驗併發

  • 保費試算:須要計算每一個險種的保費app

設計

根據上面的業務,若是串行執行的話,單次性能確定不高,因此考慮多線程異步執行得到校驗結果,再對結果綜合判斷異步

    • 投保校驗:採用一個線程(也能夠根據投保人和被保人數量來採用幾個線程)async

    • 覈保校驗:

      • 常規校驗:採用一個線程

      • 外部調用:有幾個用戶(指投保人和被保人)就採用幾個線程

保費計算:有幾個險種就採用幾個線程,最後合併獲得整個的保費

代碼

如下代碼是樣例,實際邏輯已經去掉

先建立投保、覈保(常規、外部調用)、保費計算4個業務服務類:

投保服務類:InsuranceVerificationServiceImpl,假設耗時50ms

    @Service
    public class InsuranceVerificationServiceImpl implements InsuranceVerificationService {
        private static final Logger logger = LoggerFactory.getLogger(InsuranceVerificationServiceImpl.class);
        @Override
        public TaskResponseModel<Object> insuranceCheck(String key, PolicyModel policyModel) {
            try {
                //假設耗時50ms
                Thread.sleep(50);            
                return TaskResponseModel.success().setKey(key).setData(policyModel);
            } catch (InterruptedException e) {
                logger.warn(e.getMessage());            
                return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
            }
        }
    }

覈保常規校驗服務類:UnderwritingCheckServiceImpl,假設耗時50ms

    @Service
    public class UnderwritingCheckServiceImpl implements UnderwritingCheckService {
        private static final Logger logger = LoggerFactory.getLogger(UnderwritingCheckServiceImpl.class);
        @Override
        public TaskResponseModel<Object> underwritingCheck(String key, PolicyModel policyModel) {
            try {
                //假設耗時50ms
                Thread.sleep(50);            
                return TaskResponseModel.success().setKey(key).setData(policyModel);
            } catch (InterruptedException e) {
                logger.warn(e.getMessage());            
                return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
            }
        }
    }

覈保外部調用服務類:ExternalCallServiceImpl,假設耗時200ms

    @Service
    public class ExternalCallServiceImpl implements ExternalCallService {
        private static final Logger logger = LoggerFactory.getLogger(ExternalCallServiceImpl.class);
        @Override
        public TaskResponseModel<Object> externalCall(String key, Insured insured) {
            try {
                //假設耗時200ms
                Thread.sleep(200);
                ExternalCallResultModel externalCallResultModel = new ExternalCallResultModel();
                externalCallResultModel.setIdcard(insured.getIdcard());
                externalCallResultModel.setScore(200);
                return TaskResponseModel.success().setKey(key).setData(externalCallResultModel);
            } catch (InterruptedException e) {
                logger.warn(e.getMessage());
                return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
            }
        }
    }

試算服務類:TrialCalculationServiceImpl,假設耗時50ms

    @Service
    public class TrialCalculationServiceImpl implements TrialCalculationService {
        private static final Logger logger = LoggerFactory.getLogger(TrialCalculationServiceImpl.class);
        @Override
        public TaskResponseModel<Object> trialCalc(String key, Risk risk) {
            try {
                //假設耗時50ms
                Thread.sleep(50);
                return TaskResponseModel.success().setKey(key).setData(risk);
            } catch (InterruptedException e) {
                logger.warn(e.getMessage());
                return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
            }
        }
    }

統一返回接口類:TaskResponseModel, 上面4個服務的方法統一返回TaskResponseModel

  @Data
  @ToString
  @NoArgsConstructor
  @AllArgsConstructor
  @EqualsAndHashCode
  @Accessors(chain = true)
  public class TaskResponseModel<T extends Object> implements Serializable {
      private String key;           //惟一調用標誌
      private String resultCode;    //結果碼
      private String resultMessage; //結果信息
      private T data;               //業務處理結果

      public static TaskResponseModel<Object> success() {
          TaskResponseModel<Object> taskResponseModel = new TaskResponseModel<>();
          taskResponseModel.setResultCode("200");
          return taskResponseModel;
      }
      public static TaskResponseModel<Object> failure() {
          TaskResponseModel<Object> taskResponseModel = new TaskResponseModel<>();
          taskResponseModel.setResultCode("400");
          return taskResponseModel;
      }
  }

注:

  1. key爲此次調用的惟一標識,由調用者傳進來

  2. resultCode結果碼,200爲成功,400表示有異常

  3. resultMessage信息,表示不成功或者異常信息

  4. data業務處理結果,若是成功的話

  5. 這些服務類都是單例模式

要使用用CompletionService的話,須要建立實現了Callable接口的線程

投保Callable:

    @Data
    @AllArgsConstructor
    public class InsuranceVerificationCommand implements Callable<TaskResponseModel<Object>> {
        private String key;
        private PolicyModel policyModel;
        private final InsuranceVerificationService insuranceVerificationService;
        @Override
        public TaskResponseModel<Object> call() throws Exception {
            return insuranceVerificationService.insuranceCheck(key, policyModel);
        }
    }

覈保常規校驗Callable:

    @Data
    @AllArgsConstructor
    public class UnderwritingCheckCommand implements Callable<TaskResponseModel<Object>> {
        private String key;
        private PolicyModel policyModel;
        private final UnderwritingCheckService underwritingCheckService;
        @Override
        public TaskResponseModel<Object> call() throws Exception {
            return underwritingCheckService.underwritingCheck(key, policyModel);
        }
    }

覈保外部調用Callable:

    @Data
    @AllArgsConstructor
    public class ExternalCallCommand implements Callable<TaskResponseModel<Object>> {
        private String key;
        private Insured insured;
        private final ExternalCallService externalCallService;
        @Override
        public TaskResponseModel<Object> call() throws Exception {
            return externalCallService.externalCall(key, insured);
        }
    }

試算調用Callable:

    @Data
    @AllArgsConstructor
    public class TrialCalculationCommand implements Callable<TaskResponseModel<Object>> {
        private String key;
        private Risk risk;
        private final TrialCalculationService trialCalculationService;
        @Override
        public TaskResponseModel<Object> call() throws Exception {
            return trialCalculationService.trialCalc(key, risk);
        }
    }

  1. 每一次調用,須要建立這4種Callable

  2. 返回統一接口TaskResopnseModel

異步執行的類:TaskExecutor

  @Component
  public class TaskExecutor {
      private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);
      //線程池
      private final ExecutorService executorService;

      public TaskExecutor(ExecutorService executorService) {
          this.executorService = executorService;
      }

      //異步執行,獲取全部結果後返回
      public List<TaskResponseModel<Object>> execute(List<Callable<TaskResponseModel<Object>>> commands) {
          //建立異步執行對象
          CompletionService<TaskResponseModel<Object>> completionService = new ExecutorCompletionService<>(executorService);
          for (Callable<TaskResponseModel<Object>> command : commands) {
              completionService.submit(command);
          }
          //獲取全部異步執行線程的結果
          int taskCount = commands.size();
          List<TaskResponseModel<Object>> params = new ArrayList<>(taskCount);
          try {
              for (int i = 0; i < taskCount; i++) {
                  Future<TaskResponseModel<Object>> future = completionService.take();
                  params.add(future.get());
              }
          } catch (InterruptedException | ExecutionException e) {
              //異常處理
              params.clear();
              params.add(TaskResponseModel.failure().setKey("error").setResultMessage("異步執行線程錯誤"));
          }
          //返回,若是執行中發生error, 則返回相應的key值:error
          return params;
      }
  }

  1. 爲單例模式

  2. 接收參數爲List<Callable<TaskResponseModel<Object>>>,也就是上面定義的4種Callable的列表

  3. 返回List<TaskResponseModel<Object>>,也就是上面定義4種Callable返回的結果列表

  4. 咱們的業務是對返回結果統一判斷,業務返回結果有因果關係

  5. 若是線程執行有異常,也返回List<TaskResponseModel>,這個時候列表中只有一個TaskResponseModelkey爲error, 後續調用者能夠經過這個來判斷線程是否執行成功;

     

調用方:CompletionServiceController

  @RestController
  public class CompletionServiceController {
      //投保key
      private static final String INSURANCE_KEY = "insurance_";
      //覈保key
      private static final String UNDERWRITING_KEY = "underwriting_";
      //外部調用key
      private static final String EXTERNALCALL_KEY = "externalcall_";
      //試算key
      private static final String TRIA_KEY = "trial_";

      private static final Logger logger = LoggerFactory.getLogger(CompletionServiceController.class);

      private final ExternalCallService externalCallService;
      private final InsuranceVerificationService insuranceVerificationService;
      private final TrialCalculationService trialCalculationService;
      private final UnderwritingCheckService underwritingCheckService;
      private final TaskExecutor taskExecutor;

      public CompletionServiceController(ExternalCallService externalCallService, InsuranceVerificationService insuranceVerificationService, TrialCalculationService trialCalculationService, UnderwritingCheckService underwritingCheckService, TaskExecutor taskExecutor) {
          this.externalCallService = externalCallService;
          this.insuranceVerificationService = insuranceVerificationService;
          this.trialCalculationService = trialCalculationService;
          this.underwritingCheckService = underwritingCheckService;
          this.taskExecutor = taskExecutor;
      }

      //多線程異步併發接口
      @PostMapping(value = "/async", headers = "Content-Type=application/json;charset=UTF-8")
      public String asyncExec(@RequestBody PolicyModel policyModel) {
          long start = System.currentTimeMillis();

          asyncExecute(policyModel);
          logger.info("異步總共耗時:" + (System.currentTimeMillis() - start));
          return "ok";
      }

      //串行調用接口
      @PostMapping(value = "/sync", headers = "Content-Type=application/json;charset=UTF-8")
      public String syncExec(@RequestBody PolicyModel policyModel) {
          long start = System.currentTimeMillis();
          syncExecute(policyModel);
          logger.info("同步總共耗時:" + (System.currentTimeMillis() - start));
          return "ok";
      }
      private void asyncExecute(PolicyModel policyModel) {
          List<Callable<TaskResponseModel<Object>>> baseTaskCallbackList = new ArrayList<>();
          //根據被保人外部接口調用
          for (Insured insured : policyModel.getInsuredList()) {
              ExternalCallCommand externalCallCommand = new ExternalCallCommand(EXTERNALCALL_KEY + insured.getIdcard(), insured, externalCallService);
              baseTaskCallbackList.add(externalCallCommand);
          }
          //投保校驗
          InsuranceVerificationCommand insuranceVerificationCommand = new InsuranceVerificationCommand(INSURANCE_KEY, policyModel, insuranceVerificationService);
          baseTaskCallbackList.add(insuranceVerificationCommand);
          //覈保校驗
          UnderwritingCheckCommand underwritingCheckCommand = new UnderwritingCheckCommand(UNDERWRITING_KEY, policyModel, underwritingCheckService);
          baseTaskCallbackList.add(underwritingCheckCommand);
          //根據險種進行保費試算
          for(Risk risk : policyModel.getRiskList()) {
              TrialCalculationCommand trialCalculationCommand = new TrialCalculationCommand(TRIA_KEY + risk.getRiskcode(), risk, trialCalculationService);
              baseTaskCallbackList.add(trialCalculationCommand);
          }
          List<TaskResponseModel<Object>> results = taskExecutor.execute(baseTaskCallbackList);
          for (TaskResponseModel<Object> t : results) {
              if (t.getKey().equals("error")) {
                  logger.warn("線程執行失敗");
                  logger.warn(t.toString());
              }
              logger.info(t.toString());
          }

      }
      private void syncExecute(PolicyModel policyModel) {
          //根據被保人外部接口調用
          for (Insured insured : policyModel.getInsuredList()) {
              TaskResponseModel<Object> externalCall = externalCallService.externalCall(insured.getIdcard(), insured);
              logger.info(externalCall.toString());
          }
          //投保校驗
          TaskResponseModel<Object> insurance = insuranceVerificationService.insuranceCheck(INSURANCE_KEY, policyModel);
          logger.info(insurance.toString());
          //覈保校驗
          TaskResponseModel<Object> underwriting = underwritingCheckService.underwritingCheck(UNDERWRITING_KEY, policyModel);
          logger.info(underwriting.toString());
          //根據險種進行保費試算
          for(Risk risk : policyModel.getRiskList()) {
              TaskResponseModel<Object> risktrial = trialCalculationService.trialCalc(risk.getRiskcode(), risk);
              logger.info(risktrial.toString());
          }

      }
  }

1.爲測試方便,提供兩個接口調用:一個是串行執行,一個是異步併發執行

2.在異步併發執行函數asyncExecute中:

  1. 根據有多少個被保人,建立多少個外部調用的Callable實例,key值爲EXTERNALCALL_KEY + insured.getIdcard(),在一次保單投保調用中,每個被保人Callablekey是不同的。

  2. 根據有多少個險種,建立多少個試算的Callable實例,keyTRIA_KEY + risk.getRiskcode(),在一次保單投保調用中,每個險種的Callable的key是不同的

  3. 建立投保校驗的Callable實例,業務上只須要一個

  4. 建立覈保校驗的Callable實例,業務上只須要一個

  5. 將Callable列表傳入到TaskExecutor執行異步併發調用

  6. 根據返回結果來判斷,經過判斷返回的TaskResponseModelkey值能夠知道是哪類業務校驗,分別進行判斷,還能夠交叉判斷(公司的業務就是要交叉判斷)

驗證

驗證數據:

{
"insuredList":
[{"idcard":"laza","name":"320106"},
{"idcard":"ranran","name":"120102"}],
"policyHolder":"lazasha","policyNo":"345000987","riskList":
[{"mainFlag":1,"premium":300,"riskcode":"risk001","riskname":"險種一"},
{"mainFlag":0,"premium":400,"riskcode":"risk002","riskname":"險種二"}]
}

上面數據代表:有兩個被保人,兩個險種。按照咱們上面的定義,會調用兩次外部接口,兩次試算,一次投保,一次覈保。而在樣例代碼中,一次外部接口調用耗時爲200ms, 其餘都爲50ms.

本地開發的配置爲8C16G:

  • 同步串行接口調用計算:2 * 200 + 2 * 50 + 50 + 50 = 600ms

  • 多線程異步執行調用計算:按照多線程併發執行原理,取耗時最長的200ms

驗證:同步接口

異步併發利器:實際項目中使用CompletionService提高系統性能的一次實踐

輸出耗時:能夠看到耗時601ms

異步併發利器:實際項目中使用CompletionService提高系統性能的一次實踐

驗證:多線程異步執行接口

異步併發利器:實際項目中使用CompletionService提高系統性能的一次實踐

輸出耗時:能夠看到爲204ms

異步併發利器:實際項目中使用CompletionService提高系統性能的一次實踐

結果:基本和咱們的預期相符合。

結束

這是將實際生產中的例子簡化出來,具體生產的業務比較複雜,不便於展現。

實際狀況下,原來的接口須要1000ms以上才能完成單次調用,有的須要2000-3000ms。如今的接口,在生產兩臺8c16g的虛擬機, 通過4個小時的簡單壓測可以支持2000用戶併發,單次返回時長爲350ms左右,服務很穩定,徹底可以知足公司的業務發展需求。

提供的這個是能夠運行的列子,代碼在:https://github.com/lazasha111211/completionservice-demo.git

相關文章
相關標籤/搜索