Promise-在Java中以同步的方式異步編程

java Promise

java promise(GitHub)是Promise A+規範的java實現版本。Promise A+是commonJs規範提出的一種異步編程解決方案,比傳統的解決方案—回調函數和事件—更合理和更強大。promise實現了Promise A+規範,包裝了java中對多線程的操做,提供統一的接口,使得控制異步操做更加容易。實現過程當中參考文檔以下:java

基本使用:git

<repositories>
    <repository>
      <id>wjj-maven-repo</id>
      <url>https://raw.github.com/zhanyingf15/maven-repo/master</url>
    </repository>
</repositories>
複製代碼
<dependency>
  <groupId>com.wjj</groupId>
  <artifactId>promise</artifactId>
  <version>1.0.0</version>
</dependency>
複製代碼
IPromise promise = new Promise.Builder().promiseHanler(new PromiseHandler() {
    @Override
    public Object run(PromiseExecutor executor) throws Exception {
        return 2*3;
    }
}).build();

複製代碼

上面的例子中建立了一個promise對象,指定PromiseHandler實現,在run方法中寫具體的業務邏輯,相似於Runable的run方法。promise對象一經建立,將當即異步執行。推薦使用lambda表達式,更加簡潔。es6

IPromise promise = new Promise.Builder().promiseHanler(executor -> {
    return 2*3;
}).build();
複製代碼

獲取promise的執行結果一般使用兩個方法thenlisten,前者是阻塞的後者是非阻塞的。then方法返回一個新的promise對象,所以支持鏈式調用。github

new Promise.Builder().promiseHanler(executor -> {//promise0
    return 2*3;
}).build().then(resolvedData -> {//返回一個新的promise1
    System.out.println(resolvedData);
    return (Integer)resolvedData+1;
}).then(res2->{
    System.out.println(res2);
    //建立一個新的promise2並返回
    return new Promise.Builder().externalInput(res2).promiseHanler(executor -> {
        return (Integer)executor.getExternalInput()+2;
    }).build();
}).then(res3->{
    System.out.println(res3);
    return res3;
});
複製代碼

從上面能夠看到promise0、promise1和Promise2是鏈式調用的,每一次then方法都返回一個新的promise。在then方法的回調中,若是返回的是一個非promise對象,那麼promise被認爲是一個fulfilled狀態的promise,若是返回的是一個promsie實例,那麼該實例將會異步執行。
假如須要異步順序執行a->b-c->d四個線程,調用順序以下編程

new PromiseA()
.then(dataA->new PromiseB())//A的回調
.then(dataB->new PromiseC())//B的回調
.then(dataC->new PromiseD())//C的回調
.then(dataD->xxx)//D的回調
.pCatch(error->xxxx)//捕獲中間可能產生的異常
複製代碼

具體使用 方式參考promise-java異步編程解決方案數組

Docs

promise規範

promise規範能夠參考 Promise A+規範。其中ES6 Promise對象 在Promise A+規範上作了一些補充。java promise在使用上基本與ES6 Promise對象保持一致,部分地方有些許不一樣,後面會作出說明。 Promise的三個狀態promise

  • pending:等待態,對應線程未執行或執行中
  • fulfilled:完成態,對應線程正常執行完畢,其執行結果稱爲終值
  • rejected:拒絕態,對應線程異常結束,其異常緣由稱爲拒因
    狀態轉移只能由pending->fulfilled或pending->rejected,狀態一旦發生轉移沒法再次改變。

Promise

Promise是IPromise的實現,Promise實例一經建立,將當即異步執行,部分接口以下bash

IPromise then(OnFulfilledExecutor onFulfilledExecutor)
  • 若是當前promise處於pending狀態,阻塞當前線程,等待promise狀態轉變爲fulfilled或rejected
  • 若是處於fulfilled狀態,執行onFulfilledExecutor.onFulfilled(resolvedData)回調。
    • 若是回調返回一個Promise對象a,以a做爲then方法的返回值,若是回調返回一個普通對象obj,以obj做爲終值、狀態爲fulfilled包裝一個新Promise做爲then方法的返回值
    • 若是執行回調過程當中產生異常e,返回一個以e做爲拒因、狀態爲rejected的新Promise,並拒絕執行接下來的全部Promise直到遇到pCatch。
  • 若是處於rejected狀態,執行onRejectedExecutor.onRejected(rejectReason)回調,返回一個以當前promise的異常做爲拒因、狀態爲rejected的新Promise,並拒絕執行接下來的全部Promise直到遇到pCatch或pFinally
    參數:

IPromise pCatch(OnCatchedExecutor onCatchedExecutor);

then(null,onRejectedExecutor)的別名,但返回不一樣於then,出現異常時能夠選擇不拒絕接下來Promise的執行,可用於異常修正,相似於try{}catch{}
該方法會嘗試捕獲當前promise的異常,最終返回一個新Promise,當被捕獲Promise處於不一樣的狀態時有不一樣的行爲多線程

  • pending:阻塞當前線程,等待pending轉變爲fulfilled或rejected,行爲同then
  • fulfilled:不執行回調,以當前Promise終值和狀態返回一個全新的Promise
  • rejected:執行onCatched(Throwable catchReason)回調。
    • 若是onCatched方法返回一個Promise,以這個Promise做爲最終返回。
    • 若是onCatched方法返回一個非Promise對象obj,以obj做爲終值、fulfilled狀態返回一個全新的對象。
    • 若是執行回調過程當中產生異常e,以e爲拒因、狀態爲rejected返回一個新的Promise,並拒絕執行接下來的全部Promise直到再次遇到pCatch
void listen(OnCompleteListener onCompleteListener);

指定一個監聽器,在promise狀態轉爲fulfilled或rejected調用,該方法不會阻塞線程執行,能夠屢次調用指定多個監聽器異步

void pFinally(OnCompleteListener onCompleteListener);

listen的別名,行爲同listen

Status getStatus()

獲取promise的當前狀態

Object getResolvedData()

獲取promise fulfilled狀態下的終值,其他狀態下時爲null

Throwable getRejectedData()

獲取promise rejected狀態下的拒因,其他狀態下爲null

Future getFuture()

獲取promise對應異步任務的future

boolean cancel()

嘗試取消promise對應的異步任務,底層調用future.cancel(true)。fulfilled或rejected狀態下無效。

Promise.Builder

Promise對象生成器

Builder pool(ExecutorService threadPool)

指定一個線程池用於執行promise任務,若是不指定,每個promise都將啓動一個線程

Builder promiseHanler(PromiseHandler promiseExecutor)

指定promise執行器,在promiseHanler的run方法中實現線程的具體業務邏輯,注意==promise對象一經建立,將當即執行其中的邏輯==

Builder externalInput(Object externalInput)

向Promise注入一個外部參數,能夠在指定PromiseHandler時經過PromiseExecutor.getExternalInput()獲取

int i = 3;
IPromise p = new Promise.Builder()
.externalInput(i).promiseHanler(new PromiseHandler() {
    public Object run(PromiseExecutor executor) {
        Integer args = (Integer) executor.getExternalInput();
        return args*2;
    }
}).build();
複製代碼
Builder promise(IPromise promise)

指定一個promise x,使當前promise接受 x 的狀態

  • 若是 x 處於pending, 當前promise 需保持爲pending直至 x 轉爲fulfilled或rejected
  • 若是 x 處於fulfilled,用x的終值值執行當前promise,能夠在指定PromiseHandler時經過PromiseExecutor.getPromiseInput()獲取
  • 若是 x 處於拒絕態,用相同的據因拒絕當前promise執行
ExecutorService fixedPool = Promise.pool(1);
IPromise promise1 = new Promise.Builder().pool(fixedPool).promiseHanler(executor->3).build();
IPromise promise2 = new Promise.Builder().pool(fixedPool)
    .promise(promise1)
    .promiseHanler(executor->4+(Integer) executor.getPromiseInput())
.build()
.then(resolvedData->{
    System.out.println(resolvedData);
    return resolvedData;
}, rejectedReason-> rejectedReason.printStackTrace());
複製代碼

最終結果返回7,。若是promise1在執行過程當中拋出異常e,promise2將被拒絕執行,將會以e做爲拒因,狀態爲rejected返回一個新的Promise,最終會執行rejectedReason-> rejectedReason.printStackTrace()回調。

IPromise build()

建立一個Promise實例

Promise的靜態方法

static IPromise all(IPromise ...promises)

將多個 Promise 實例p1,...pn,包裝成一個新的 Promise 實例 p,只有當p1-pn的狀態都轉爲fulfilled時,p的狀態才爲fulfilled,此時p1-pn的返回值包裝爲一個數組Object[r1,...rn]做爲p的終值。
只要p1-pn中任意一個被rejected,p的狀態就轉爲rejected,將第一個被rejected的promise的拒因做爲p的拒因,並嘗試取消其他promise的執行(內部調用future.cancel(true))

static IPromise race(IPromise ...promises)

將多個 Promise p1,...pn實例,包裝成一個新的 Promise 實例 p,只要p1-pn有一個狀態發生改變,p的狀態當即改變。並嘗試取消其他promise的執行(內部調用future.cancel(true))
第一個改變的promise的狀態和數據做爲p的狀態和數據

static IPromise resolve()

建立一個終值爲null、fulfilled狀態的promise

static IPromise resolve(Object object)

建立一個終值爲object、fulfilled狀態的promise

static IPromise resolve(Object object,List args)

將object的then方法以異步方式執行,then方法的執行結果做爲Promise的終值

static IPromise resolve(Object object,String methodName,List args)

將object的指定方法以異步方式執行,該方法的執行結果做爲Promise的終值,目標方法的參數必須按順序包含在List中,如object.doSomething(int a,Map b),用resolve執行爲

List args = new ArrayList()
args.add(1);
args.add(map)
Promise.resolve(object,"doSomething",args);
複製代碼
static IPromise reject(Object reason)

建立一個拒由於reason、rejected狀態的promise

static IPromise pTry(Object object,String methodName,List args)

將object的指定方法以同步方式執行,該方法的執行結果做爲Promise的終值,若是object爲IPromise實例,將忽略methodName和args參數,異步執行該實例。
該方法是以Promise統一處理同步和異步方法,無論object是同步操做仍是異步操做,均可以使用then指定下一步流程,用pCatch方法捕獲異常,避免開發中出現如下狀況

try{
  object.doSomething(args1,args2);//可能會拋出異常
  promise.then(resolvedData->{
      //一些邏輯
  }).then(resolvedData->{
      //一些邏輯
  }).pCatch(e->{
      //異常處理邏輯
  })
}catch(Exception e){
  //異常處理邏輯
}
複製代碼

使用pTry,能夠簡化異常處理

List args = new ArrayList(){args1,args2};
Promise.pTry(object,"doSomething",args)
.then(resolvedData->{
      //一些邏輯
}).then(resolvedData->{
  //一些邏輯
}).pCatch(e->{
  //異常處理邏輯
})
複製代碼

PromiseHandler

定義異步邏輯的接口

Object run(PromiseExecutor executor)throws Exception;

run方法中實現具體的業務邏輯,最終run方式是在線程的call方法執行,若是run方法中含有wait、sleep...等鎖操做,可能須要自行處理InterruptedException。由於該線程可能被外部調用cancel()或interrupt()方法

PromiseExecutor

promise狀態處理

void resolve(final Object args)

將Promise對象的狀態從「未完成」變爲「成功」(即從pending變爲fulfilled)。注意該方法一經調用,promise狀態將不可改變,以下例,在調用executor.resolve(3);後,return以前拋出一個異常,promise的狀態依舊是fulfilled,終值爲3。

new Promise.Builder().promiseHanler(new PromiseHandler(){
    @Override
    public Object run(PromiseExecutor executor) {
        executor.resolve(3);
        throw new RuntimeException("error");
        return null;
    }
}).build()
複製代碼

在run方法中executor.resolve(3)等同於return 3

@Override
public Object run(PromiseExecutor executor) {
    return 3;
}
複製代碼

大多數狀況下建議直接使用return返回promise的終值。

void reject(final Throwable args)

將Promise對象的狀態從「未完成」變爲「失敗」(即從pending變爲fulfilled)

Object getExternalInput()

獲取經過new Promise.Builder().externalInput(Object externalInput)方法注入的參數,具體參考Promise.Builder#externalInput(Object externalInput)

Object getPromiseInput()

獲內部promise的執行結果。經過new Promise.Builder().promise(promise1)指定的promise1的執行結果。具體參考 Promise.Builder#promise(IPromise promise)

OnFulfilledExecutor

fulfilled回調接口

Object onFulfilled(Object resolvedData)throws Exception;

狀態轉爲fulfilled時的回調,返回值能夠是IPromise實例或普通對象。若是object是IPromise實例,object做爲then方法的返回值,若是object是個普通對象,以object做爲終值、狀態爲fulfilled包裝一個新Promise做爲then方法的返回值

OnRejectedExecutor

rejected回調接口

void onRejected(Throwable rejectReason)throws Exception;

當Promise轉變爲rejected狀態時的回調

OnCatchedExecutor

rejected回調接口

Object onCatched(Throwable catchReason)throws Exception;

當發生異常時的回調,最終返回一個Promise或普通對象,若是是一個普通對象,這個對象將做爲下一個Promise的終值

OnCompleteListener

void listen(Object resolvedData,Throwable e);

當Promise執行結束時的回調(不管是fulfilled仍是rejected)

  • resolvedData fulfilled狀態時的終值,rejected狀態時爲null
  • e rejected狀態時的異常信息,fulfilled狀態時爲null

示例

示例1:基本使用
new Promise.Builder().promiseHanler(new PromiseHandler(){
    @Override
    public Object run(PromiseExecutor executor) {
        executor.resolve(3);//返回異步執行結果3
        return null;
    }
}).build().then(new OnFulfilledExecutor() {
    @Override
    public Object onFulfilled(Object resolvedData) {
        Integer i = ((Integer)resolvedData)+1;//獲取上一個promsie執行結果3,執行+1
        System.out.println(i);//輸出執行結果4
        //建立一個新的promise,將4做爲該promise的輸入
        IPromise p = new Promise.Builder().externalInput(i).promiseHanler(new PromiseHandler() {
            @Override
            public Object run(PromiseExecutor executor) {
                //獲取外部輸入4
                Integer args = (Integer) executor.getExternalInput();
                executor.resolve(args*2);//執行 4x2
                return null;
            }
        }).build();
        return p;//返回該promise p
    }
})
.then(new OnFulfilledExecutor() {//執行p的回調
    @Override
    public Object onFulfilled(Object args) {
        System.out.println(args);//輸出p的執行結果
        return args;
    }
}, new OnRejectedExecutor() {//捕獲可能出現的異常
    @Override
    public void onRejected(Throwable rejectedReason) throws Exception {
        rejectedReason.printStackTrace();
    }
});
複製代碼

結果

4
8
複製代碼
示例2
ExecutorService fixedPool = Promise.pool(1);//建立一個線程池
//建立promise1
IPromise promise1 = new Promise.Builder().pool(fixedPool).promiseHanler(executor->3).build();
//建立promise2
IPromise promise2 = new Promise.Builder().pool(fixedPool)
    .promise(promise1)//讓promise2接受promise1的狀態,優先執行promise1
    .promiseHanler(executor->{
        //獲取promise1的執行結果,執行promise2的邏輯
        return 4+(Integer) executor.getPromiseInput();
    })
    .build()
    .then(resolvedData->{
        System.out.println(resolvedData);//打印promise2的執行結果 
        return resolvedData;
    }, rejectedReason-> rejectedReason.printStackTrace());
System.out.println("end");
fixedPool.shutdown();
複製代碼

結果

7
end
複製代碼
示例3:錯誤處理
new Promise.Builder().promiseHanler(executor -> 3).build().then(resolvedData->{
    System.out.println("a:"+resolvedData);
    return new Promise.Builder().promiseHanler(executor -> {
        executor.reject(new RuntimeException("err"));//拋出異常
        return null;
    }).build();
}).then(resolvedData1 -> {//fulfilled回調
    System.out.println("b:"+resolvedData1);
    return resolvedData1;
},rejectReason -> {//rejected回調
    System.err.println("c:"+rejectReason);
});
複製代碼

結果

a:3
c:java.lang.RuntimeException: err
複製代碼
示例4:pCatch
new Promise.Builder().promiseHanler(executor -> 0).build()
  .then(res0->{
    System.out.println("a:"+res0);//輸出 a:0
    Thread.sleep(100);
    return 1;//返回1
}).then(res1 -> {
    throw new RuntimeException("throw error");//拋出異常
}).then(res2->{
    Thread.sleep(100);
    System.out.println("b:"+res2);
    return 2;
}).pCatch(e->{
    Thread.sleep(100);
    System.out.println("c:");//輸出c:
    e.printStackTrace();
    return 3;
}).then(res3->{
    Thread.sleep(100);
    System.out.println("d:"+res3);//輸出d:3
    return 4;
});
複製代碼

結果

a:0
c:
runtimeException:throw error
d:3
複製代碼

從上面結果能夠看出,在res1出拋出異常後,拒絕了res2處的執行,被pCatch捕獲,pCatch返回3,被包裝成終值爲三、fulfilled狀態的promise,在res3打印d:3。

示例5:Promise.all(IPromise ...promises)
IPromise p1 = new Promise.Builder().promiseHanler(executor -> {
    Thread.sleep(1000);
    return 1;
}).build();
IPromise p2 = new Promise.Builder().promiseHanler(executor -> {
    Thread.sleep(4000);
    return 2;
}).build();
IPromise p3 = new Promise.Builder().promiseHanler(executor -> {
    Thread.sleep(2000);
    return 3;
}).build();
long s = System.currentTimeMillis();
Promise.all(p1,p2,p3).then(resolvedData -> {
    Object[] datas = (Object[])resolvedData;
    for(Object d:datas){
        System.out.println(d);
    }
    return null;
},e->e.printStackTrace());
System.out.println("耗時:"+(System.currentTimeMillis()-s));
複製代碼

結果

1
2
3
耗時:4033
複製代碼
示例6:線程取消
Map<String,Boolean> p1Flag = new HashMap<>();
p1Flag.put("flag",true);
IPromise p1 = new Promise.Builder().externalInput(p1Flag).promiseHanler(executor -> {
    while (((Map<String,Boolean>)executor.getExternalInput()).get("flag")){
        //do something
        System.out.println("p1 正在執行任務");
    }
    System.out.println("p1任務完成,正常結束");
    return 1;
}).build();
IPromise p2 = new Promise.Builder().promiseHanler(executor -> {
    while (!Thread.currentThread().isInterrupted()){
        System.out.println("執行p2正常邏輯");
    }
    System.err.println("p2線程被取消");
    return 2;
}).build();
IPromise p3 = new Promise.Builder().promiseHanler(executor -> {
    Thread.sleep(10);
    throw new RuntimeException("p3拋出異常");
}).build();
IPromise p4 = new Promise.Builder().finalPromise("4",true).build();
long s = System.currentTimeMillis();
Promise.all(p1,p2,p3,p4).then(resolvedData -> {
    Object[] datas = (Object[])resolvedData;
    for(Object d:datas){
        System.out.println(d);
    }
    return null;
},e->e.printStackTrace());
System.out.println("耗時:"+(System.currentTimeMillis()-s));
p1Flag.put("flag",false);
複製代碼

可能的結果以下

p1 正在執行任務
p1 正在執行任務
執行p2正常邏輯
執行p2正常邏輯
p1 正在執行任務 
runtimeException:p3拋出異常
p2線程被取消
p1 正在執行任務
p1 正在執行任務
p1 正在執行任務 
p1任務完成,正常結束
複製代碼

從上面結果能夠看出,開始p1和p2都在正常執行,當p3拋出異常後,Promise.all方法當即返回p3的異常並打印,同時取消p1和p2的執行,因爲p2判斷了線程狀態Thread.currentThread().isInterrupted(),因此p2執行了正常的退出邏輯。p1仍然在執行,並無被取消掉,最後打印p1任務完成,正常結束是由於程序末尾執行了p1Flag.put("flag",false);,不然p1會永遠循環打印。

示例7:同步方法異步執行
public class ThenTest {
    public Integer then(int a,int b){
        //打印當前執行現場名稱
        System.out.println(Thread.currentThread().getName());
        return a+b;
    }
    public static void main(String[] args){
        //打印主線程名稱
        System.out.println(Thread.currentThread().getName());
        List arg = new ArrayList<>();
        arg.add(1);
        arg.add(2);
        //將ThenTest實例then方法異步執行
        Promise.resolve(new ThenTest(),arg).then(resolvedData -> {
            System.out.println(resolvedData);
            return resolvedData;
        }).pCatch(e->{
            e.printStackTrace();
            return 1;
        });
    }
}
複製代碼

結果

main
promise-thread-0
3
複製代碼
相關文章
相關標籤/搜索