在介紹這部份內容以前,先介紹一下因爲我和另一位開發人員的考慮不周形成的一次線上事故場景(考慮企業隱私,屏蔽了一些關鍵詞)。前端
卡劵系統的後臺管理系統,用於處理用戶投訴補償以及發錯券補償的場景。java
兩種狀況會使用到此次咱們開發的功能。git
因爲功能急需上線,產品經理想盡可能簡化開發,設計的輕量化一些,不將補發記錄入庫。每次補發完成頁面就顯示成功多少個,失敗多少個,失敗的UIDs是哪些。github
事故描述:運營人員上傳了250個用戶UID進行補發劵,點擊補發按鈕,等待了約2分鐘頁面顯示失敗了215個UID。ajax
排查狀況:線上環境將應用部署到了2臺服務器。咱們在A和B兩臺服務器的日誌上都查到了補發請求相關日誌。服務器A上的日誌顯示補發失敗35條,服務器B上的日誌顯示補發失敗215條。數據庫
緣由分析:編程
2019-09-06 10:51:19.075
,響應請求的時刻是:2019-09-06 10:52:40.171
;2019-09-06 10:52:19.061
,響應請求的時刻是:2019-09-06 10:52:40.022
。初步排查:因爲後端服務器接收到了兩個請求,判斷是否運營人員點了兩次補發按鈕?通過對前端頁面的測試,點了一次補發按鈕後,頁面出現loading遮罩層,不能第二次點擊補發按鈕。排除運營人員操做的問題。後端
進一步分析:A和B兩臺服務器接收到請求的時間間隔剛好是1分鐘左右,是不是前端Ajax請求的響應超時會自動重試?因爲前端頁面是使用jQuery發送Ajax請求,而且請求類型是POST,瀏覽器並不會自動重試。瀏覽器
最終得出結論:在向指導人請教後,推測是線上環境有Nginx進行負載均衡,當ajax請求獲得響應的時間超過Nginx默認的60秒時,請求會自動重發到另外一臺服務器。向部門經理確認系統架構後,線上環境確實存在負載均衡,使用的是阿里的SLB。(因爲咱們剛接手該項目,對線上環境還不太熟悉)阿里的SLB開啓了超時自動重發機制,超時時間是60秒。bash
一個補發劵的請求通過SLB負載均衡到後端服務器,後端服務器執行業務代碼時間超過了一分鐘,過了60秒後,SLB認爲該請求超時,觸發重試機制,將一樣的請求負載到另一臺後端服務器上,兩臺服務器上的線程開始併發調用發劵接口,因爲發劵接口作了接口冪等性校驗,因此並未出現發劵重複的狀況。最終250個UIDs都成功的完成了補發。
產品經理提出需求時,說要簡化開發,設計輕量化等。但咱們做爲Java開發工程師,咱們不能和產品經理想的同樣,將系統想的過於簡化。仍然要從一個程序的角度出發來考慮問題。
咱們知道,在原生安卓項目開發中,全部的網絡請求都必須在子線程中執行。
安卓爲何要這樣限制呢?我想,安卓必定是考慮到全部的網絡請求都是有可能出現超時的,即便網絡請求只是去簡單的獲取一個資源,但仍可能會出現網絡延遲的狀況。若是在主線程中執行,一旦出現延遲或者超時,給用戶的感受就是界面卡住。因而安卓進行了異步化設計。限制網絡請求只能在子線程中執行。
對於Web應用系統,若是有執行時間較長的請求,咱們也要儘可能將其放在子線程中執行。避免由於等待遠程服務的返回,或者對數據庫的查詢,而阻塞主線程的執行,浪費寶貴的計算資源,影響用戶體驗。
此次線上事故的根本緣由就是開發經驗不足,考慮不周,不瞭解線上狀況,未進行異步化設計。因爲一次請求須要補發較多的用戶,致使一次HTTP請求遲遲未完成三次握手四次揮手過程,SLB服務器認爲請求超時,觸發了重試機制,將一樣的請求打到另一臺服務器上。
在Java語言中,Future
接口,尤爲是它在Java 8中的新版實現CompletableFuture
,是進行異步化設計的利器。
Future
接口在Java 5中被引入,設計初衷是對未來某個時刻會發生的結果進行建模。它建模了一種異步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給調用方。在Future
中觸發那些潛在耗時的操做把調用線程解放出來,讓它能及時響應客戶端或者繼續執行其它有價值的工做,再也不須要呆呆的等到耗時的操做完成。
上述補發劵業務最初的同步代碼大體以下(考慮企業隱私,屏蔽關鍵詞):
業務Service層代碼:
/**
* 同步 劵補發操做
* @param uIds 用戶UID集合
* @param couponId 優惠券ID
* @return 失敗的用戶UID集合
*/
@Override
public List<String> syncReSupplyCoupon(List<String> uIds, String couponId) {
List<String> result = new ArrayList<>();
List<UserInfoModel> userInfoModelList = new ArrayList<>();
// 循環驗證UID有效性
for (String uId : uIds) {
// 查詢UID對應用戶信息
UserInfoModel userInfoModel = reSupplyCouponService.queryUserInfo(uId);
if (userInfoModel != null) {
// UID存在,放入待進行補發用戶集合
userInfoModelList.add(userInfoModel);
} else {
// UID不存在,放入返回結果集合
result.add(uId);
}
}
// 循環進行劵補發
for (UserInfoModel userInfoModel : userInfoModelList) {
Boolean flag = false;
try {
flag = reSupplyCouponService.reSupplyCoupon(couponId,userInfoModel.getUid());
} catch (Exception e) {
// 異常處理
}
if (!flag) {
// 補發劵失敗,放入返回結果集合
result.add(userInfoModel.getUid());
}
}
return result;
}
複製代碼
基礎Service層代碼:
/**
* 查詢用戶信息
* @param uId 用戶UID
* @return 用戶信息model
*/
@Override
public UserInfoModel queryUserInfo(String uId) {
return reSupplyCouponIntegration.queryUserInfo(uId);
}
/**
* 補發劵操做
* @param couponId 優惠券ID
* @param uId 用戶ID
* @return 補發結果:成功或失敗
*/
@Override
public Boolean reSupplyCoupon(String couponId, String uId) {
return reSupplyCouponIntegration.reSupplyCoupon(couponId,uId);
}
複製代碼
Integration
防腐層代碼:
private static List<UserInfoModel> users = new ArrayList<>();
/**
* 初始化操做,模擬遠程用戶數據
*/
static {
for (int i = 0; i < 250; i++) {
users.add(new UserInfoModel(String.valueOf(i)));
}
}
/**
* 模擬查找用戶操做,不存在則UID則新增一個。
* @param uId 用戶UID
* @return 用戶信息model
*/
@Override
public UserInfoModel queryUserInfo(String uId) {
try {
// 模擬調用遠程服務耗時
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return users.get(Integer.valueOf(uId));
}
/**
* 模擬補發劵操做
* @param couponId 優惠券ID
* @param uId 用戶id
* @return 補發劵結果:成功或失敗
*/
@Override
public Boolean reSupplyCoupon(String couponId, String uId) {
try {
// 模擬調用遠程服務耗時
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 模擬成功或失敗機率
return new Random().nextInt(100) < 90;
}
複製代碼
這段同步執行的代碼中存在的問題:包含2個for循環中經過RPC調用遠程服務提供方進行數據庫操做,若是UID集合數據量較大,這個方法的執行時間是很是長的,例如此次事故中運營人員上傳了250個UID,執行時間就花了2分鐘左右。耗時過長,SLB負載均衡服務器認爲請求超時,進行重試。
使用Future接口進行代碼異步化改造:將耗時的操做封裝到一個Callable
對象中,再將它提交給ExecutorService線程池。
業務Service層代碼:
/**
* 初始化線程池
*/
private static ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 聲明Future
*/
private static Future<List<String>> future;
/**
* 使用Callable封裝耗時操做
*/
class AsyncReSupplyCouponCallable implements Callable<List<String>> {
// 經過構造函數間接傳遞參數給call方法
private List<String> uIds;
private String couponId;
public AsyncReSupplyCouponCallable(List<String> uIds, String couponId) {
this.uIds = uIds;
this.couponId = couponId;
}
@Override
public List<String> call() throws Exception {
// 調用同步的補發劵方法
return syncReSupplyCoupon(uIds,couponId);
}
}
/**
* 異步 劵補發操做 基於JDK 5的Future接口
* @param uIds 用戶UID集合
* @param couponId 優惠券ID
*/
@Override
public void asyncFutureReSupplyCoupon(List<String> uIds, String couponId) {
future = executorService.submit(new AsyncReSupplyCouponCallable(uIds,couponId));
executorService.shutdown();
}
/**
* 獲取補發劵失敗的UIDs在前端顯示
* 由前端控制調用該方法的時機
* 根據上傳的UIDs數量作輪詢,時間能夠設置久一點。
* @return 補發失敗的UID集合
*/
@Override
public List<String> getFailedUIDs() {
List<String> result = new ArrayList<>();
try {
if (future != null) {
// 若是調用get方法時,Callable中的任務還未執行完,則線程阻塞在這裏。
// 使用重載的get方法設置超時時間爲50秒。若是發生阻塞,則最多等待50秒後退出。
result = future.get(50, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
// 線程等待過程當中被中斷
} catch (ExecutionException e) {
// 計算拋出一個異常
} catch (TimeoutException e) {
// 在Future對象完成以前超時已過時
}
return result;
}
複製代碼
異步化改造基本已經完成。以上代碼已經可以有效避免此次線上事故再次發生了。
基於Future
接口的異步改造已經可以避免事故再次發生,可是耗時的補發劵操做在子線程執行仍然是同步的。子線程中驗證同步執行驗證250個UIDs是否合法,給250個用戶補發劵。耗時仍然很長。如何提高接口的性能呢?若是讓不一樣的UID之間的操做並行,則可顯著提高性能。
利用Java 8的並行流避免每一個UID的順序執行。
業務Service層代碼:
/**
* 使用並行流 補發劵
* @param uIds 用戶UID集合
* @param couponId 優惠券ID
* @return 補發失敗的用戶UIDs集合
*/
@Override
public List<String> parallelReSupplyCoupon(List<String> uIds, String couponId) {
List<String> failUidList = new ArrayList<>();
// 使用並行流驗證UID是否合法,按是否合法進行分區:不存在的爲true區
Map<Boolean, List<UserInfoModel>> userInfoModelMap = uIds.parallelStream()
.map(uId -> reSupplyCouponService.queryUserInfo(uId))
.collect(Collectors.partitioningBy(Objects::isNull));
// 取出不合法的UID加入補發失敗的集合中
userInfoModelMap.get(true)
.parallelStream()
.map(userInfoModel -> failUidList.add(userInfoModel.getUid()))
.collect(Collectors.toList()); // 觸發中間操做
// 取出合法的UID進行補發劵操做
List<Map<String, Object>> reSupplyCouponResult = userInfoModelMap.get(false)
.parallelStream()
.map(userInfoModel -> reSupplyCouponService.reSupplyCouponWithUid(couponId, userInfoModel.getUid()))
.collect(Collectors.toList());
// 從補發劵結果中取出補發失敗的加入補發失敗的集合中
reSupplyCouponResult.parallelStream()
.filter(map -> !(Boolean) map.get("result"))
.map(map -> failUidList.add(String.valueOf(map.get("uId"))))
.collect(Collectors.toList());
return failUidList;
}
複製代碼
基礎Service層中新增接口:
/**
* 補發劵操做
* @param couponId 優惠券ID
* @param uId 用戶ID
* @return [UID,"成功或失敗"],返回對應UID。
*/
@Override
public Map<String, Object> reSupplyCouponWithUid(String couponId, String uId) {
Map<String,Object> map = new HashMap<>();
map.put("uId",uId);
Boolean result = reSupplyCouponIntegration.reSupplyCoupon(couponId,uId);
map.put("result",result);
return map;
}
複製代碼
利用Java 8的CompletableFuture
接口異步化。每個UID的操做之間都是異步的。
須要對全部的CompletableFuture
對象執行join
操做,一個一個等待它們執行完畢。CompletableFuture
類中的join
方法和Future
接口中的get
方法有相同的含義,而且也聲明在Future
接口中,惟一的不一樣是join
方法不會拋出任何檢測到的異常。因此不會顯得Lambda表達式過於臃腫。
業務Service層代碼:
/**
* 異步 劵補發操做 每個UID之間都是異步的 基於JDK 8的CompletableFuture接口
* @param uIds
* @param couponId
* @return
*/
@Override
public List<String> asyncCompletableFutureReSupplyCoupon(List<String> uIds, String couponId) {
List<String> failUidList = new ArrayList<>();
// 使用CompletableFuture異步操做:驗證UID是否存在系統中
List<CompletableFuture<UserInfoModel>> list = uIds.stream()
.map(uId -> CompletableFuture.supplyAsync(
() -> reSupplyCouponService.queryUserInfo(uId))
).collect(Collectors.toList());
// 等待全部異步操做執行結束,分區篩選出存在的UIDs和不存在的UIDs
Map<Boolean, List<UserInfoModel>> joinMap = list.stream()
.map(CompletableFuture::join)
.collect(Collectors.partitioningBy(Objects::isNull));
// 將不存在的UIDs加入補發失敗的集合中
joinMap.get(true)
.stream()
.map(userInfoModel -> failUidList.add(userInfoModel.getUid()))
.collect(Collectors.toList());
// 使用CompletableFuture異步給存在的UIDs補發劵
List<CompletableFuture<Map<String, Object>>> reSupplyCouponResult = joinMap.get(false)
.stream()
.map(userInfoModel -> CompletableFuture.supplyAsync(
() -> reSupplyCouponService.reSupplyCouponWithUid(couponId, userInfoModel.getUid()))
).collect(Collectors.toList());
// 等待全部異步操做執行結束,篩選出補發劵失敗的UIDs存入返回結果集合中
reSupplyCouponResult.stream()
.map(CompletableFuture::join)
.filter(r -> !(Boolean) r.get("result"))
.map(r -> failUidList.add(String.valueOf(r.get("uId"))))
.collect(Collectors.toList());
return failUidList;
}
複製代碼
初始化8個UID進行測試。
測試代碼:
private static List<String> uIds = new ArrayList<>();
/**
* 初始化8個UIDs,模擬待補發用戶
*/
static {
for (int i = 0; i < 8; i++) {
uIds.add(String.valueOf(i));
}
}
/**
* 測試使用Java 8的並行流進行的補發劵操做
*
* 8個UID
* done in 312msecs
*/
@Test
public void testParallelReSupplyCoupon() {
long start = System.nanoTime();
List<String> failedUIDs = reSupplyCouponBizService.parallelReSupplyCoupon(uIds, "1");
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("done in " + duration + "msecs");
failedUIDs.stream().forEach(System.out::println);
}
/**
* 測試 異步 劵補發操做 每個UID之間都是異步的 基於JDK 8的CompletableFuture接口
*
* 8個UID
* done in 610msecs
*/
@Test
public void testAsyncCompletableFutureReSupplyCoupon() {
long start = System.nanoTime();
List<String> failedUIDs = reSupplyCouponBizService.asyncCompletableFutureReSupplyCoupon(uIds, "1");
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("done in " + duration + "msecs");
failedUIDs.stream().forEach(System.out::println);
}
複製代碼
結果讓人至關失望。使用CompletableFuture
新接口的耗時大約是使用並行流版本的兩倍。難道這種場景下使用CompletableFuture
真的是浪費時間嗎?也許咱們漏掉了某些很重要的東西?咱們運行測試代碼的電腦是否足以以並行方式運行8個線程?
並行流的版本運行的足夠快,那是由於它能並行的執行的8個線程,它能爲每一個UID的操做分配一個線程。可是,若是如今咱們初始化9個UID進行測試,咱們來看看結果:
並行流版本
9個UID
done in 617msecs
異步接口版本
9個UID
done in 611msecs
複製代碼
並行流版本9個UID的測試結果比以前大概多消耗了3秒,這個時間間隔恰好是一次模擬調用遠程服務接口的耗時。由於能夠並行運行的8個線程開始都處於工做狀態,都在對前8個UID進行補發劵等操做。第9個UID的操做只能等到前面某個操做完成釋放出空閒線程才能繼續。
異步接口版本的測試結果和並行流版本相差無幾。究其緣由都同樣:它們內部採用的是一樣的通用線程池,默認都使用固定數量的線程,具體線程數取決於Runtime.getRuntime().availableProcessors()
的返回值。然而,CompletableFuture
具備必定優點,它能夠定製執行器,自定義線程池的大小。這是並行流API沒法實現的。
建立一個配有線程池的執行器很容易,可是咱們該如何選擇合適的線程數目呢?
《Java併發編程實戰》書中介紹到,Brian Goetz和合著者們爲線程池大小的優化提供了很多中肯的建議。這很是重要,若是線程池中線程的數量過多,最終它們會競爭稀缺的處理器和內存資源,浪費大量的時間在上下文切換上。反之,若是線程的數目過少,正如你的應用所面臨的狀況,處理器的一些核可能就沒法充分利用。Brian Goetz建議,線程池大小與處理器的利用率之比可使用下面的公式進行估算: Nthreads = NCPU * UCPU * (1 + W/C) 其中:
- Nthreads是處理器的核的數目,能夠經過
Runtime.getRuntime().availableProcessors()
獲得;- UCPU是指望的CPU利用率(該值應該介於0和1之間)
- W/C是等待時間與計算時間的比率
補發劵接口99%的時間都在等待遠程服務的響應,因此估算出的W/C的比率爲100。若是指望的CPU利用率爲100%,則須要建立一個擁有800個線程的線程池。但實際上,線程池中的有些線程根本沒機會被使用,反而是一種浪費。因此建議將執行器使用的線程數,與實際須要的線程數(UIDs的數量)設定爲一樣的值。這樣每一個UID都對應一個服務線程。可是,當UIDs數量過大時,運行代碼的機器必然會因超負荷而崩潰,因此最好仍是有一個上限。
業務Service層相關代碼以下:
/**
* 定製執行器-線程池大小爲UIDs的數量:設置爲守護線程,當程序退出時,線程也會被回收。
*/
private final Executor executor = Executors.newFixedThreadPool(125, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
/**
* 異步 劵補發操做 定製CompletableFuture接口的執行器
* @param uIds 用戶UID集合
* @param couponId 優惠券ID
* @return 補發失敗的用戶UID集合
*/
@Override
public List<String> asyncCompletableFutureCustomExecutorReSupplyCoupon(List<String> uIds, String couponId) {
List<String> failUidList = new ArrayList<>();
// 使用定製執行器的CompletableFuture異步操做:驗證UID是否存在系統中
List<CompletableFuture<UserInfoModel>> list = uIds.stream()
.map(uId -> CompletableFuture.supplyAsync(
() -> reSupplyCouponService.queryUserInfo(uId),executor)
).collect(Collectors.toList());
// 等待全部異步操做執行結束,分區篩選出存在的UIDs和不存在的UIDs
Map<Boolean, List<UserInfoModel>> joinMap = list.stream()
.map(CompletableFuture::join)
.collect(Collectors.partitioningBy(Objects::isNull));
// 將不存在的UIDs加入補發失敗的集合中
joinMap.get(true)
.stream()
.map(userInfoModel -> failUidList.add(userInfoModel.getUid()))
.collect(Collectors.toList());
// 使用定製執行器的CompletableFuture異步給存在的UIDs補發劵
List<CompletableFuture<Map<String, Object>>> reSupplyCouponResult = joinMap.get(false)
.stream()
.map(userInfoModel -> CompletableFuture.supplyAsync(
() -> reSupplyCouponService.reSupplyCouponWithUid(couponId, userInfoModel.getUid()),executor)
).collect(Collectors.toList());
// 等待全部異步操做執行結束,篩選出補發劵失敗的UIDs存入返回結果集合中
reSupplyCouponResult.stream()
.map(CompletableFuture::join)
.filter(r -> !(Boolean) r.get("result"))
.map(r -> failUidList.add(String.valueOf(r.get("uId"))))
.collect(Collectors.toList());
return failUidList;
}
複製代碼
使用125個UID進行測試:
private static List<String> uIds = new ArrayList<>();
/**
* 初始化操做,模擬待補發用戶
*/
static {
for (int i = 0; i < 125; i++) {
uIds.add(String.valueOf(i));
}
}
/**
* 測試 異步 劵補發操做 定製CompletableFuture接口的執行器
*
* 125個UID
* done in 369msecs
*/
@Test
public void testAsyncCompletableFutureCustomExecutorReSupplyCoupon() {
long start = System.nanoTime();
List<String> failedUIDs = reSupplyCouponBizService.asyncCompletableFutureCustomExecutorReSupplyCoupon(uIds, "1");
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("done in " + duration + "msecs");
failedUIDs.stream().forEach(System.out::println);
}
複製代碼
測試結果:done in 369msecs
,顯而易見,耗時和8個UID的並行流版本很接近。性能顯著提高。通常而言,隨着UID數量繼續增多,耗時不會相差太多,直到達到以前計算的閾值800(CPU利用率達到100%)。
並行流底層的Fork/Join框架使用通用的線程池,沒法個性化定製。新的CompletableFuture
接口能夠定製執行器,調整線程池大小,可以更加充分的利用CPU資源。
建議以下:
- 若是你進行的是計算密集型的操做,而且沒有I/O,那麼推薦使用Stream接口,由於實 現簡單,同時效率也多是最高的(若是全部的線程都是計算密集型的,那就沒有必要 建立比處理器核數更多的線程)。
- 反之,若是你並行的工做單元還涉及等待I/O的操做(包括網絡鏈接等待),那麼使用 CompletableFuture靈活性更好,你能夠像前文討論的那樣,依據等待/計算,或者 W/C的比率設定須要使用的線程數。這種狀況不使用並行流的另外一個緣由是,處理流的 流水線中若是發生I/O等待,流的延遲特性會讓咱們很難判斷到底何時觸發了等待。
執行比較耗時的操做時,尤爲是那些依賴一個或多個遠程服務的操做,建議進行異步化設計,使用CompletableFuture
類提供的特性可輕鬆實現異步API。