Spring Cloud Hystrix熔斷器隔離方案

  Hystrix組件提供了兩種隔離的解決方案:線程池隔離和信號量隔離。兩種隔離方式都是限制對共享資源的併發訪問量,線程在就緒狀態、運行狀態、阻塞狀態、終止狀態間轉變時須要由操做系統調度,佔用很大的性能消耗;而信號量是在訪問共享資源時,進行tryAcquire,tryAcquire成功才容許訪問共享資源。
 
線程池隔離
     不一樣的業務線之間選擇用線程池隔離,下降互相影響的機率。設置隔離策略爲線程池隔離:
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD));在Hystrix內部,是根據 properties.executionIsolationStrategy().get()這個字段判斷隔離級別。如在 getRunObservableDecoratedForMetricsAndErrorHandling這個方法中會先判斷是否是線程池隔離,若是是就獲取線程池,若是不是則進行信號量隔離的操做。
     若是是線程池隔離,還須要設置線程池的相關參數如:線程池名字andThreadPoolKey , coreSize(核心線程池大小) , KeepAliveTimeMinutes(線程存存活時間),MaxQueueSize(最大隊列長度),QueueSizeRejectionThreshold(拒絕執行的閥值)等等。
. andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getCoreSize())
                                    .withKeepAliveTimeMinutes(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getKeepAliveSeconds())
                                    .withMaxQueueSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getMaxQueueSize())
                                    .withQueueSizeRejectionThreshold(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getQueueSizeRejectionThreshold()))
threadPoolKey 也是線程池的名字的前綴,默認前綴是 hystrix 。在Hystrix中,核心線程數和最大線程數是一致的,減小線程臨時建立和銷燬帶來的性能開銷。線程池的默認參數都在HystrixThreadPoolProperties中,重點講解一下參數queueSizeRejectionThreshold 和maxQueueSize 。queueSizeRejectionThreshold默認值是5,容許在隊列中的等待的任務數量。maxQueueSize默認值是-1,隊列大小。若是是Fast Fail 應用,建議使用默認值。線程池飽滿後直接拒絕後續的任務,再也不進行等待。代碼以下HystrixThreadPool類中:
@Override
        public boolean isQueueSpaceAvailable() {
            if (queueSize <= 0) {
                // we don't have a queue so we won't look for space but instead
                // let the thread-pool reject or not
                return true;
            } else {
                return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
            }
        }
 
線程池一旦建立完成,相關參數就不會更改,存放在靜態的ConcurrentHashMap中,key是對應的commandKey 。而queueSizeRejectionThreshold是每一個命令都是設置的。
 
 
     
     線程池的相關參數都保存在HystrixThreadPool這個類文件中,線程池的建立方法getThreadPool則在HystrixConcurrencyStrategy類文件中。從getThreadPool方法能夠看出線程池的名字就是hystrix-threadPoolKey-threadNumber.
@Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
     
     在HystrixThreadPool實現類的構造方法中,併發HystrixConcurrencyStrategy實例是經過HystrixPlugins獲取的,因此能夠經過HystrixPlugins設置自定義插件。具體的HystrixPlugins如何使用,會在後面章節中講解。
 
線程池的建立     
     前面說了,在Hystrix內部大部分類都是單實例,一樣ThreadPool也不例外,也是單實例。而且相同commandKey的依賴還必須是使用同一個線程池。這就須要把ThreadPool保存在一個靜態的map中,key是commandKey,同時要保證線程安全,Hytstrix使用了ConcurrentHashMap。關於爲何不適用HashTable保證線程安全問題的疑問請自行Google。線程池的建立在HystrixThreadPool這個類文件中的內部類Factory中的getInstance方法。
 
/* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
     String key = threadPoolKey.name();

            // this should find it for all but the first time
            HystrixThreadPool previouslyCached = threadPools.get(key);
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize
            synchronized (HystrixThreadPool.class) {
                if (!threadPools.containsKey(key)) {
                    threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                }
            }
            return threadPools.get(key);
     
線程池的使用
     HystrixCommand類的execute()內部調用了queue() ,queue又調用了父類AbstractCommand的toObservable方法,toObservable方法處理了是否可緩存問題後,交給了getRunObservableDecoratedForMetricsAndErrorHandling方法,這個方法設置了一系列的executionHook以後,交給了getExecutionObservableWithLifecycle,這個方法經過getExecutionObservable()獲取了執行器。getExecutionObservable()是個抽象方法,具體實現放在了子類:HystrixCommand和HystrixObservableCommand類中。下面是HystrixCommand類中的getExecutionObservable方法實現:
final protected Observable<R> getExecutionObservable() {
        return Observable.create(new OnSubscribe<R>() {

            @Override
            public void call(Subscriber<? super R> s) {
                try {
                    s.onNext(run());
                    s.onCompleted();
                } catch (Throwable e) {
                    s.onError(e);
                }
            }

        });
    }
在這個Call方法中執行了具體的業務邏輯run() ;
 
轉自
相關文章
相關標籤/搜索