做者:唐尤華
https://dzone.com/articles/a-...
幾年前 NoSQL 開始流行的時候,像其餘團隊同樣,咱們的團隊也熱衷於使人興奮的新東西,而且計劃替換一個應用程序的數據庫。 可是,當深刻實現細節時,咱們想起了一位智者曾經說過的話:「細節決定成敗」。最終咱們意識到 NoSQL 不是解決全部問題的銀彈,而 NoSQL vs RDMS 的答案是:「視狀況而定」。html
相似地,去年RxJava 和 Spring Reactor 這樣的併發庫加入了讓人充滿激情的語句,如異步非阻塞方法等。爲了不再犯一樣的錯誤,咱們嘗試評估諸如 ExecutorService、 RxJava、Disruptor 和 Akka 這些併發框架彼此之間的差別,以及如何肯定各自框架的正確用法。java
本文中用到的術語在這裏有更詳細的描述。面試
在開始比較併發框架的以前,讓咱們快速複習一下如何配置最佳線程數以提升並行任務的性能。 這個理論適用於全部框架,而且在全部框架中使用相同的線程配置來度量性能。數據庫
參考: http://baddotrobot.com/blog/2...編程
性能測試配置 GCP -> 處理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架構:x86_64;CPU 內核:8個(注意: 這些結果僅對該配置有意義,並不表示一個框架比另外一個框架更好)。後端
若是一個應用程序部署在多個節點上,而且每一個節點的 req/sec 小於可用的核心數量,那麼 ExecutorService 可用於並行化任務,更快地執行代碼。數組
若是一個應用程序部署在多個節點上,而且每一個節點的 req/sec 遠遠高於可用的核心數量,那麼使用 ExecutorService 進一步並行化只會使狀況變得更糟。服務器
當外部服務延遲增長到 400ms 時,性能測試結果以下(請求速率 @50 req/sec,8核)。數據結構
5.3 全部任務按順序執行示例多線程
// I/O 任務:調用外部服務 String posts = JsonService.getPosts(); String comments = JsonService.getComments(); String albums = JsonService.getAlbums(); String photos = JsonService.getPhotos(); // 合併來自外部服務的響應 // (內存中的任務將做爲此操做的一部分執行) int userId = new Random().nextInt(10) + 1; String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos); // 構建最終響應並將其發送回客戶端 String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; return response;
// 添加 I/O 任務 List<Callable<String>> ioCallableTasks = new ArrayList<>(); ioCallableTasks.add(JsonService::getPosts); ioCallableTasks.add(JsonService::getComments); ioCallableTasks.add(JsonService::getAlbums); ioCallableTasks.add(JsonService::getPhotos); // 調用全部並行任務 ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks); // 獲取 I/O 操做(阻塞調用)結果 String posts = futuresOfIOTasks.get(0).get(); String comments = futuresOfIOTasks.get(1).get(); String albums = futuresOfIOTasks.get(2).get(); String photos = futuresOfIOTasks.get(3).get(); // 合併響應(內存中的任務是此操做的一部分) String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos); // 構建最終響應並將其發送回客戶端 return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
與上述狀況相似:處理傳入請求的 HTTP 線程被阻塞,而 CompletableFuture 用於處理並行任務
若是沒有 AsyncResponse,性能與 ExecutorService 相同。 若是多個 API 調用必須異步而且連接起來,那麼這種方法更好(相似 Node 中的 Promises)。
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); // I/O 任務 CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, ioExecutorService); CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, ioExecutorService); CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, ioExecutorService); CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get(); // 從 I/O 任務(阻塞調用)得到響應 String posts = postsFuture.get(); String comments = commentsFuture.get(); String albums = albumsFuture.get(); String photos = photosFuture.get(); // 合併響應(內存中的任務將是此操做的一部分) String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos); // 構建最終響應並將其發送回客戶端 return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
使用 ExecutorService 並行處理全部任務,並使用 @suspended AsyncResponse response 以非阻塞方式發送響應。
圖片來自 http://tutorials.jenkov.com/j...
若是用例相似於服務器端聊天應用程序,在客戶端響應以前,線程不須要保持鏈接,那麼異步、非阻塞方法比同步通訊更受歡迎。在這些用例中,系統資源能夠經過異步、非阻塞方法獲得更好的利用,而不只僅是等待。
// 爲異步執行提交併行任務 ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, ioExecutorService); CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, ioExecutorService); CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, ioExecutorService); // 當 /posts API 返回響應時,它將與來自 /comments API 的響應結合在一塊兒 // 做爲這個操做的一部分,將執行內存中的一些任務 CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture, (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments), ioExecutorService); // 當 /albums API 返回響應時,它將與來自 /photos API 的響應結合在一塊兒 // 做爲這個操做的一部分,將執行內存中的一些任務 CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture, (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos), ioExecutorService); // 構建最終響應並恢復 http 鏈接,把響應發送回客戶端 postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> { LOG.info("Building Async Response in Thread " + Thread.currentThread().getName()); String response = s1 + s2; asyncHttpResponse.resume(response); }, ioExecutorService);
若是編碼的場景適合異步非阻塞方式,那麼能夠首選 RxJava 或任何響應式開發庫。 還具備諸如 back-pressure 之類的附加功能,能夠在生產者和消費者之間平衡負載。
int userId = new Random().nextInt(10) + 1; ExecutorService executor = CustomThreads.getExecutorService(8); // I/O 任務 Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts()) .subscribeOn(Schedulers.from(executor)); Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments()) .subscribeOn(Schedulers.from(executor)); Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums()) .subscribeOn(Schedulers.from(executor)); Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos()) .subscribeOn(Schedulers.from(executor)); // 合併來自 /posts 和 /comments API 的響應 // 做爲這個操做的一部分,將執行內存中的一些任務 Observable<String> postsAndCommentsObservable = Observable .zip(postsObservable, commentsObservable, (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments)) .subscribeOn(Schedulers.from(executor)); // 合併來自 /albums 和 /photos API 的響應 // 做爲這個操做的一部分,將執行內存中的一些任務 Observable<String> albumsAndPhotosObservable = Observable .zip(albumsObservable, photosObservable, (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos)) .subscribeOn(Schedulers.from(executor)); // 構建最終響應 Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2) .subscribeOn(Schedulers.from(executor)) .subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));
[Queue vs RingBuffer]
圖片1: http://tutorials.jenkov.com/j...
圖片2: https://www.baeldung.com/lmax...
Disruptor 框架在下列場合性能更好:與事件驅動的體系結構一塊兒使用,或主要關注內存任務的單個生產者和多個消費者。
static { int userId = new Random().nextInt(10) + 1; // 示例 Event-Handler; count down latch 用於使線程與 http 線程同步 EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> { event.posts = JsonService.getPosts(); event.countDownLatch.countDown(); }; // 配置 Disputor 用於處理事件 DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler) .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2) .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2) .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2); DISRUPTOR.start(); } // 對於每一個請求,在 RingBuffer 中發佈一個事件: Event event = null; RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer(); long sequence = ringBuffer.next(); CountDownLatch countDownLatch = new CountDownLatch(6); try { event = ringBuffer.get(sequence); event.countDownLatch = countDownLatch; event.startTime = System.currentTimeMillis(); } finally { ringBuffer.publish(sequence); } try { event.countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); }
圖片來自: https://blog.codecentric.de/e...
// 來自 controller : Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender()); // handler : public Receive createReceive() { return receiveBuilder().match(Request.class, request -> { Event event = request.event; // Ideally, immutable data structures should be used here. request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf()); request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf()); request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf()); request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf()); }).match(Event.class, e -> { if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) { int userId = new Random().nextInt(10) + 1; String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts, e.comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums, e.photos); String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; e.response = response; e.countDownLatch.countDown(); } }).build(); }
推薦去個人博客閱讀更多:
2.Spring MVC、Spring Boot、Spring Cloud 系列教程
3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程
以爲不錯,別忘了點贊+轉發哦!