spring異步service中的處理線程數限制

狀況簡介
spring項目,controller異步調用service的方法,產生大量併發。java

  • 具體業務:
    前臺同時傳入大量待翻譯的單詞,後臺業務接收單詞,並調用百度翻譯接口翻譯接收單詞並將翻譯結果保存到數據庫,前臺不須要實時返回翻譯結果。
  • 處理方式:
    controller接收文本調用service中的異步方法,將單詞先保存到隊列中,再啓動2個新線程,從緩存隊列中取單詞,並調用百度翻譯接口獲取翻譯結果並將翻譯結果保存到數據庫。
  • 本文主要知識點:
    多線程同時(異步)調用方法後,開啓新線程,並限制線程數量。

代碼以下:spring

@Service
public class LgtsAsyncServiceImpl {
    /** logger日誌. */
    public static final Logger LOGGER = Logger.getLogger(LgtsAsyncServiceImpl2.class);

    private final BlockingQueue<Lgts> que = new LinkedBlockingQueue<>();// 待翻譯的隊列
    private final AtomicInteger threadCnt = new AtomicInteger(0);// 當前翻譯中的線程數
    private final Vector<String> existsKey = new Vector<>();// 保存已入隊列的數據
    private final int maxThreadCnt = 2;// 容許同時執行的翻譯線程數
    private static final int NUM_OF_EVERY_TIME = 50;// 每次提交的翻譯條數
    private static final String translationFrom = "zh";

    @Async
    public void saveAsync(Lgts t) {
        if (Objects.isNull(t) || StringUtils.isAnyBlank(t.getGco(), t.getCode())) {
            return;
        }
        offer(t);
        save();
        return;
    }

    private boolean offer(Lgts t) {
        String key = t.getGco() + "-" + t.getCode();
        if (!existsKey.contains(key)) {
            existsKey.add(key);
            boolean result = que.offer(t);
            // LOGGER.trace("待翻譯文字[" + t.getGco() + ":" + t.getCode() + "]加入隊列結果[" + result
            // + "],隊列中數據總個數:" + que.size());
            return result;
        }
        return false;
    }

    @Autowired
    private LgtsService lgtsService;

    private void save() {
        int cnt = threadCnt.incrementAndGet();// 當前線程數+1
        if (cnt > maxThreadCnt) {
            // 已啓動的線程大於設置的最大線程數直接丟棄
            threadCnt.decrementAndGet();// +1的線程數再-回去
            return;
        }
        GwallUser user = UserUtils.getUser();
        Thread thr = new Thread() {
            public void run() {
                long sleepTime = 30000l;
                UserUtils.setUser(user);
                boolean continueFlag = true;
                int maxContinueCnt = 5;// 最大連續休眠次數,連續休眠次數超過最大休眠次數後,while循環退出,當前線程銷燬
                int continueCnt = 0;// 連續休眠次數

                while (continueFlag) {// 隊列不爲空時執行
                    if (Objects.isNull(que.peek())) {
                        try {
                            if (continueCnt > maxContinueCnt) {
                                // 連續休眠次數達到最大連續休眠次數,當前線程將銷燬。
                                continueFlag = false;
                                continue;
                            }
                            // 隊列爲空,準備休眠
                            Thread.sleep(sleepTime);
                            continueCnt++;
                            continue;
                        } catch (InterruptedException e) {
                            // 休眠失敗,無需處理
                            e.printStackTrace();
                        }
                    }
                    continueCnt = 0;// 重置連續休眠次數爲0

                    List<Lgts> params = new ArrayList<>();
                    int totalCnt = que.size();
                    que.drainTo(params, NUM_OF_EVERY_TIME);
                    StringBuilder utf8q = new StringBuilder();
                    String code = "";
                    List<Lgts> needRemove = new ArrayList<>();
                    for (Lgts lgts : params) {
                        if (StringUtils.isAnyBlank(code)) {
                            code = lgts.getCode();
                        }
                        // 移除existsKey中保存的key,以避免下面翻譯失敗時再次加入隊列時,加入不進去
                        String key = lgts.getGco() + "-" + lgts.getCode();
                        existsKey.remove(key);

                        if (!code.equalsIgnoreCase(lgts.getCode())) {// 要翻譯的目標語言與當前列表中的第一個不一致
                            offer(lgts);// 從新將待翻譯的語言放回隊列
                            needRemove.add(lgts);
                            continue;
                        }
                        utf8q.append(lgts.getGco()).append("\n");
                    }
                    params.removeAll(needRemove);
                    LOGGER.debug("隊列中共" + totalCnt + " 個,獲取" + params.size() + " 個符合條件的待翻譯內容,編碼:" + code);
                    String to = "en";
                    if (StringUtils.isAnyBlank(utf8q, to)) {
                        LOGGER.warn("調用翻譯出錯,未找到[" + code + "]對應的百度編碼。");
                        continue;
                    }
                    Map<String, String> result = getBaiduTranslation(utf8q.toString(), translationFrom, to);
                    if (Objects.isNull(result) || result.isEmpty()) {// 把沒有獲取到翻譯結果的從新放回隊列
                        for (Lgts lgts : params) {
                            offer(lgts);
                        }
                        LOGGER.debug("本次翻譯結果爲空。");
                        continue;
                    }
                    int sucessCnt = 0, ignoreCnt = 0;
                    for (Lgts lgts : params) {
                        lgts.setBdcode(to);
                        String gna = result.get(lgts.getGco());
                        if (StringUtils.isAnyBlank(gna)) {
                            offer(lgts);// 從新將待翻譯的語言放回隊列
                            continue;
                        }
                        lgts.setStat(1);
                        lgts.setGna(gna);
                        int saveResult = lgtsService.saveIgnore(lgts);
                        if (0 == saveResult) {
                            ignoreCnt++;
                        } else {
                            sucessCnt++;
                        }
                    }
                    LOGGER.debug("待翻譯個數:" + params.size() + ",翻譯成功個數:" + sucessCnt + ",已存在並忽略個數:" + ignoreCnt);
                }
                threadCnt.decrementAndGet();// 運行中的線程數-1
                distory();// 清理數據,必須放在方法最後,不然distory中的判斷須要修改
            }

            /**
             * 若是是最後一個線程,清空隊列和existsKey中的數據
             */
            private void distory() {
                if (0 == threadCnt.get()) {
                    // 最後一個線程退出時,執行清理操做
                    existsKey.clear();
                    que.clear();
                }
            }
        };
        thr.setDaemon(true);// 守護線程,若是主線程執行完畢,則此線程會自動銷燬
        thr.setName("baidufanyi-" + RandomUtils.nextInt(1000, 9999));
        thr.start();// 啓動插入線程
    }

    /**
     * 百度翻譯
     * 
     * @param utf8q
     *            待翻譯的字符串,須要utf8格式的
     * @param from
     *            百度翻譯語言列表中的代碼
     *            參見:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
     * @param to
     *            百度翻譯語言列表中的代碼
     *            參見:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
     * @return 翻譯結果
     */
    private Map<String, String> getBaiduTranslation(String utf8q, String from, String to) {
        Map<String, String> result = new HashMap<>();
        String baiduurlStr = "http://api.fanyi.baidu.com/api/trans/vip/translate";
        if (StringUtils.isAnyBlank(baiduurlStr)) {
            LOGGER.warn("百度翻譯API接口URL相關參數爲空!");
            return result;
        }
        Map<String, String> params = buildParams(utf8q, from, to);
        if (params.isEmpty()) {
            return result;
        }

        String sendUrl = getUrlWithQueryString(baiduurlStr, params);
        try {
            HttpClient httpClient = new HttpClient();
            httpClient.setMethod("GET");
            String remoteResult = httpClient.pub(sendUrl, "");
            result = convertRemote(remoteResult);
        } catch (Exception e) {
            LOGGER.info("百度翻譯API返回結果異常!", e);
        }
        return result;
    }

    private Map<String, String> convertRemote(String remoteResult) {
        Map<String, String> result = new HashMap<>();
        if (StringUtils.isBlank(remoteResult)) {
            return result;
        }
        JSONObject jsonObject = JSONObject.parseObject(remoteResult);
        JSONArray trans_result = jsonObject.getJSONArray("trans_result");
        if (Objects.isNull(trans_result) || trans_result.isEmpty()) {
            return result;
        }
        for (Object object : trans_result) {
            JSONObject trans = (JSONObject) object;
            result.put(trans.getString("src"), trans.getString("dst"));
        }
        return result;
    }

    private Map<String, String> buildParams(String utf8q, String from, String to) {
        if (StringUtils.isBlank(from)) {
            from = "auto";
        }
        Map<String, String> params = new HashMap<String, String>();
        String skStr = "sk";
        String appidStr = "appid";
        if (StringUtils.isAnyBlank(skStr, appidStr)) {
            LOGGER.warn("百度翻譯API接口相關參數爲空!");
            return params;
        }

        params.put("q", utf8q);
        params.put("from", from);
        params.put("to", to);

        params.put("appid", appidStr);

        // 隨機數
        String salt = String.valueOf(System.currentTimeMillis());
        params.put("salt", salt);

        // 簽名
        String src = appidStr + utf8q + salt + skStr; // 加密前的原文
        params.put("sign", MD5Util.md5Encrypt(src).toLowerCase());
        return params;
    }

    public static String getUrlWithQueryString(String url, Map<String, String> params) {
        if (params == null) {
            return url;
        }

        StringBuilder builder = new StringBuilder(url);
        if (url.contains("?")) {
            builder.append("&");
        } else {
            builder.append("?");
        }

        int i = 0;
        for (String key : params.keySet()) {
            String value = params.get(key);
            if (value == null) { // 過濾空的key
                continue;
            }

            if (i != 0) {
                builder.append('&');
            }

            builder.append(key);
            builder.append('=');
            builder.append(encode(value));

            i++;
        }

        return builder.toString();
    }

    /**
     * 對輸入的字符串進行URL編碼, 即轉換爲%20這種形式
     * 
     * @param input
     *            原文
     * @return URL編碼. 若是編碼失敗, 則返回原文
     */
    public static String encode(String input) {
        if (input == null) {
            return "";
        }

        try {
            return URLEncoder.encode(input, "utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        return input;
    }
}
相關文章
相關標籤/搜索