Hystrix核心基礎 - 滑動窗口建立過程及demo

前言

RxJava可能有些小夥伴沒有聽過是什麼東西,多是由於你們平時在作業務需求的時候對異步編程瞭解得比較少,而RxJava就是這麼一個響應式編程框架,RxJava在安卓上面用得很是多,作安卓的朋友確定對它很熟悉。那我這裏爲何要講這個呢?由於spring cloud中服務治理框架Hystrix中大量用到了RxJava的響應式編程,爲了便於理解,這裏也簡單給你們介紹一下。這裏介紹的版本是RxJava 1.X版本的, 而在去年的早些時候,官方便宣佈,將在一段時間後再也不對 RxJava 1.x 進行維護,推出了RxJava2.X版本,既然有新的,爲何不介紹新的呢?由於目前最新的Hystrix版本1.5.12中使用的RxJava是1.2版本的,而2.X版本的api改動仍是比較大的,因此爲了你們能更加簡單的理解Hystrix,因此這裏是對1.X版本的介紹。java

響應式編程是什麼

響應式編程是一種基於異步數據流概念的編程模式,有點相似於JAVA裏面的lambda表達式,相信你們都很熟悉lambda吧。數據流,stream,你們確定不陌生,咱們能夠對stream有不少操做,filter、map、reduce 等常見操做。而後響應式中的核心就是響應二字,響應什麼呢?響應的是事件,event 。 而流就是一個按照時間進行排序的事件序列。RxJava裏面的事件是基於觀察者模式,事件流將從上往下,從訂閱源傳遞到觀察者。spring

RxJava中重要概念

RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 Subscriber (訂閱,是Observer的抽象實現類,本質上使用是同樣的)、事件。ObservableObserver 經過 subscribe() 方法實現訂閱關係,從而 Observable 能夠在須要的時候發出事件來通知 ObserverObservable 就像是一個生產者,在不斷的生產消息,而SubscriberObserver 就像是一個消費者,在不斷的消費消息編程

另外, RxJava 的事件回調方法還定義了兩個特殊的事件,在Hystrix中用得也很是多:onCompleted()onError()windows

  • onCompleted(): 事件隊列完結。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。
  • onError(): 事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。

怎麼作

說了這麼多概念,估計你們都是一頭霧水,咱們直接來些實際的,加深你們的印象理解。用多的天然而然就會了,就懂了,這裏說得可能不是最全的,可是說的都是Hystrix中用得不少的一些操做符,加深你們對Hystrix的理解,看源碼就會容易一些。api

例子

Observable<String> producer = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("apple");
subscriber.onNext("orange");
subscriber.onCompleted();
}
});
Subscriber<String> consumer = new Subscriber<String>() {
@Override
public void onNext(String s) {
LOG.info("我收到的水果有 = {}" , s);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
producer.subscribe(consumer);

先來一個簡單的例子給你們直觀的介紹下ObservableSubscriber 作了些什麼,Observable 使用了onNext方法生產了2個水果,Apple和orange ,而後調用了onCompleted方法結束了此次生產, 消費者用onnext方法收到了2個水果,因此消費者就將收到的水果打印出來了,沒有作任何處理緩存

執行結果以下:app

2018-04-27 10:21:11.440 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :我收到的水果有 = apple
2018-04-27 10:21:11.440 INFO [#][#] <main> com.dzy.learn.other.NormalTest :我收到的水果有 = orange

而後再給你們介紹一下Hystrix中用得很是多的操做符框架

create

Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("item1");
subscriber.onNext("item2");
subscriber.onCompleted();
}
});
//在上面的例子中已經跟你們講過了,create就是建立一個Observable,來生產消息

from

List<String> fruitList = Arrays.asList("apple","orange");
Observable.from(fruitList).subscribe(new Action1<String>() {
@Override
public void call(String fruit) {
LOG.info("fruit = {}" , fruit);
}
});

上面訂閱者的代碼被我簡化了,直接new 一個Action1, 是subscribe支持的一種訂閱方式,跟Subscriber是同樣的道理,只是更加簡化。而後咱們再用lambda表達式簡化一下就是這樣的了異步

List<String> fruitList = Arrays.asList("apple","orange");
Observable.from(fruitList).subscribe(fruit -> LOG.info("fruit = {}" , fruit));

執行結果以下:ide

2018-04-27 10:30:59.030 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :fruid = apple
2018-04-27 10:30:59.030 INFO [#][#] <main> com.dzy.learn.other.NormalTest :fruid = orange

defer

只有當訂閱者訂閱才建立Observable,爲每一個訂閱建立一個新的Observable。內部經過OnSubscribeDefer在訂閱時調用Func0建立Observable

List<String> fruitList = Arrays.asList("apple","orange");
Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.from(fruitList);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String fruit) {
LOG.info("defer fruit = {}" , fruit);
}
});

不知道你們理解了沒有,每次生產消息都會生產一個新的消息生產者

執行結果以下:

2018-04-27 10:37:26.209 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :defer fruit = apple
2018-04-27 10:37:26.209 INFO [#][#] <main> com.dzy.learn.other.NormalTest :defer fruit = orange

startWith

在生產的第一個消息前加上一個或者一些消息,看例子比較直觀

List<String> fruitList = Arrays.asList("apple","orange");
Observable.from(fruitList)
.startWith("before apple","before apple2")
.subscribe(fruit->LOG.info("fruit = {}" , fruit));

執行結果以下:

2018-04-27 10:40:52.221 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :fruit = before apple
2018-04-27 10:40:52.221 INFO [#][#] <main> com.dzy.learn.other.NormalTest :fruit = before apple2
2018-04-27 10:40:52.222 INFO [#][#] <main> com.dzy.learn.other.NormalTest :fruit = apple
2018-04-27 10:40:52.222 INFO [#][#] <main> com.dzy.learn.other.NormalTest :fruit = orange

Filter

跟lambda裏面的filter很像,也是用來篩選數據的,filter接收的Func1第二個參數是Boolean,定死的

List<Integer> list = Arrays.asList(10, 5, 3, 2, 1, 0);
Observable.from(list)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>4;
}
})
.subscribe(num->LOG.info("比4大的num = {}" , num));

Map

Map是將須要生產的數據通過Func1進行變換以後,而後在發送給消費者。第二個參數是Object類型的,能夠轉化成一個Object對象

List<Integer> list = Arrays.asList(10, 5, 3, 2, 1, 0);
Observable.from(list)
.map(new Func1<Integer, Object>() {
@Override
public Object call(Integer integer) {
return integer+"變成str";
}
})
.subscribe(s -> LOG.info("s = {}" , s));

輸出以下:

2018-04-27 11:08:02.743 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :s = 10變成str
2018-04-27 11:08:02.747 INFO [#][#] <main> com.dzy.learn.other.NormalTest :s = 5變成str
2018-04-27 11:08:02.747 INFO [#][#] <main> com.dzy.learn.other.NormalTest :s = 3變成str
2018-04-27 11:08:02.748 INFO [#][#] <main> com.dzy.learn.other.NormalTest :s = 2變成str
2018-04-27 11:08:02.748 INFO [#][#] <main> com.dzy.learn.other.NormalTest :s = 1變成str
2018-04-27 11:08:02.748 INFO [#][#] <main> com.dzy.learn.other.NormalTest :s = 0變成str

FlatMap

跟map有些相似,可是也有很大的區別,map是一個對象變成另一個對象,而flatMap能夠把一個對象轉化爲多個對象 , 其實FlatMap是將一個對象轉成了一個Observable 對象,轉完以後,最開始的生產者並不發送這個 Observable, 而是將它激活,因而它開始發送事件,每個建立出來的 Observable 發送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回調方法,這樣也就產生了一對多的概念,就像是把對象鋪平了同樣,flat

我會跟lambda的flatMap作一個對比,仍是很像的

List<Integer> list = Arrays.asList(10, 5, 3, 2, 1, 0);
List<Object> flatMapcollect = list.stream().flatMap(new Function<Integer, Stream<?>>() {
@Override
public Stream<?> apply(Integer integer) {
//經過list裏面的元素建立一個list返回回去
List<Integer> list1 = Arrays.asList(integer, integer + 10, integer + 100, integer + 1000);
return list1.stream();
}
}).collect(Collectors.toList());
flatMapcollect.forEach(s->LOG.info("complex integer = {}" , s));

打印的結果是一開始的每一個元素10 ,5等都被加上了10 100 1000 而後輸出了,至關於平鋪了

2018-04-27 11:50:21.186 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :complex integer  = 10   // 10原本數字
2018-04-27 11:50:21.191 INFO [#][#] <main> com.dzy.learn.other.NormalTest :complex integer = 20 //加上10以後
2018-04-27 11:50:21.191 INFO [#][#] <main> com.dzy.learn.other.NormalTest :complex integer = 110 //加上100以後
2018-04-27 11:50:21.191 INFO [#][#] <main> com.dzy.learn.other.NormalTest :complex integer = 1010 //加上1000以後
2018-04-27 11:50:21.191 INFO [#][#] <main> com.dzy.learn.other.NormalTest :complex integer = 5
2018-04-27 11:50:21.191 INFO [#][#] <main> com.dzy.learn.other.NormalTest :complex integer = 15
2018-04-27 11:50:21.191 INFO [#][#] <main> com.dzy.learn.other.NormalTest :complex integer = 105
2018-04-27 11:50:21.191 INFO [#][#] <main> com.dzy.learn.other.NormalTest :complex integer = 1005
//限於篇幅,後面的略,都是 源數 加上10 100 1000後輸出的

RxJava裏面的flatMap, 我乘以10,乘以100而後加下|轉成一個String

List<Integer> list = Arrays.asList(10, 5, 3, 2, 1, 0);

Observable.from(list).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer num) {
List<String> strings = Arrays.asList("|" + num + "|", "|" + num * 10 + "|", "|" + num * 100 + "|");
return Observable.from(strings);
}
}).subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
LOG.info("新轉化後的字符串是 = {}" , o);
}
});

輸出以下:

2018-04-27 11:57:13.316 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :新轉化後的字符串是 = |10|
2018-04-27 11:57:13.320 INFO [#][#] <main> com.dzy.learn.other.NormalTest :新轉化後的字符串是 = |100|
2018-04-27 11:57:13.320 INFO [#][#] <main> com.dzy.learn.other.NormalTest :新轉化後的字符串是 = |1000|
2018-04-27 11:57:13.320 INFO [#][#] <main> com.dzy.learn.other.NormalTest :新轉化後的字符串是 = |5|
2018-04-27 11:57:13.320 INFO [#][#] <main> com.dzy.learn.other.NormalTest :新轉化後的字符串是 = |50|
2018-04-27 11:57:13.320 INFO [#][#] <main> com.dzy.learn.other.NormalTest :新轉化後的字符串是 = |500|
2018-04-27 11:57:13.320 INFO [#][#] <main> com.dzy.learn.other.NormalTest :新轉化後的字符串是 = |3|
2018-04-27 11:57:13.321 INFO [#][#] <main> com.dzy.learn.other.NormalTest :新轉化後的字符串是 = |30|
2018-04-27 11:57:13.321 INFO [#][#] <main> com.dzy.learn.other.NormalTest :新轉化後的字符串是 = |300|

reduce

reduce是一個聚合函數,接收上次運算的結果,放在下次的參數中,而後輸出最後的結果,結果只輸出一次。有點相似於遞歸。跟scan操做符很像,可是有區別,你們看下scan的輸出就知道是什麼區別了。

RxJava裏面的reduce

Observable.from(list).reduce(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer result, Integer num) {
LOG.info("開始前: result {}, num = {}" , result,num);
result+=num;
return result;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer result) {
LOG.info("result = {}" , result);
}
});

打印結果以下:

2018-04-27 13:47:29.237 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 10, num = 5
2018-04-27 13:47:29.241 INFO [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 15, num = 3
2018-04-27 13:47:29.241 INFO [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 18, num = 2
2018-04-27 13:47:29.241 INFO [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 20, num = 1
2018-04-27 13:47:29.241 INFO [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 21, num = 0
2018-04-27 13:47:29.241 INFO [#][#] <main> com.dzy.learn.other.NormalTest :result = 21

lambda裏面的reduce

List<Integer> list = Arrays.asList(10, 5, 3, 2, 1, 0);

Integer sum = list.stream().reduce((result, sum1) -> {
result += sum1;
return result;
}).get();
LOG.info("sum = {}" , sum);
//sum是21,就是累加起來

scan

scan和reduce都是把上一次操做的結果作爲第二次的參數傳遞給第二次Observable使用,可是scan每次操做以後先把數據輸出,而後在調用scan的回調函數進行第二次操做,看例子

Observable.from(list)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer result, Integer num) {
LOG.info("開始前: result {}, num = {}" , result,num);
result+=num;
return result;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer result) {
LOG.info("result = {}" , result);
}
});
2018-04-27 13:47:29.245 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :result = 10
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 10, num = 5
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :result = 15
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 15, num = 3
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :result = 18
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 18, num = 2
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :result = 20
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 20, num = 1
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :result = 21
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :開始前: result 21, num = 0
2018-04-27 13:47:29.245 INFO [#][#] <main> com.dzy.learn.other.NormalTest :result = 21

每次運算後都會調用訂閱者

window

Hystrix 滑動窗口的核心用的就是window操做符,那麼window有什麼做用呢?他能將Observable的數據分拆成一些Observable窗口,而後把Observable窗口推送給訂閱者,而不是一個數據,是一個Observable。來點例子更加直白

window(int count, int skip)
List<Integer> list = Arrays.asList(10, 5, 3, 2, 1, 0);
Observable.from(list).window(2, 2).subscribe(new Action1<Observable<Integer>>() {
@Override
public void call(Observable<Integer> integerObservable) {
integerObservable.reduce((sum, num) -> sum+=num).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
LOG.info("我被2個打印一次 = {}" , integer);
}
});
}
});

window裏面有2個參數,第一個參數2 表示 選取2個事件,好比說10,5 5,3 等,第二個參數是skip,表示跳躍2個事件,來選取,因此是3組窗口,裏面是 (10,5)(3,2)(1,0)

輸出:

2018-04-27 14:06:08.703 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :我被2個打印一次 = 15
2018-04-27 14:06:08.707 INFO [#][#] <main> com.dzy.learn.other.NormalTest :我被2個打印一次 = 5
2018-04-27 14:06:08.707 INFO [#][#] <main> com.dzy.learn.other.NormalTest :我被2個打印一次 = 1
window(long timespan, TimeUnit unit)
CountDownLatch countDownLatch = new CountDownLatch(1);
Observable inputEventStream = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
subscriber.onNext("我是生產者.........");
}
});
inputEventStream.window(1000,TimeUnit.MILLISECONDS).subscribe(new Action1() {
@Override
public void call(Object o) {
Calendar calendar = Calendar.getInstance();
int i = calendar.get(Calendar.SECOND);
LOG.info("我會{}就被喚醒觸發...",i);
}
});
countDownLatch.await();

輸出:

2018-04-27 14:26:18.721 INFO  [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :我會18就被喚醒觸發...
2018-04-27 14:26:19.722 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :我會19就被喚醒觸發...
2018-04-27 14:26:20.721 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :我會20就被喚醒觸發...
2018-04-27 14:26:21.721 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :我會21就被喚醒觸發...
2018-04-27 14:26:22.722 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :我會22就被喚醒觸發...
2018-04-27 14:26:23.721 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :我會23就被喚醒觸發...
2018-04-27 14:26:24.721 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :我會24就被喚醒觸發...
2018-04-27 14:26:25.721 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :我會25就被喚醒觸發...
.....
...
..

這裏用CountDownLatch 阻塞了主線程的關閉,若是不用鎖,那麼主線程關閉了以後,你就看不到定時輸出了。

第二個demo,用到了window另外的一個函數,第一個參數是緩存在這個window的間隔時間,第二個參數是時間單位 , 1s內收到的全部的生產消息都會緩存到window裏面,而後統一發出給訂閱者。就好像一個時間軸上面,有個窗子在收集數據,1s鍾以後收集好了以後,就發送出去,而後到了第二個窗子,這就是Hystrix滑動窗口的精髓所在。

學以至用

/**
* 兩個數字相加,reduce,scan用
*/

public static final Func2<Integer, Integer, Integer> PUBLIC_SUM =
(integer, integer2) -> integer + integer2;

public static final Func1<Observable<Integer>, Observable<Integer>> WINDOW_SUM =
//跳過第一個數據,由於給了scan一個默認值0,這個值須要跳過,若是不設置就不須要跳過
window -> window.scan(0, PUBLIC_SUM).skip(1);

public static final Func1<Observable<Integer>, Observable<Integer>> INNER_BUCKET_SUM =
integerObservable -> integerObservable.reduce(0, PUBLIC_SUM);

@Test
public void testWindowSlide() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
behaviorSubject
// 1秒做爲一個基本塊,橫向移動
.window(1000, TimeUnit.MILLISECONDS)
//將flatMap彙總平鋪成一個事件,而後累加成一個Observable<Integer>對象,好比說1s內有10個對象,被累加起來
.flatMap(INNER_BUCKET_SUM)
//對這個對象2個發送,步長爲1
.window(2,1)
//對窗口裏面的進行求和,用的scan, 每次累加都會打印出來
.flatMap(WINDOW_SUM)
.subscribe((Integer integer) ->
// 輸出統計數據到日誌
LOG.info("[{}] call ...... {}",
Thread.currentThread().getName(), integer));

for (int i = 0; i < 1000; i++) {
//200ms生產一個數據,
behaviorSubject.onNext(i);
LOG.info("i = {}" ,i);
Thread.sleep(200);
}
countDownLatch.await();
}

輸出:

2018-04-27 15:46:06.547 INFO  [#][#] <main> com.dzy.learn.other.NormalTest :i = 0
2018-04-27 15:46:06.756 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 1
2018-04-27 15:46:07.010 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 2
2018-04-27 15:46:07.211 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 3
2018-04-27 15:46:07.411 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 4
2018-04-27 15:46:07.517 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :[RxComputationScheduler-1] call ...... 10
2018-04-27 15:46:07.517 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :[RxComputationScheduler-1] call ...... 10
2018-04-27 15:46:07.611 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 5
2018-04-27 15:46:07.811 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 6
2018-04-27 15:46:08.011 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 7
2018-04-27 15:46:08.211 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 8
2018-04-27 15:46:08.411 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 9
2018-04-27 15:46:08.517 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :[RxComputationScheduler-1] call ...... 45
2018-04-27 15:46:08.517 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :[RxComputationScheduler-1] call ...... 35
2018-04-27 15:46:08.611 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 10
2018-04-27 15:46:08.811 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 11
2018-04-27 15:46:09.011 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 12
2018-04-27 15:46:09.211 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 13
2018-04-27 15:46:09.411 INFO [#][#] <main> com.dzy.learn.other.NormalTest :i = 14
2018-04-27 15:46:09.517 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :[RxComputationScheduler-1] call ...... 95
2018-04-27 15:46:09.517 INFO [#][#] <RxComputationScheduler-1> com.dzy.learn.other.NormalTest :[RxComputationScheduler-1] call ...... 60

解析,第一個window產生了一個滑動窗口,每秒鐘就會把生產者生產的消息累加起來,第二個window是積累2個對象,而後進行發送,每次跳一個數字,第二個window是創建在第一個windows累加以後的基礎上的,可能有點難理解,咱們來看第一個window產生的序列以下:

0  10  35  60  85  ......

有的同窗可能會問,你怎麼知道,我看的log日誌,打印出來的序列是 10 10 、 45 35 、95 60 、 145 85 、由於這裏用的scan,每次累加以後都會把源數打印一遍,因此是0 10 35 60 85 。第二個window就在這個基礎上進行累加 0+10 10+35 35+60 60+85,這樣就完成了一個滑動窗口的監控過程

結語

這裏總結的也許不是最全的,也許不是最新的版本,可是是Hystrix中用到的,結合Hystrix進行鍼對性講解,對Hystrix的理解更加深入,若有錯誤,望加以斧正,謝謝。

相關文章
相關標籤/搜索