在微服務架構中,咱們將系統拆分爲不少個服務,各個服務之間經過註冊與訂閱的方式相互依賴,因爲各個服務都是在各自的進程中運行,就有可能因爲網絡緣由或者服務自身的問題致使調用故障或延遲,隨着服務的積壓,可能會致使服務崩潰。爲了解決這一系列的問題,斷路器等一系列服務保護機制出現了。java
斷路器自己是一種開關保護機制,用於在電路上保護線路過載,當線路中有電器發生短路時,斷路器可以及時切斷故障電路,防止發生過載、發熱甚至起火等嚴重後果。web
在分佈式架構中,斷路器模式的做用也是相似的。spring
針對上述問題,Spring Cloud Hystrix 實現了斷路器、線路隔離等一系列服務保護功能。它也是基於 Netflix 的開源框架 Hystrix 實現的,該框架的目標在於經過控制那些訪問遠程系統、服務和第三方庫的節點,從而對延遲和故障提供更強大的容錯能力。Hystrix 具有服務降級、服務熔斷、線程和信號隔離、請求緩存、請求合併以及服務監控等強大功能。apache
在開始實現斷路器以前,先用以前實現的一些內容做爲基礎,構建一個以下圖所示的服務調用關係。編程
須要啓動的工程有以下一些:緩存
在未加入斷路器以前,關閉8081的實例,發送 GET 請求到 http://localhost:3333/ribbon-consumer ,能夠獲取下面的輸入。網絡
下面引入 Spring Cloud Hystrix。架構
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency>
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.context.annotation.Bean; import org.springframework.web.client.RestTemplate; @EnableCircuitBreaker @EnableDiscoveryClient @SpringBootApplication public class DemoApplication { @Bean @LoadBalanced RestTemplate restTemplate(){ return new RestTemplate(); } public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
注:此處還可使用 Spring Cloud 應用中的 @SpringCloudApplication 註解來修飾主類,該註解的具體定義以下。能夠看到,該註解中包含了上述所引用的三個註解,這意味着一個 Spring Cloud 標準應用應包含服務發現以及斷路器。app
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.springframework.cloud.client; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @SpringBootApplication @EnableDiscoveryClient @EnableCircuitBreaker public @interface SpringCloudApplication { }
package com.example.demo.web; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.client.RestTemplate; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ @Service public class HelloService { @Autowired RestTemplate restTemplate; @HystrixCommand(fallbackMethod = "helloFallback") public String helloService(){ return restTemplate.getForEntity("http://hello-service/index", String.class).getBody(); } public String helloFallback(){ return "error"; } }
package com.example.demo.web; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; /** * @author lxx * @version V1.0.0 * @date 2017-8-9 */ @RestController public class ConsumerController { @Autowired HelloService helloService; @RequestMapping(value = "ribbon-consumer", method = RequestMethod.GET) public String helloConsumer(){ return helloService.helloService(); } }
下面,對斷路器實現的服務回調邏輯進行驗證,從新啓動以前關閉的 2221 端口的 hello-service,確保此時服務註冊中心、兩個 hello-service 和 ribbon-consumer 均已啓動,再次訪問 http://localhost:3333/ribbon-consumer 能夠輪詢兩個 hello-serive 並返回一些文字信息。此時斷開其中任意一個端口的 hello-service,再次訪問,當輪詢到關閉的端口服務時,輸出內容爲 error ,再也不是以前的提示信息。框架
除了經過斷開具體的服務實例來模擬某個節點沒法訪問的狀況以外,還能夠模擬一下服務阻塞(長時間未響應)的狀況。下面對hello-serive 的 /index 接口作一些修改,具體以下:
package com.example.demo.web; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Random; /** * @author lxx * @version V1.0.0 * @date 2017-8-9 */ @RestController public class HelloController { private final Logger logger = Logger.getLogger(getClass()); @Autowired private DiscoveryClient client; @RequestMapping(value = "/index") public String index(){ ServiceInstance instance = client.getLocalServiceInstance(); // 讓處理線程等待幾秒鐘 int sleepTime = new Random().nextInt(3000); logger.info("sleepTime:"+sleepTime); try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("/hello:host:"+instance.getHost()+" port:"+instance.getPort() +" service_id:"+instance.getServiceId()); return "hello world!"; } }
經過Thread.sleep 函數可以讓 /index 接口的處理線程不是立刻返回內容,而是在阻塞幾秒後才返回內容。因爲 Hystrix 默認超時時間爲 2000 毫秒,因此這裏採用了 0 至 3000 的隨機數以讓處理過程有必定機率發生超時來觸發斷路器。爲了更精確的觀察斷路器的觸發,在消費者調用函數中作一些時間記錄,具體以下:
package com.example.demo.web; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.client.RestTemplate; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ @Service public class HelloService { @Autowired RestTemplate restTemplate; @HystrixCommand(fallbackMethod = "helloFallback") public String helloService(){ long beginTime = System.currentTimeMillis(); String body = restTemplate.getForEntity("http://hello-service/index", String.class).getBody(); long endTime = System.currentTimeMillis(); System.out.println("Spend Time : "+ (endTime - beginTime)); return body; } public String helloFallback(){ return "error"; } }
首先,建立一個 HystrixCommand 或 HystrixObservableCommand 對象,用來表示對依賴服務的操做請求,同時傳遞全部須要的參數。從其命名中咱們就能知道它採用了「命令模式」 來實現服務調用操做的封裝。而這兩個 Command 對象分別針對不一樣的應用場景。
命令模式,未來自客戶端的請求封裝成一個對象,從而讓你可使用不一樣的請求對客戶端進行參數化。它能夠被用於實現「行爲請求者」 與 「行爲實現者」 的解耦,以便使二者能夠適應變化。下面的示例是對命令模式的簡單實現:
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ // 接收者 public class Receiver { public void active(){ //真正的業務邏輯 System.out.println("測試命令模式"); } }
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ //抽象命令 public interface Command { void excute(); }
package com.example.demo.command; import org.springframework.beans.factory.annotation.Autowired; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ //具體命令實現 public class CommandImpl implements Command { private Receiver receiver; public CommandImpl(Receiver receiver) { this.receiver = receiver; } @Override public void excute() { this.receiver.active(); } }
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ //客戶端調用 public class Invoker { private Command command; public void setCommand(Command command) { this.command = command; } public void active (){ command.excute(); } }
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ public class Client { public static void main(String[] args) { Receiver receiver = new Receiver(); Command command = new CommandImpl(receiver); Invoker invoker = new Invoker(); invoker.setCommand(command); invoker.active(); //客戶端經過調用者來執行命令 } }
從代碼中,能夠看到這樣幾個對象。
從上面的示例中,咱們能夠看到,調用者 Invoker 與操做者 Receiver 經過 Command 命令接口實現瞭解耦。對於調用者來講,咱們能夠爲其注入多個命令操做,調用者只需在須要的時候直接調用便可,而不須要知道這些操做命令實際是如何實現的。而在這裏所提到的 HystrixCommand 和 HystrixObservableCommand 則是在 Hystrix 中對 Command 的進一步抽象定義。
2. 命令執行
命令執行方式一共有4種,而 Hystrix 在執行時會根據建立的Command對象以及具體的狀況來選擇一種執行。其中 HystrixCommand 實現了下面兩個執行方式。
R execute();
Future<R> queue();
而 HystrixObservableCommand 實現了另外兩種執行方式。
Observable<R> observe();
Observable<R> toObservable();
在 Hystrix 的底層實現中大量使用了 RxJava ,爲了更容易的理解後續內容,在這裏對 RxJava 的觀察者-訂閱者模式作一個簡單的入門介紹。
上面提到的 Observable 對象就是 RxJava 中的核心內容之一,能夠理解爲 「事件源」 或者 「被觀察者」,與其對應的 Subscriber 對象,能夠理解爲 「訂閱者」 或者 「觀察者」。這兩個對象是 RxJava 響應式編程的重要組成部分。
下面經過一個簡單的例子來直觀理解一下 Observable 與 Subscribers:
package com.example.demo.Observable_Subsciber; import rx.Observable; import rx.Subscriber; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ public class Obs_Subs { public static void main(String[] args) { //建立事件源 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava "); subscriber.onNext("I'm XX"); subscriber.onCompleted(); } }); //建立訂閱者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(String s) { } }; observable.subscribe(subscriber); } }
在該示例中,建立了一個簡單的事件源 observable,一個對事件傳遞內容輸出的訂閱者 subscriber ,經過 observable.subscribe(subscriber) 來觸發事件的發佈。
在這裏咱們對於事件源 observable 提到了兩個不一樣的概念:Hot Observable 和 Cold Observable ,分別對應了上面的 command.observe() 和 command.toObservable() 的返回對象。其中 HotObservable,不論 「事件源」 是否有 「訂閱者」 ,都會在建立後對事件進行發佈,因此對於 Hot Observable 的每個 「訂閱者」 都有多是從 「事件源」 的中途開始的,並可能只是看到了整個操做的局部過程。而 Cold Observable 在沒有 「訂閱者」 的時候並不會發佈事件,而是進行等待,直到有 「訂閱者」 以後才發佈事件,因此對於 Cold Observable 的訂閱者,它能夠保證從一開始看到整個操做的所有過程。
3. 結果是否被緩存
若當前命令的請求緩存功能是被啓用的,而且該命令緩存命中,那麼緩存的結果會當即以 Observable 對象的形式返回。
4. 斷路器是否打開
在命令結果沒有緩存命中的時候,Hystrix 在執行命令前須要檢查斷路器是否爲打開狀態:
5. 線程池 / 請求隊列 / 信息量是否佔滿
若是與命令相關的線程池 / 請求隊列 / 信息量已經佔滿,那麼 Hystrix 不會執行命令,跳轉到 fallback 處理邏輯(對應下面第8步)。
注意:此處的線程池並不是容器的線程池,而是每一個依賴服務的專有線程池。Hystrix 爲了保證不會由於某個依賴服務的問題影響到其餘依賴服務而採用了 「艙壁模式」 來隔離每一個依賴的服務。
6. HystrixObservableCommand.construct() 或 HystrixCommand.run()
Hystrix 會根據編寫的方法來決定採起什麼樣的方式去請求依賴服務。
若是 run() 或 construct() 方法的執行時間超過了命令設置的超時閾值,當前處理線程會拋出 TimeoutException。這種狀況下,也會跳轉到 fallback 處理邏輯(第8步)。
7. 計算斷路器的健康度
Hystrix 會將 「成功」、「失敗」、「拒絕」、「超時」 等信息報告給斷路器,而斷路器會維護一組計數器來統計這些數據。
斷路器會使用這些統計數據來決定是否要將斷路器打開,來對某個依賴服務的請求進行 「熔斷 / 短路」,直到恢復期結束。
8. fallback 處理
當命令執行失敗的時候,Hystrix 會進入 fallback 嘗試回退處理,咱們一般也稱爲 「服務降級」。下面就是可以引起服務降級處理的幾種狀況:
九、返回成功的響應
HystrixCircuitBreaker 的定義:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package com.netflix.hystrix; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixCommandProperties; import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import rx.Subscriber; import rx.Subscription; public interface HystrixCircuitBreaker { boolean allowRequest(); boolean isOpen(); void markSuccess(); void markNonSuccess(); boolean attemptExecution(); public static class NoOpCircuitBreaker implements HystrixCircuitBreaker { public NoOpCircuitBreaker() { } public boolean allowRequest() { return true; } public boolean isOpen() { return false; } public void markSuccess() { } public void markNonSuccess() { } public boolean attemptExecution() { return true; } } public static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; private final AtomicReference<HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status> status; private final AtomicLong circuitOpened; private final AtomicReference<Subscription> activeSubscription; protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.status = new AtomicReference(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED); this.circuitOpened = new AtomicLong(-1L); this.activeSubscription = new AtomicReference((Object)null); this.properties = properties; this.metrics = metrics; Subscription s = this.subscribeToStream(); this.activeSubscription.set(s); } private Subscription subscribeToStream() { return this.metrics.getHealthCountsStream().observe().subscribe(new Subscriber() { public void onCompleted() { } public void onError(Throwable e) { } public void onNext(HealthCounts hc) { if(hc.getTotalRequests() >= (long)((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerRequestVolumeThreshold().get()).intValue() && hc.getErrorPercentage() >= ((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerErrorThresholdPercentage().get()).intValue() && HystrixCircuitBreakerImpl.this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) { HystrixCircuitBreakerImpl.this.circuitOpened.set(System.currentTimeMillis()); } } }); } public void markSuccess() { if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED)) { this.metrics.resetStream(); Subscription previousSubscription = (Subscription)this.activeSubscription.get(); if(previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = this.subscribeToStream(); this.activeSubscription.set(newSubscription); this.circuitOpened.set(-1L); } } public void markNonSuccess() { if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) { this.circuitOpened.set(System.currentTimeMillis()); } } public boolean isOpen() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L); } public boolean allowRequest() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow()))); } private boolean isAfterSleepWindow() { long circuitOpenTime = this.circuitOpened.get(); long currentTime = System.currentTimeMillis(); long sleepWindowTime = (long)((Integer)this.properties.circuitBreakerSleepWindowInMilliseconds().get()).intValue(); return currentTime > circuitOpenTime + sleepWindowTime; } public boolean attemptExecution() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(this.isAfterSleepWindow()?this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN):false))); } static enum Status { CLOSED, OPEN, HALF_OPEN; private Status() { } } } public static class Factory { private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap(); public Factory() { } public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { HystrixCircuitBreaker previouslyCached = (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()); if(previouslyCached != null) { return previouslyCached; } else { HystrixCircuitBreaker cbForCommand = (HystrixCircuitBreaker)circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreaker.HystrixCircuitBreakerImpl(key, group, properties, metrics)); return cbForCommand == null?(HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()):cbForCommand; } } public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) { return (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()); } static void reset() { circuitBreakersByCommand.clear(); } } }
主要定義了三個斷路器的抽象方法。
另外還有三個靜態類。
HystrixCircuitBreakerImpl 的各個實現方法以下:
public boolean isOpen() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L); }
public boolean allowRequest() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow()))); }
public void markNonSuccess() { if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) { this.circuitOpened.set(System.currentTimeMillis()); } }
Hystrix 使用 「艙壁模式」 實現線程池的隔離,它爲每個依賴服務建立一個獨立的線程池,就算某個依賴服務出現延遲太高的狀況,也不會拖慢其餘的依賴服務。
Hystrix 命令就是咱們以前所說的 HystrixCommand,它用來封裝具體的依賴服務調用邏輯。
能夠經過繼承的方式來實現,好比: