來,帶你鳥瞰 Java 中的併發框架!

做者:唐尤華
https://dzone.com/articles/a-...

1. 爲何要寫這篇文章

幾年前 NoSQL 開始流行的時候,像其餘團隊同樣,咱們的團隊也熱衷於使人興奮的新東西,而且計劃替換一個應用程序的數據庫。 可是,當深刻實現細節時,咱們想起了一位智者曾經說過的話:「細節決定成敗」。最終咱們意識到 NoSQL 不是解決全部問題的銀彈,而 NoSQL vs RDMS 的答案是:「視狀況而定」。html

相似地,去年RxJava 和 Spring Reactor 這樣的併發庫加入了讓人充滿激情的語句,如異步非阻塞方法等。爲了不再犯一樣的錯誤,咱們嘗試評估諸如 ExecutorService、 RxJava、Disruptor 和 Akka 這些併發框架彼此之間的差別,以及如何肯定各自框架的正確用法。java

本文中用到的術語在這裏有更詳細的描述。面試

2. 分析併發框架的示例用例

3. 快速更新線程配置

在開始比較併發框架的以前,讓咱們快速複習一下如何配置最佳線程數以提升並行任務的性能。 這個理論適用於全部框架,而且在全部框架中使用相同的線程配置來度量性能。數據庫

  • 對於內存任務,線程的數量大約等於具備最佳性能的內核的數量,儘管它能夠根據各自處理器中的超線程特性進行一些更改。
  • 例如,在8核機器中,若是對應用程序的每一個請求都必須在內存中並行執行4個任務,那麼這臺機器上的負載應該保持爲 @2 req/sec,在 ThreadPool 中保持8個線程。
  • 對於 I/O 任務,ExecutorService 中配置的線程數應該取決於外部服務的延遲。
  • 與內存中的任務不一樣,I/O 任務中涉及的線程將被阻塞,並處於等待狀態,直到外部服務響應或超時。 所以,當涉及 I/O 任務線程被阻塞時,應該增長線程的數量,以處理來自併發請求的額外負載。
  • I/O 任務的線程數應該以保守的方式增長,由於處於活動狀態的許多線程帶來了上下文切換的成本,這將影響應用程序的性能。 爲了不這種狀況,應該根據 I/O 任務中涉及的線程的等待時間按比例增長此機器的線程的確切數量以及負載。

參考: http://baddotrobot.com/blog/2...編程

4. 性能測試結果

性能測試配置 GCP -> 處理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架構:x86_64;CPU 內核:8個(注意: 這些結果僅對該配置有意義,並不表示一個框架比另外一個框架更好)。後端

5. 使用執行器服務並行化 IO 任務

5.1 什麼時候使用?

若是一個應用程序部署在多個節點上,而且每一個節點的 req/sec 小於可用的核心數量,那麼 ExecutorService 可用於並行化任務,更快地執行代碼。數組

5.2 何時適用?

若是一個應用程序部署在多個節點上,而且每一個節點的 req/sec 遠遠高於可用的核心數量,那麼使用 ExecutorService 進一步並行化只會使狀況變得更糟。服務器

當外部服務延遲增長到 400ms 時,性能測試結果以下(請求速率 @50 req/sec,8核)。數據結構

5.3 全部任務按順序執行示例多線程

// I/O 任務:調用外部服務
String posts = JsonService.getPosts();
String comments = JsonService.getComments();
String albums = JsonService.getAlbums();
String photos = JsonService.getPhotos();

// 合併來自外部服務的響應
// (內存中的任務將做爲此操做的一部分執行)
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// 構建最終響應並將其發送回客戶端
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
return response;

5.4 I/O 任務與 ExecutorService 並行執行代碼示例

// 添加 I/O 任務
List<Callable<String>> ioCallableTasks = new ArrayList<>();
ioCallableTasks.add(JsonService::getPosts);
ioCallableTasks.add(JsonService::getComments);
ioCallableTasks.add(JsonService::getAlbums);
ioCallableTasks.add(JsonService::getPhotos);

// 調用全部並行任務
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);

// 獲取 I/O  操做(阻塞調用)結果
String posts = futuresOfIOTasks.get(0).get();
String comments = futuresOfIOTasks.get(1).get();
String albums = futuresOfIOTasks.get(2).get();
String photos = futuresOfIOTasks.get(3).get();

// 合併響應(內存中的任務是此操做的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// 構建最終響應並將其發送回客戶端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

6. 使用執行器服務並行化 IO 任務(CompletableFuture)

與上述狀況相似:處理傳入請求的 HTTP 線程被阻塞,而 CompletableFuture 用於處理並行任務

6.1 什麼時候使用?

若是沒有 AsyncResponse,性能與 ExecutorService 相同。 若是多個 API 調用必須異步而且連接起來,那麼這種方法更好(相似 Node 中的 Promises)。

ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);

// I/O 任務
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
    ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
    ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
    ioExecutorService);
CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();

// 從 I/O 任務(阻塞調用)得到響應
String posts = postsFuture.get();
String comments = commentsFuture.get();
String albums = albumsFuture.get();
String photos = photosFuture.get();

// 合併響應(內存中的任務將是此操做的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// 構建最終響應並將其發送回客戶端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

7. 使用 ExecutorService 並行處理全部任務

使用 ExecutorService 並行處理全部任務,並使用 @suspended AsyncResponse response 以非阻塞方式發送響應。

圖片來自 http://tutorials.jenkov.com/j...

  • HTTP 線程處理傳入請求的鏈接,並將處理傳遞給 Executor Pool,當全部任務完成後,另外一個 HTTP 線程將把響應發送回客戶端(異步非阻塞)。
  • 性能降低緣由:
  • 在同步通訊中,儘管 I/O 任務中涉及的線程被阻塞,可是隻要進程有額外的線程來承擔併發請求負載,它仍然處於運行狀態。
  • 所以,以非阻塞方式保持線程所帶來的好處很是少,並且在此模式中處理請求所涉及的成本彷佛很高。
  • 一般,對這裏討論採用的例子使用異步非阻塞方法會下降應用程序的性能。

7.1 什麼時候使用?

若是用例相似於服務器端聊天應用程序,在客戶端響應以前,線程不須要保持鏈接,那麼異步、非阻塞方法比同步通訊更受歡迎。在這些用例中,系統資源能夠經過異步、非阻塞方法獲得更好的利用,而不只僅是等待。

// 爲異步執行提交併行任務
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);

// 當 /posts API 返回響應時,它將與來自 /comments API 的響應結合在一塊兒
// 做爲這個操做的一部分,將執行內存中的一些任務
CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
ioExecutorService);

// 當 /albums API 返回響應時,它將與來自 /photos API 的響應結合在一塊兒
// 做爲這個操做的一部分,將執行內存中的一些任務
CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
ioExecutorService);

// 構建最終響應並恢復 http 鏈接,把響應發送回客戶端
postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
String response = s1 + s2;
asyncHttpResponse.resume(response);
}, ioExecutorService);

8. RxJava

  • 這與上面的狀況相似,惟一的區別是 RxJava 提供了更好的 DSL 能夠進行流式編程,下面的例子中沒有體現這一點。
  • 性能優於 CompletableFuture 處理並行任務。

8.1 什麼時候使用?

若是編碼的場景適合異步非阻塞方式,那麼能夠首選 RxJava 或任何響應式開發庫。 還具備諸如 back-pressure 之類的附加功能,能夠在生產者和消費者之間平衡負載。

int userId = new Random().nextInt(10) + 1;
ExecutorService executor = CustomThreads.getExecutorService(8);

// I/O 任務
Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts())
.subscribeOn(Schedulers.from(executor));
Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments())
.subscribeOn(Schedulers.from(executor));
Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums())
.subscribeOn(Schedulers.from(executor));
Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos())
.subscribeOn(Schedulers.from(executor));

// 合併來自 /posts 和 /comments API 的響應
// 做爲這個操做的一部分,將執行內存中的一些任務
Observable<String> postsAndCommentsObservable = Observable
.zip(postsObservable, commentsObservable,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
.subscribeOn(Schedulers.from(executor));

// 合併來自 /albums 和 /photos API 的響應
// 做爲這個操做的一部分,將執行內存中的一些任務
Observable<String> albumsAndPhotosObservable = Observable
.zip(albumsObservable, photosObservable,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
.subscribeOn(Schedulers.from(executor));

// 構建最終響應
Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
.subscribeOn(Schedulers.from(executor))
.subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));

9. Disruptor

[Queue vs RingBuffer]

圖片1: http://tutorials.jenkov.com/j...

圖片2: https://www.baeldung.com/lmax...

  • 在本例中,HTTP 線程將被阻塞,直到 disruptor 完成任務,而且使用 countdowlatch 將 HTTP 線程與 ExecutorService 中的線程同步。
  • 這個框架的主要特色是在沒有任何鎖的狀況下處理線程間通訊。在 ExecutorService 中,生產者和消費者之間的數據將經過 Queue傳遞,在生產者和消費者之間的數據傳輸過程當中涉及到一個鎖。 Disruptor 框架經過一個名爲 Ring Buffer 的數據結構(它是循環數組隊列的擴展版本)來處理這種生產者-消費者通訊,而且不須要任何鎖。
  • 這個庫不適用於咱們在這裏討論的這種用例。僅出於好奇而添加。

9.1 什麼時候使用?

Disruptor 框架在下列場合性能更好:與事件驅動的體系結構一塊兒使用,或主要關注內存任務的單個生產者和多個消費者。

static {
    int userId = new Random().nextInt(10) + 1;

    // 示例 Event-Handler; count down latch 用於使線程與 http 線程同步
    EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> {
        event.posts = JsonService.getPosts();
        event.countDownLatch.countDown();
    };

    // 配置 Disputor 用於處理事件
    DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler)
    .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2)
    .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2)
    .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);
    DISRUPTOR.start();
}

// 對於每一個請求,在 RingBuffer 中發佈一個事件:
Event event = null;
RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer();
long sequence = ringBuffer.next();
CountDownLatch countDownLatch = new CountDownLatch(6);
try {
    event = ringBuffer.get(sequence);
    event.countDownLatch = countDownLatch;
    event.startTime = System.currentTimeMillis();
} finally {
    ringBuffer.publish(sequence);
}
try {
    event.countDownLatch.await();
} catch (InterruptedException e) {
    e.printStackTrace();
}

10. Akka

圖片來自: https://blog.codecentric.de/e...

  • Akka 庫的主要優點在於它擁有構建分佈式系統的本地支持。
  • 它運行在一個叫作 Actor System 的系統上。這個系統抽象了線程的概念,Actor System 中的 Actor 經過異步消息進行通訊,這相似於生產者和消費者之間的通訊。
  • 這種額外的抽象級別有助於 Actor System 提供諸如容錯、位置透明等特性。
  • 使用正確的 Actor-to-Thread 策略,能夠對該框架進行優化,使其性能優於上表所示的結果。 雖然它不能在單個節點上與傳統方法的性能匹敵,可是因爲其構建分佈式和彈性系統的能力,仍然是首選。

10.1 示例代碼

// 來自 controller :
Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());

// handler :
public Receive createReceive() {
    return receiveBuilder().match(Request.class, request -> {
    Event event = request.event; // Ideally, immutable data structures should be used here.
    request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());
    request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());
    request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());
    request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());
    }).match(Event.class, e -> {
    if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {
    int userId = new Random().nextInt(10) + 1;
    String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,
    e.comments);
    String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,
    e.photos);
    String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
    e.response = response;
    e.countDownLatch.countDown();
    }
    }).build();
}

11. 總結

  • 根據機器的負載決定 Executor 框架的配置,並檢查是否能夠根據應用程序中並行任務的數量進行負載平衡。
  • 對於大多數傳統應用程序來講,使用響應式開發庫或任何異步庫都會下降性能。只有當用例相似於服務器端聊天應用程序時,這個模式纔有用,其中線程在客戶機響應以前不須要保留鏈接。
  • Disruptor 框架在與事件驅動的架構模式一塊兒使用時性能很好; 可是當 Disruptor 模式與傳統架構混合使用時,就咱們在這裏討論的用例而言,它並不符合標準。 這裏須要注意的是,Akka 和 Disruptor 庫值得單獨寫一篇文章,介紹如何使用它們來實現事件驅動的架構模式。
  • 這篇文章的源代碼能夠在 GitHub 上找到。

推薦去個人博客閱讀更多:

1.Java JVM、集合、多線程、新特性系列教程

2.Spring MVC、Spring Boot、Spring Cloud 系列教程

3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程

4.Java、後端、架構、阿里巴巴等大廠最新面試題

以爲不錯,別忘了點贊+轉發哦!

相關文章
相關標籤/搜索