HystrixThreadPool
定义了hystrix线程池接口
获取ExecutorService对象,即jdk定义的线程池。
/**
 * Returns the JDK {@link ExecutorService} that backs this Hystrix thread pool.
 *
 * @return the underlying executor
 */
public ExecutorService getExecutor();
获取rxjava定义的Scheduler对象。
/**
 * Returns the RxJava {@link Scheduler} associated with this thread pool.
 * <p>
 * In the default implementation shown in this file, this delegates to
 * {@code getScheduler(Func0)} with a function that always returns {@code true}.
 *
 * @return a scheduler for this pool
 */
public Scheduler getScheduler();
获取rxjava定义的Scheduler对象,并通过shouldInterruptThread参数指定是否中断底层线程(无参版本默认传入始终返回true的函数)。
/**
 * Returns the RxJava {@link Scheduler} associated with this thread pool.
 *
 * @param shouldInterruptThread function handed to the scheduler; presumably consulted to decide
 *                              whether the underlying thread is interrupted (e.g. on timeout) --
 *                              TODO confirm against the scheduler implementation
 * @return a scheduler for this pool
 */
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);
记录一次线程池执行
/**
 * Records that an execution was started on this thread pool.
 */
public void markThreadExecution();
记录一次线程池执行完成
/**
 * Records that an execution on this thread pool has completed.
 */
public void markThreadCompletion();
记录一次线程池执行拒绝
/**
 * Records that an execution was rejected by this thread pool.
 */
public void markThreadRejection();
队列是否还有可用空间(默认实现中,queueSize <= 0时直接返回true,否则判断当前队列长度是否小于queueSizeRejectionThreshold)
/**
 * Reports whether the pool's work queue can accept another task.
 * <p>
 * In the default implementation shown in this file, the answer is {@code true} when
 * {@code queueSize <= 0}; otherwise it is {@code true} only while the current queue size
 * is below the {@code queueSizeRejectionThreshold} property.
 *
 * @return {@code true} if queue space is considered available
 */
public boolean isQueueSpaceAvailable();
通过工厂模式创建hystrix线程池,并设置了缓存:以threadPoolKey.name()作为缓存键,每一个键对应一个HystrixThreadPoolDefault线程池。
/**
 * Factory that hands out {@link HystrixThreadPool} instances and caches them by the
 * name of their {@link HystrixThreadPoolKey}.
 */
static class Factory {
    /** Cache of created pools, keyed by {@code threadPoolKey.name()}. */
    final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

    /**
     * Returns the pool cached under the given key's name, creating a
     * {@code HystrixThreadPoolDefault} for it first if none is cached yet.
     *
     * @param threadPoolKey     key whose {@code name()} identifies the pool in the cache
     * @param propertiesBuilder properties setter passed to the pool constructor on first creation
     * @return the pool currently stored in the cache for this key
     */
    static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
        final String cacheKey = threadPoolKey.name();

        // Lock-free lookup first; only a cache miss falls through to the locked section.
        HystrixThreadPool pool = threadPools.get(cacheKey);
        if (pool == null) {
            // Creation is serialized on the HystrixThreadPool class monitor, and the
            // cache is checked again under the lock before a new pool is constructed.
            synchronized (HystrixThreadPool.class) {
                if (!threadPools.containsKey(cacheKey)) {
                    threadPools.put(cacheKey, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                }
            }
            // Re-read from the cache so the returned value is whatever the map holds now.
            pool = threadPools.get(cacheKey);
        }
        return pool;
    }
}
HystrixThreadPoolDefault
线程池默认实现。
通过HystrixThreadPoolMetrics实现metric记录。
/**
 * Forwards the "execution started" event to this pool's metrics object.
 */
@Override
public void markThreadExecution() {
    this.metrics.markThreadExecution();
}
/**
 * Forwards the "execution completed" event to this pool's metrics object.
 */
@Override
public void markThreadCompletion() {
    this.metrics.markThreadCompletion();
}
/**
 * Forwards the "execution rejected" event to this pool's metrics object.
 */
@Override
public void markThreadRejection() {
    this.metrics.markThreadRejection();
}
内部通过jdk的ThreadPoolExecutor实现线程池功能。
/**
 * Reports whether the work queue can take another task.
 *
 * @return {@code true} when {@code queueSize <= 0}, or when the executor's current queue
 *         size is strictly below the {@code queueSizeRejectionThreshold} property
 */
@Override
public boolean isQueueSpaceAvailable() {
    // A non-positive queueSize short-circuits to true without inspecting the queue.
    final boolean queueCheckSkipped = queueSize <= 0;
    return queueCheckSkipped
            || threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
/**
 * Exposes the {@link ThreadPoolExecutor} held by this pool.
 *
 * @return the executor stored in the {@code threadPool} field
 */
@Override
public ThreadPoolExecutor getExecutor() {
    return this.threadPool;
}
通过HystrixContextScheduler实现rxjava定义的Scheduler。
/**
 * Returns a scheduler using the default interruption setting.
 * <p>
 * Delegates to {@link #getScheduler(Func0)} with a function that always yields {@code true}.
 *
 * @return a scheduler for this pool
 */
@Override
public Scheduler getScheduler() {
    // by default, interrupt underlying threads on timeout
    final Func0<Boolean> alwaysInterrupt = new Func0<Boolean>() {
        @Override
        public Boolean call() {
            return Boolean.TRUE;
        }
    };
    return getScheduler(alwaysInterrupt);
}
/**
 * Creates a new {@code HystrixContextScheduler} bound to this pool.
 * <p>
 * {@code touchConfig()} is invoked before the scheduler is built.
 *
 * @param shouldInterruptThread function passed through unchanged to the scheduler constructor
 * @return a freshly constructed scheduler on every call
 */
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    touchConfig();
    final HystrixConcurrencyStrategy strategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    return new HystrixContextScheduler(strategy, this, shouldInterruptThread);
}
通过HystrixConcurrencyStrategy创建线程池,通过HystrixThreadPoolMetrics的单例方法获得一个HystrixThreadPoolMetrics。
/**
 * Builds the default thread pool for the given key.
 * <p>
 * Steps, in order: resolve the pool properties, ask the active concurrency strategy for an
 * executor, obtain the metrics object for the key (passing that executor in), take the
 * executor back from the metrics object, and finally create or retrieve the metrics publisher.
 *
 * @param threadPoolKey      key identifying this pool
 * @param propertiesDefaults property setter supplied to the properties factory
 */
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);

    // The executor comes from the concurrency strategy and is handed to the metrics lookup.
    this.metrics = HystrixThreadPoolMetrics.getInstance(
            threadPoolKey,
            HystrixPlugins.getInstance().getConcurrencyStrategy().getThreadPool(threadPoolKey, this.properties),
            this.properties);

    // Use the executor the metrics object reports rather than holding a separate reference.
    this.threadPool = this.metrics.getThreadPool();

    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}