熔斷器 Hystrix 源碼解析 —— 執行命令方式

摘要: 原創出處 www.iocoder.cn/Hystrix/com… 「芋道源碼」歡迎轉載,保留摘要,謝謝!html

本文主要基於 Hystrix 1.5.X 版本 java


🙂🙂🙂關注微信公衆號:【芋道源碼】有福利: git

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右
  5. 認真的源碼交流微信羣。

1. 概述

本文主要分享 Hystrix 執行命令方法github

建議 :對 RxJava 已經有必定的瞭解的基礎上閱讀本文。微信

在官方提供的示例中,咱們看到 CommandHelloWorld 經過繼承 HystrixCommand 抽象類,有四種調用方式:架構

方法
#execute() 同步調用,返回直接結果
#queue() 異步調用,返回 java.util.concurrent.Future
#observe() 異步調用,返回 rx.Observable 。向 Observable 註冊 rx.Subscriber 處理結果
#toObservable() 未調用,返回 rx.Observable 。向 Observable 註冊 rx.Subscriber 處理結果


推薦 Spring Cloud 書籍異步

2. 實現

// AbstractCommand.java
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

    // ... 省略無關屬性與方法

    public Observable<R> toObservable() {

        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // ....
            }
        }

    }

    public Observable<R> observe() {
        // us a ReplaySubject to buffer the eagerly subscribed-to Observable
        ReplaySubject<R> subject = ReplaySubject.create();
        // eagerly kick off subscription
        final Subscription sourceSubscription = toObservable().subscribe(subject);
        // return the subject that can be subscribed to later while the execution has already started
        return subject.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                sourceSubscription.unsubscribe();
            }
        });
    }

}

// HystrixCommand.java
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {

    // ... 省略無關屬性與方法

    public Future<R> queue() {
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        final Future<R> f = new Future<R>() {
            // ... 包裝 delegate
        }
        // ...
        return f;
    }

    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

    protected abstract R run() throws Exception;

}複製代碼
  • #toObservable() 方法 :作訂閱,返回乾淨的 Observable 。這就是爲何上文說「未調用」
  • #observe() 方法 :調用 #toObservable() 方法的基礎上,向 Observable 註冊 rx.subjects.ReplaySubject 發起訂閱
  • #queue() 方法 :調用 #toObservable() 方法的基礎上,調用:
    • Observable#toBlocking() 方法 :將 Observable 轉換成阻塞rx.observables.BlockingObservable
    • BlockingObservable#toFuture() 方法 :返回可得到 #run() 抽象方法執行結果的 Future 。
      • #run() 方法 :子類實現該方法,執行正常的業務邏輯
  • #execute() 方法 :調用 #queue() 方法的基礎上,調用 Future#get() 方法,同步返回 #run() 的執行結果。
  • 整理四種調用方式以下:ide

    FROM 《【翻譯】Hystrix文檔-實現原理》
    微服務

3. BlockingObservable

本小節爲拓展內容,源碼解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable 的實現,因此你能夠選擇:源碼分析

《RxJava 源碼解析 —— BlockingObservable》

666. 彩蛋

第一篇 Hystrix 正式的源碼解析。

梳理 Hystrix 的源碼仍是蠻痛苦的,主要是由於對 RxJava 不夠熟悉。

胖友,分享一波朋友圈可好!

相關文章
相關標籤/搜索