HystrixThreadPool

  定义了hystrix线程池接口

  获取ExecutorService对象,即jdk定义的线程池。
public ExecutorService getExecutor();
  获取rxjava定义的Scheduler对象。
public Scheduler getScheduler();
  获取rxjava定义的Scheduler对象。
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);
  记录一次线程池执行
public void markThreadExecution();
  记录一次线程池执行成功
public void markThreadCompletion();
  记录一次线程池执行拒绝
public void markThreadRejection();
  队列是否有空闲
public boolean isQueueSpaceAvailable();

  通过工厂模式创建hystrix线程池,并设置了缓存,每一个threadkey对应一个HystrixThreadPoolDefault线程池。

static class Factory {
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
String key = threadPoolKey.name();
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
}

HystrixThreadPoolDefault

  线程池默认实现。

  通过HystrixThreadPoolMetrics实现metric记录。

        @Override
public void markThreadExecution() {
metrics.markThreadExecution();
}
@Override
public void markThreadCompletion() {
metrics.markThreadCompletion();
}
@Override
public void markThreadRejection() {
metrics.markThreadRejection();
}

  内部通过jdk的ThreadPoolExecutor实现线程池功能。

     @Override
public boolean isQueueSpaceAvailable() {
if (queueSize <= 0) {
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
@Override
public ThreadPoolExecutor getExecutor() {
return threadPool;
}

  通过HystrixContextScheduler实现rxjava定义的Scheduler。

    @Override
public Scheduler getScheduler() {
//by default, interrupt underlying threads on timeout
return getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return true;
}
});
}
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

  通过HystrixConcurrencyStrategy创建线程池,通过HystrixThreadPoolMetrics的单例方法获得一个HystrixThreadPoolMetrics。

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
this.threadPool = this.metrics.getThreadPool();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
05-11 22:19