在處理原始對帳文件的時候,我將數據歸類後批量存入相應的表中。在持久化的時候,用了parallelStream(),想着同時存入不少表這樣能夠提升效率。spring
@Override @Transactional public boolean handleTask(AccEbankAlEveBill[] task, String ownSign) throws Exception { Arrays.stream(task).parallel().map(AccEbankBill::convert).collect(Collectors.groupingBy(AccEbankBill::getTradeType)).entrySet() // 這裏出了問題 .parallelStream().forEach(item -> { switch (EbankAleveTradeTypeEnum.valueOf(item.getKey())) { // 提現... // 充值 case RECHARGE: accEbankRechargeBillDao.batchSave(item.getValue()); break; // 利息 case INTEREST: accEbankInterestBillDao.batchSave(item.getValue()); break; case OTHERS: accEBankAleveOthersBillDao.batchSave(item.getValue()); break; } }); // DB update return true; }
上面代碼中將對帳數據按類型歸類後,獲得一個Map<String, List<Bean>>,key爲類型,value爲數據。考慮到數據類型有不少種,而後使用了parallelStream()
。在最近的一次自測中,因爲開發數據庫網絡問題,形成事務處理超時,但發現數據沒有回滾!數據庫
開始沿着數據庫超時中斷機制的思路找問題,花了比較多的時間在分析是數據庫端觸發了中斷,仍是應用層主動中斷,以及二者對是否回滾有啥區別。。最後發現這些都不是緣由apache
次日一早忽然想到這裏的parallelStream()
多是罪魁禍首,由於它開啓了多線程(多線程每每有問題),本機環境一共有4個worker在處理(包含主線程),可是超時致使的org.springframework.transaction.TransactionTimedOutException
錯誤是發生在主線程內的,那確定只有主線程回滾了。後經測試證明。
編程
解決方法就是去除parallelStream()
,簡簡單單的使用Map的forEach就行了。服務器
結論:事務只能管着開啓事務的線程,其餘子線程出了問題都感知不到,因此在多線程環境操做DB要慎重。普通的多線程很容易發覺,但parallelStream是也是,切記網絡
1. 線程池的大小多線程
線程池的大小 = 處理器的核的數目 指望的CPU利用率 (1 + W/C)併發
其中:ide
源自《Java併發編程實戰》Brian Goetz的建議測試
2. 文件下載
HTTP(S)用apache httpclient可實現連接池和斷點續傳, FTP可以使用Apache Commons Net API。重試間隔設置爲5~10分鐘較合適。高頻容易搞死服務器,低頻會阻塞自身程序。重試次數和超時時間根據業務狀況設置。