Hystrix組件提供了兩種隔離的解決方案:線程池隔離和信號量隔離。兩種隔離方式都是限制對共享資源的併發訪問量,線程在就緒狀態、運行狀態、阻塞狀態、終止狀態間轉變時須要由操做系統調度,佔用很大的性能消耗;而信號量是在訪問共享資源時,進行tryAcquire,tryAcquire成功才容許訪問共享資源。
線程池隔離
不一樣的業務線之間選擇用線程池隔離,下降互相影響的機率。設置隔離策略爲線程池隔離:
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD));在Hystrix內部,是根據
properties.executionIsolationStrategy().get()這個字段判斷隔離級別。如在
getRunObservableDecoratedForMetricsAndErrorHandling這個方法中會先判斷是否是線程池隔離,若是是就獲取線程池,若是不是則進行信號量隔離的操做。
若是是線程池隔離,還須要設置線程池的相關參數如:線程池名字andThreadPoolKey , coreSize(核心線程池大小) , KeepAliveTimeMinutes(線程存存活時間),MaxQueueSize(最大隊列長度),QueueSizeRejectionThreshold(拒絕執行的閥值)等等。
.
andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getCoreSize())
.withKeepAliveTimeMinutes(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getKeepAliveSeconds())
.withMaxQueueSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getMaxQueueSize())
.withQueueSizeRejectionThreshold(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getQueueSizeRejectionThreshold()))
threadPoolKey 也是線程池的名字的前綴,默認前綴是 hystrix 。在Hystrix中,核心線程數和最大線程數是一致的,減小線程臨時建立和銷燬帶來的性能開銷。線程池的默認參數都在HystrixThreadPoolProperties中,重點講解一下參數queueSizeRejectionThreshold 和maxQueueSize 。queueSizeRejectionThreshold默認值是5,容許在隊列中的等待的任務數量。maxQueueSize默認值是-1,隊列大小。若是是Fast Fail 應用,建議使用默認值。線程池飽滿後直接拒絕後續的任務,再也不進行等待。代碼以下HystrixThreadPool類中:
@Override
public boolean isQueueSpaceAvailable() {
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
線程池一旦建立完成,相關參數就不會更改,存放在靜態的ConcurrentHashMap中,key是對應的commandKey 。而queueSizeRejectionThreshold是每一個命令都是設置的。
線程池的相關參數都保存在HystrixThreadPool這個類文件中,線程池的建立方法getThreadPool則在HystrixConcurrencyStrategy類文件中。從getThreadPool方法能夠看出線程池的名字就是hystrix-threadPoolKey-threadNumber.
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
}
在HystrixThreadPool實現類的構造方法中,併發HystrixConcurrencyStrategy實例是經過HystrixPlugins獲取的,因此能夠經過HystrixPlugins設置自定義插件。具體的HystrixPlugins如何使用,會在後面章節中講解。
線程池的建立
前面說了,在Hystrix內部大部分類都是單實例,一樣ThreadPool也不例外,也是單實例。而且相同commandKey的依賴還必須是使用同一個線程池。這就須要把ThreadPool保存在一個靜態的map中,key是commandKey,同時要保證線程安全,Hytstrix使用了ConcurrentHashMap。關於爲何不適用HashTable保證線程安全問題的疑問請自行Google。線程池的建立在HystrixThreadPool這個類文件中的內部類Factory中的getInstance方法。
/* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
線程池的使用
HystrixCommand類的execute()內部調用了queue() ,queue又調用了父類AbstractCommand的toObservable方法,toObservable方法處理了是否可緩存問題後,交給了getRunObservableDecoratedForMetricsAndErrorHandling方法,這個方法設置了一系列的executionHook以後,交給了getExecutionObservableWithLifecycle,這個方法經過getExecutionObservable()獲取了執行器。getExecutionObservable()是個抽象方法,具體實現放在了子類:HystrixCommand和HystrixObservableCommand類中。下面是HystrixCommand類中的getExecutionObservable方法實現:
final protected Observable<R> getExecutionObservable() {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> s) {
try {
s.onNext(run());
s.onCompleted();
} catch (Throwable e) {
s.onError(e);
}
}
});
}
在這個Call方法中執行了具體的業務邏輯run() ;
轉自