在併發編程中,咱們一般會用到一組非阻塞的模型:Promise,Future 和 Callback。其中的 Future 表示一個可能尚未實際完成的異步任務的結果,針對這個結果能夠添加 Callback 以便在任務執行成功或失敗後作出對應的操做,而 Promise 交由任務執行者,任務執行者經過 Promise 能夠標記任務完成或者失敗。 能夠說這一套模型是不少異步非阻塞架構的基礎。html
這一套經典的模型在 Scala、C# 中獲得了原生的支持,但 JDK 中暫時還只有無 Callback 的 Future 出現,固然也並不是在 JAVA 界就沒有發展了,好比 Guava 就提供了ListenableFuture 接口,而 Netty 4+ 更是提供了完整的 Promise、Future 和 Listener 機制,在 Netty 的官方文檔 Using as a generic library 中也介紹了將 Netty 做爲一個 lib 包依賴,而且使用 Listenable futures 的示例。在實際的項目使用中,發現 Netty 的 EventLoop 機制不必定適用其餘場景,所以想去除對 EventLoop 的依賴,實現一個簡化版本。java
參考 Scala 和 Netty 的代碼從新定義了接口和實現,先介紹下和 Netty 版本的區別:git
去除了對 EventLoop 的依賴,Callback 的執行策略不一樣:任務未完成時添加的 Callback,會在結束任務的線程執行;任務完成後添加的 Callback 會在添加 Callback 線程當即執行github
一個 Callback 執行後會當即被清理編程
Callback 能夠根據任務結果添加,支持添加如下三種 Callback: onComplete, onSuccess, onFailure, 不須要和 Netty 的 FutureListener 同樣大部分場景下都須要檢查 future.isSuccess 等api
支持 Callback 的組合,Callback 包含一些函數式的方法,好比 compose 和 andThen 能夠用來組合promise
使用 CountdownLatch 替換掉了 Netty 的 wait/notify 實現架構
去掉 Netty Future 一些不常使用的方法,同時補充一些模型間關聯的方法,好比 Promise.getFuture併發
而後再介紹幾個使用這個 commons-future 的示例:oracle
異步執行任務,得到 Future 後添加 Callback
01 |
final TaskPromise promise = new DefaultTaskPromise(); |
02 |
final TaskFuture future = promise.getFuture(); |
03 |
final CountDownLatch latch = new CountDownLatch( 1 ); |
04 |
future.onComplete( new TaskCallback() { // 添加結束 Callback |
06 |
public TaskFuture apply(TaskFuture f) { |
11 |
new Thread( new Runnable() { |
14 |
promise.setSuccess( null ); |
異步執行任務,得到 Future 後添加成功結束的 Callback
01 |
final TaskPromise promise = new DefaultTaskPromise(); |
02 |
final TaskFuture future = promise.getFuture(); |
03 |
final CountDownLatch latch = new CountDownLatch( 1 ); |
04 |
future.onSuccess( new TaskCallback() { // 添加成功結束 Callback |
06 |
public TaskFuture apply(TaskFuture f) { |
11 |
new Thread( new Runnable() { |
14 |
promise.setSuccess( null ); |
異步執行任務,得到 Future 後,添加失敗結束的組合 Callback
01 |
final TaskPromise promise = new DefaultTaskPromise(); |
02 |
final TaskFuture future = promise.getFuture(); |
03 |
final CountDownLatch latch = new CountDownLatch( 2 ); |
04 |
future.onFailure( new TaskCallback() { |
06 |
public TaskFuture apply(TaskFuture f) { |
10 |
}.andThen( new TaskCallback() { |
12 |
public TaskFuture apply(TaskFuture f2) { |
17 |
new Thread( new Runnable() { |
20 |
promise.setFailure( new IllegalStateException( "cm" )); |
異步執行任務,得到 Future 後阻塞等待任務完成
01 |
final TaskPromise promise = new DefaultTaskPromise(); |
02 |
final TaskFuture future = promise.getFuture(); |
03 |
new Thread( new Runnable() { |
07 |
TimeUnit.SECONDS.sleep( 2 ); |
08 |
} catch (InterruptedException e) { |
10 |
promise.setFailure( new IllegalStateException( "cm" )); |
代碼倉庫: https://bitbucket.org/qiyi/commons-future
源文連接: http://isouth.org/archives/354.html
參考: