摘要: 原創出處 www.iocoder.cn/Hystrix/com… 「芋道源碼」歡迎轉載,保留摘要,謝謝!html
本文主要基於 Hystrix 1.5.X 版本 java
🙂🙂🙂關注微信公衆號:【芋道源碼】有福利: git
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
本文主要分享 Hystrix 執行命令方法。github
建議 :對 RxJava 已經有必定的瞭解的基礎上閱讀本文。微信
在官方提供的示例中,咱們看到 CommandHelloWorld 經過繼承 HystrixCommand 抽象類,有四種調用方式:架構
方法 | ||
---|---|---|
#execute() |
同步調用,返回直接結果 | |
#queue() |
異步調用,返回 java.util.concurrent.Future |
|
#observe() |
異步調用,返回 rx.Observable 。向 Observable 註冊 rx.Subscriber 處理結果 |
|
#toObservable() |
未調用,返回 rx.Observable 。向 Observable 註冊 rx.Subscriber 處理結果 |
#testToObservable()
查看筆者補充的示例。推薦 Spring Cloud 書籍:異步
// AbstractCommand.java
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
// ... 省略無關屬性與方法
public Observable<R> toObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// ....
}
}
}
public Observable<R> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<R> subject = ReplaySubject.create();
// eagerly kick off subscription
final Subscription sourceSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call() {
sourceSubscription.unsubscribe();
}
});
}
}
// HystrixCommand.java
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
// ... 省略無關屬性與方法
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
// ... 包裝 delegate
}
// ...
return f;
}
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
protected abstract R run() throws Exception;
}複製代碼
#toObservable()
方法 :未作訂閱,返回乾淨的 Observable 。這就是爲何上文說「未調用」 。#observe()
方法 :調用 #toObservable()
方法的基礎上,向 Observable 註冊 rx.subjects.ReplaySubject
發起訂閱 。
#queue()
方法 :調用 #toObservable()
方法的基礎上,調用:
Observable#toBlocking()
方法 :將 Observable 轉換成阻塞的 rx.observables.BlockingObservable
。BlockingObservable#toFuture()
方法 :返回可得到 #run()
抽象方法執行結果的 Future 。
#run()
方法 :子類實現該方法,執行正常的業務邏輯。
#execute()
方法 :調用 #queue()
方法的基礎上,調用 Future#get()
方法,同步返回 #run()
的執行結果。整理四種調用方式以下:ide
FROM 《【翻譯】Hystrix文檔-實現原理》
微服務
本小節爲拓展內容,源碼解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable
的實現,因此你能夠選擇:源碼分析
《RxJava 源碼解析 —— BlockingObservable》
第一篇 Hystrix 正式的源碼解析。
梳理 Hystrix 的源碼仍是蠻痛苦的,主要是由於對 RxJava 不夠熟悉。
胖友,分享一波朋友圈可好!