最近小主看到不少公衆號都在發佈Hystrix停更的文章,spring cloud體系的使用者和擁護者一片哀嚎,實際上,spring做爲Java最大的家族,根本不須要擔憂其中一兩個零件的廢棄,Hystrix的停更,只會催生更多或者更好的零件來替代它,所以,咱們須要作的是:**知道Hystrix是幹嗎,怎麼用的,這樣要找替代者就易於反掌了。java
文章提綱:redis
- 爲何須要Hystrix?
- Hystrix如何解決依賴隔離
- 如何使用Hystrix
在大中型分佈式系統中,一般系統不少依賴(HTTP,hession,Netty,Dubbo等),以下圖:
在高併發訪問下,這些依賴的穩定性與否對系統的影響很是大,可是依賴有不少不可控問題:如網絡鏈接緩慢,資源繁忙,暫時不可用,服務脫機等.spring
以下圖:QPS爲50的依賴 I 出現不可用,可是其餘依賴仍然可用.
當依賴I 阻塞時,大多數服務器的線程池就出現阻塞(BLOCK),影響整個線上服務的穩定性.以下圖:
在複雜的分佈式架構的應用程序有不少的依賴,都會不可避免地在某些時候失敗。高併發的依賴失敗時若是沒有隔離措施,當前應用服務就有被拖垮的風險。編程
例如:一個依賴30個SOA服務的系統,每一個服務99.99%可用。 99.99%的30次方 ≈ 99.7% 0.3% 意味着一億次請求 會有 3,000,00次失敗 換算成時間大約每個月有2個小時服務不穩定. 隨着服務依賴數量的變多,服務不穩定的機率會成指數性提升.
解決問題方案:對依賴作隔離,Hystrix就是處理依賴隔離的框架,同時也是能夠幫咱們作依賴服務的治理和監控.
Netflix 公司開發併成功使用Hystrix,使用規模以下:緩存
he Netflix API processes 10+ billion HystrixCommand executions per day using thread isolation. Each API instance has 40+ thread-pools with 5-20 threads in each (most are set to 10).
- Hystrix使用命令模式HystrixCommand(Command)包裝依賴調用邏輯,每一個命令在單獨線程中/信號受權下執行。
- 可配置依賴調用超時時間,超時時間通常設爲比99.5%平均時間略高便可.當調用超時時,直接返回或執行fallback邏輯。
- 爲每一個依賴提供一個小的線程池(或信號),若是線程池已滿調用將被當即拒絕,默認不採用排隊.加速失敗斷定時間。
- 依賴調用結果分:成功,失敗(拋出異常),超時,線程拒絕,短路。 請求失敗(異常,拒絕,超時,短路)時執行fallback(降級)邏輯。
- 提供熔斷器組件,能夠自動運行或手動調用,中止當前依賴一段時間(10秒),熔斷器默認錯誤率閾值爲50%,超過將自動運行。
- 提供近實時依賴的統計和監控
Hystrix依賴的隔離架構,以下圖:服務器
- 使用maven引入Hystrix依賴
<!-- 依賴版本 --> <hystrix.version>1.3.16</hystrix.version> <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version> <dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-core</artifactId> <version>${hystrix.version}</version> </dependency> <dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-metrics-event-stream</artifactId> <version>${hystrix-metrics-event-stream.version}</version> </dependency> <!-- 倉庫地址 --> <repository> <id>nexus</id> <name>local private nexus</name> <url>http://maven.oschina.net/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository>
- 使用命令模式封裝依賴邏輯
public class HelloWorldCommand extends HystrixCommand<String> { private final String name; public HelloWorldCommand(String name) { //最少配置:指定命令組名(CommandGroup) super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // 依賴邏輯封裝在run()方法中 return "Hello " + name +" thread:" + Thread.currentThread().getName(); } //調用實例 public static void main(String[] args) throws Exception{ //每一個Command對象只能調用一次,不能夠重複調用, //重複調用對應異常信息:This instance can only be executed once. Please instantiate a new instance. HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix"); //使用execute()同步調用代碼,效果等同於:helloWorldCommand.queue().get(); String result = helloWorldCommand.execute(); System.out.println("result=" + result); helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix"); //異步調用,可自由控制獲取結果時機, Future<String> future = helloWorldCommand.queue(); //get操做不能超過command定義的超時時間,默認:1秒 result = future.get(100, TimeUnit.MILLISECONDS); System.out.println("result=" + result); System.out.println("mainThread=" + Thread.currentThread().getName()); } } //運行結果: run()方法在不一樣的線程下執行 // result=Hello Synchronous-hystrix thread:hystrix-HelloWorldGroup-1 // result=Hello Asynchronous-hystrix thread:hystrix-HelloWorldGroup-2 // mainThread=main
note:異步調用使用 command.queue()get(timeout, TimeUnit.MILLISECONDS);同步調用使用command.execute() 等同於 command.queue().get();網絡
- 註冊異步事件回調執行
//註冊觀察者事件攔截 Observable<String> fs = new HelloWorldCommand("World").observe(); //註冊結果回調事件 fs.subscribe(new Action1<String>() { @Override public void call(String result) { //執行結果處理,result 爲HelloWorldCommand返回的結果 //用戶對結果作二次處理. } }); //註冊完整執行生命週期事件 fs.subscribe(new Observer<String>() { @Override public void onCompleted() { // onNext/onError完成以後最後回調 System.out.println("execute onCompleted"); } @Override public void onError(Throwable e) { // 當產生異常時回調 System.out.println("onError " + e.getMessage()); e.printStackTrace(); } @Override public void onNext(String v) { // 獲取結果後回調 System.out.println("onNext: " + v); } }); /* 運行結果 call execute result=Hello observe-hystrix thread:hystrix-HelloWorldGroup-3 onNext: Hello observe-hystrix thread:hystrix-HelloWorldGroup-3 execute onCompleted */
- 使用Fallback() 提供降級策略
//重載HystrixCommand 的getFallback方法實現邏輯 public class HelloWorldCommand extends HystrixCommand<String> { private final String name; public HelloWorldCommand(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup")) /* 配置依賴超時時間,500毫秒*/ .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(500))); this.name = name; } @Override protected String getFallback() { return "exeucute Falled"; } @Override protected String run() throws Exception { //sleep 1 秒,調用會超時 TimeUnit.MILLISECONDS.sleep(1000); return "Hello " + name +" thread:" + Thread.currentThread().getName(); } public static void main(String[] args) throws Exception{ HelloWorldCommand command = new HelloWorldCommand("test-Fallback"); String result = command.execute(); } } /* 運行結果:getFallback() 調用運行 getFallback executed */
NOTE: 除了HystrixBadRequestException異常以外,全部從run()方法拋出的異常都算做失敗,並觸發降級getFallback()和斷路器邏輯。架構
HystrixBadRequestException用在非法參數或非系統故障異常等不該觸發回退邏輯的場景。
- 依賴命名:CommandKey
public HelloWorldCommand(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) /* HystrixCommandKey工廠定義依賴名稱 */ .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))); this.name = name; }
NOTE: 每一個CommandKey表明一個依賴抽象,相同的依賴要使用相同的CommandKey名稱。依賴隔離的根本就是對相同CommandKey的依賴作隔離.併發
- 依賴分組:CommandGroup
命令分組用於對依賴操做分組,便於統計,彙總等.app
//使用HystrixCommandGroupKey工廠定義 public HelloWorldCommand(String name) { Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup")) }
NOTE: CommandGroup是每一個命令最少配置的必選參數,在不指定ThreadPoolKey的狀況下,字面值用於對不一樣依賴的線程池/信號區分.
- 線程池/信號:ThreadPoolKey
public HelloWorldCommand(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")) /* 使用HystrixThreadPoolKey工廠定義線程池名稱*/ .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool"))); this.name = name; }
NOTE: 當對同一業務依賴作隔離時使用CommandGroup作區分,可是對同一依賴的不一樣遠程調用如(一個是redis 一個是http),可使用HystrixThreadPoolKey作隔離區分.
最然在業務上都是相同的組,可是須要在資源上作隔離時,可使用HystrixThreadPoolKey區分.
- 請求緩存 Request-Cache
public class RequestCacheCommand extends HystrixCommand<String> { private final int id; public RequestCacheCommand( int id) { super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand")); this.id = id; } @Override protected String run() throws Exception { System.out.println(Thread.currentThread().getName() + " execute id=" + id); return "executed=" + id; } //重寫getCacheKey方法,實現區分不一樣請求的邏輯 @Override protected String getCacheKey() { return String.valueOf(id); } public static void main(String[] args){ HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { RequestCacheCommand command2a = new RequestCacheCommand(2); RequestCacheCommand command2b = new RequestCacheCommand(2); Assert.assertTrue(command2a.execute()); //isResponseFromCache斷定是不是在緩存中獲取結果 Assert.assertFalse(command2a.isResponseFromCache()); Assert.assertTrue(command2b.execute()); Assert.assertTrue(command2b.isResponseFromCache()); } finally { context.shutdown(); } context = HystrixRequestContext.initializeContext(); try { RequestCacheCommand command3b = new RequestCacheCommand(2); Assert.assertTrue(command3b.execute()); Assert.assertFalse(command3b.isResponseFromCache()); } finally { context.shutdown(); } } }
NOTE:請求緩存可讓(CommandKey/CommandGroup)相同的狀況下,直接共享結果,下降依賴調用次數,在高併發和CacheKey碰撞率高場景下能夠提高性能.
Servlet容器中,能夠直接實用Filter機制Hystrix請求上下文
public class HystrixRequestContextServletFilter implements Filter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } } } <filter> <display-name>HystrixRequestContextServletFilter</display-name> <filter-name>HystrixRequestContextServletFilter</filter-name> <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class> </filter> <filter-mapping> <filter-name>HystrixRequestContextServletFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping>
- 信號量隔離:SEMAPHORE
隔離本地代碼或可快速返回遠程調用(如memcached,redis)能夠直接使用信號量隔離,下降線程隔離開銷.
public class HelloWorldCommand extends HystrixCommand<String> { private final String name; public HelloWorldCommand(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup")) /* 配置信號量隔離方式,默認採用線程池隔離 */ .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE))); this.name = name; } @Override protected String run() throws Exception { return "HystrixThread:" + Thread.currentThread().getName(); } public static void main(String[] args) throws Exception{ HelloWorldCommand command = new HelloWorldCommand("semaphore"); String result = command.execute(); System.out.println(result); System.out.println("MainThread:" + Thread.currentThread().getName()); } } /** 運行結果 HystrixThread:main MainThread:main */
- fallback降級邏輯命令嵌套
用場景:用於fallback邏輯涉及網絡訪問的狀況,如緩存訪問。
public class CommandWithFallbackViaNetwork extends HystrixCommand<String> { private final int id; protected CommandWithFallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand"))); this.id = id; } @Override protected String run() { // RemoteService.getValue(id); throw new RuntimeException("force failure for example"); } @Override protected String getFallback() { return new FallbackViaNetwork(id).execute(); } private static class FallbackViaNetwork extends HystrixCommand<String> { private final int id; public FallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand")) // 使用不一樣的線程池作隔離,防止上層線程池跑滿,影響降級邏輯. .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback"))); this.id = id; } @Override protected String run() { MemCacheClient.getValue(id); } @Override protected String getFallback() { return null; } } }
NOTE:依賴調用和降級調用使用不一樣的線程池作隔離,防止上層線程池跑滿,影響二級降級邏輯調用.
- 顯示調用fallback邏輯,用於特殊業務處理
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> { private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true); private final int id; public CommandFacadeWithPrimarySecondary(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand")) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { if (usePrimary.get()) { return new PrimaryCommand(id).execute(); } else { return new SecondaryCommand(id).execute(); } } @Override protected String getFallback() { return "static-fallback-" + id; } @Override protected String getCacheKey() { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand<String> { private final int id; private PrimaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand")) .andCommandPropertiesDefaults( // we default to a 600ms timeout for primary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { // perform expensive 'primary' service call return "responseFromPrimary-" + id; } } private static class SecondaryCommand extends HystrixCommand<String> { private final int id; private SecondaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand")) .andCommandPropertiesDefaults( // we default to a 100ms timeout for secondary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100))); this.id = id; } @Override protected String run() { // perform fast 'secondary' service call return "responseFromSecondary-" + id; } } public static class UnitTest { @Test public void testPrimary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true); assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false); assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } } }
NOTE:顯示調用降級適用於特殊需求的場景,fallback用於業務處理,fallback再也不承擔降級職責,建議慎重使用,會形成監控統計換亂等問題.
- 命令調用合併:HystrixCollapser
命令調用合併容許多個請求合併到一個線程/信號下批量執行。
執行流程圖以下:
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> { private final Integer key; public CommandCollapserGetValueForKey(Integer key) { this.key = key; } @Override public Integer getRequestArgument() { return key; } @Override protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) { //建立返回command對象 return new BatchCommand(requests); } @Override protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) { int count = 0; for (CollapsedRequest<String, Integer> request : requests) { //手動匹配請求和響應 request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand<List<String>> { private final Collection<CollapsedRequest<String, Integer>> requests; private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))); this.requests = requests; } @Override protected List<String> run() { ArrayList<String> response = new ArrayList<String>(); for (CollapsedRequest<String, Integer> request : requests) { response.add("ValueForKey: " + request.getArgument()); } return response; } } public static class UnitTest { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { Future<String> f1 = new CommandCollapserGetValueForKey(1).queue(); Future<String> f2 = new CommandCollapserGetValueForKey(2).queue(); Future<String> f3 = new CommandCollapserGetValueForKey(3).queue(); Future<String> f4 = new CommandCollapserGetValueForKey(4).queue(); assertEquals("ValueForKey: 1", f1.get()); assertEquals("ValueForKey: 2", f2.get()); assertEquals("ValueForKey: 3", f3.get()); assertEquals("ValueForKey: 4", f4.get()); assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0]; assertEquals("GetValueForKey", command.getCommandKey().name()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } finally { context.shutdown(); } } }
NOTE:使用場景:HystrixCollapser用於對多個相同業務的請求合併到一個線程甚至能夠合併到一個鏈接中執行,下降線程交互次和IO數,但必須保證他們屬於同一依賴.
以爲本文對你有幫助?請分享給更多人
關注「編程無界」,提高裝逼技能
![]()