Supported implementations
<!-- Spring Cloud dependency -->
org.springframework.cloud:spring-cloud-starter-netflix-hystrix
<!-- Spring Cloud dependency (Resilience4J) -->
org.springframework.cloud:spring-cloud-starter-circuitbreaker-resilience4j

<!-- Spring Cloud dependency (Reactive Resilience4J) -->
org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j

Official introduction:

Supported variants:

Requirements:

  • Components provided (all at version 1.1.0)

    • Circuit Breaker

      <!-- Dependency to add -->
      <dependency>
      	<groupId>io.github.resilience4j</groupId>
      	<artifactId>resilience4j-circuitbreaker</artifactId>
      	<version>${resilience4j.version}</version>
      </dependency>
      

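      The three circuit breaker states

      • CLOSED - everything is normal; calls are not short-circuited
      • OPEN - the remote service is considered down; all requests are short-circuited
      • HALF_OPEN - a configured wait time has elapsed since the breaker opened; the breaker lets a limited number of requests through to check whether the remote service is back online

      Configuration: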
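
The transitions between these three states can be sketched in plain Java. This is only an illustration under simplifying assumptions, not Resilience4j's implementation: the hypothetical class `TinyBreaker` trips on a number of consecutive failures (the real breaker uses a failure rate over a sliding window), allows any call through while HALF_OPEN, and takes an injected clock so the wait time can be simulated.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical, single-threaded illustration of CLOSED / OPEN / HALF_OPEN.
class TinyBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // consecutive failures that open the breaker
    private final long waitNanos;       // time to stay OPEN before probing again
    private final LongSupplier clock;   // injected time source (assumption, for testability)
    private State state = State.CLOSED;
    private int failures = 0;
    private long openedAt = 0L;

    TinyBreaker(int failureThreshold, long waitNanos, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.waitNanos = waitNanos;
        this.clock = clock;
    }

    // Returns the current state, moving OPEN -> HALF_OPEN once the wait time has elapsed.
    State state() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= waitNanos) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    <T> T call(Supplier<T> remote, T fallback) {
        if (state() == State.OPEN) {
            return fallback; // short-circuited: the remote call is not attempted
        }
        try {
            T result = remote.get();
            failures = 0;
            state = State.CLOSED; // a successful call (or probe) closes the breaker
            return result;
        } catch (RuntimeException e) {
            failures++;
            // A failed probe in HALF_OPEN, or too many failures in CLOSED, opens the breaker.
            if (state == State.HALF_OPEN || failures >= failureThreshold) {
                state = State.OPEN;
                openedAt = clock.getAsLong();
            }
            return fallback;
        }
    }
}
```

With a threshold of 2, two failing calls move the breaker to OPEN, further calls return the fallback without touching the remote service, and after the wait time a single successful probe returns it to CLOSED.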

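CircuitBreakerConfig cfg = CircuitBreakerConfig
        .custom()
        // Failure rate threshold in percent, evaluated over the sliding window.
        // E.g. with slidingWindowSize = 2 and a 50% threshold, a single failed call
        // among the two recorded calls is enough to trip the breaker.
        .failureRateThreshold(50)
        .waitDurationInOpenState(Duration.ofMillis(1000)) // time to stay OPEN before moving to HALF_OPEN
        .permittedNumberOfCallsInHalfOpenState(2) // trial calls allowed while HALF_OPEN
        .slidingWindowSize(2) // sliding window size
        .recordExceptions(RuntimeException.class) // count the listed exception types as failures
        .build();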
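
To make the failure-rate arithmetic concrete, here is a small plain-Java sketch of a count-based sliding window. The class `SlidingWindowRate` and its methods are hypothetical names for illustration only. It assumes the rate is evaluated only once the window is full and that the breaker trips when the rate is greater than or equal to the threshold.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical count-based sliding window over the most recent call outcomes.
class SlidingWindowRate {
    private final int windowSize;
    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true = failed call

    SlidingWindowRate(int windowSize) {
        this.windowSize = windowSize;
    }

    // Records one outcome, evicting the oldest when the window is full.
    void record(boolean failed) {
        if (outcomes.size() == windowSize) {
            outcomes.removeFirst();
        }
        outcomes.addLast(failed);
    }

    // Failure rate in percent, or -1 while the window is not yet full (assumption).
    float failureRate() {
        if (outcomes.size() < windowSize) {
            return -1f;
        }
        long failedCalls = outcomes.stream().filter(f -> f).count();
        return 100f * failedCalls / outcomes.size();
    }

    // True when the rate has been computed and reaches the threshold.
    boolean shouldOpen(float thresholdPercent) {
        float rate = failureRate();
        return rate >= 0 && rate >= thresholdPercent;
    }
}
```

With a window of 2 and a threshold of 50, one failure followed by one success gives a rate of 1/2 = 50%, which reaches the threshold.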
  • RateLimiter

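    • Default implementation - io.github.resilience4j.ratelimiter.internal.AtomicRateLimiter
      • State - io.github.resilience4j.ratelimiter.internal.AtomicRateLimiter.State
        • activeCycle - the cycle number used by the last call
        • activePermissions - the number of permissions available after the last call; can be negative if some permissions were reserved
        • nanosToWait - the number of nanoseconds the last call has to wait for its permission
  <!-- Dependency to add -->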
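
How these three values interact can be sketched with a simplified, single-threaded model. This is not the AtomicRateLimiter code: the class `CycleLimiter` is hypothetical, it always reserves exactly one permission per call, and it never rejects a caller on timeout. Time is passed in explicitly so the arithmetic is easy to follow.

```java
// Hypothetical single-threaded model of cycle-based rate limiting.
class CycleLimiter {
    private final long cyclePeriodNanos; // length of one refresh cycle
    private final int limitForPeriod;    // permissions granted per cycle
    private long activeCycle = 0;        // cycle number used by the last call
    private int activePermissions;       // may go negative when permissions are reserved

    CycleLimiter(long cyclePeriodNanos, int limitForPeriod) {
        this.cyclePeriodNanos = cyclePeriodNanos;
        this.limitForPeriod = limitForPeriod;
        this.activePermissions = limitForPeriod;
    }

    int activePermissions() {
        return activePermissions;
    }

    // Reserves one permission at time nowNanos and returns the nanoseconds to wait for it.
    long reserve(long nowNanos) {
        long currentCycle = nowNanos / cyclePeriodNanos;
        if (currentCycle > activeCycle) {
            // Credit the permissions of every elapsed cycle, capped at one cycle's limit.
            long refreshed = activePermissions + (currentCycle - activeCycle) * limitForPeriod;
            activePermissions = (int) Math.min(refreshed, limitForPeriod);
            activeCycle = currentCycle;
        }
        long nanosToWait = 0L;
        if (activePermissions < 1) {
            long nanosToNextCycle = (currentCycle + 1) * cyclePeriodNanos - nowNanos;
            int permissionsAtNextCycle = activePermissions + limitForPeriod;
            int shortage = 1 - permissionsAtNextCycle;
            // Additional full cycles needed beyond the next one, rounded up.
            long fullCyclesToWait = shortage <= 0 ? 0 : (shortage + limitForPeriod - 1) / limitForPeriod;
            nanosToWait = fullCyclesToWait * cyclePeriodNanos + nanosToNextCycle;
        }
        activePermissions--; // reserve the permission, possibly going negative
        return nanosToWait;
    }
}
```

With a 1000 ns cycle and 2 permissions per cycle, the first two calls in cycle 0 proceed immediately, the third and fourth wait for the start of cycle 1, and a fifth call made at 40 ns waits 1960 ns, until cycle 2 begins.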
  <dependency>
      <groupId>io.github.resilience4j</groupId>
      <artifactId>resilience4j-ratelimiter</artifactId>
      <version>${resilience4j.version}</version>
  </dependency>
Configuration
  • Bulkhead
<!-- Dependency to add -->
<dependency>
     <groupId>io.github.resilience4j</groupId>
     <artifactId>resilience4j-bulkhead</artifactId>
     <version>${resilience4j.version}</version>
</dependency>

Implementations

  • SemaphoreBulkhead - uses a semaphore
Configuration:
BulkheadConfig config = BulkheadConfig.custom()
        .maxConcurrentCalls(5) // at most 5 calls in flight at once
        .maxWaitDuration(Duration.ofMillis(1)) // how long a caller waits for a free slot
        .build();
BulkheadRegistry registry = BulkheadRegistry.of(config);
Bulkhead bulkhead = registry.bulkhead("Bulkhead");
for (int i = 0; i < 10; i++) {
    Supplier<String> decoratedSupplier = Bulkhead
            .decorateSupplier(bulkhead, CircuitBreakerService::say);
    // Try comes from Vavr; fall back to "bulkhead" when the call is rejected or fails
    String result = Try.ofSupplier(decoratedSupplier)
            .recover(throwable -> "bulkhead").get();
    System.out.println(result);
}
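
The idea behind a semaphore-based bulkhead can be sketched with `java.util.concurrent.Semaphore` alone. This is an illustration, not Resilience4j's code. The class `SemaphoreGuard` is a hypothetical name, and returning a fallback value on rejection is an assumption made to keep the example short (the library signals rejection with an exception).

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical semaphore-based concurrency limiter.
class SemaphoreGuard {
    private final Semaphore permits;
    private final long maxWaitMillis;

    SemaphoreGuard(int maxConcurrentCalls, long maxWaitMillis) {
        this.permits = new Semaphore(maxConcurrentCalls);
        this.maxWaitMillis = maxWaitMillis;
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    // Runs the task if a permit is obtained within maxWaitMillis, otherwise returns the fallback.
    <T> T call(Supplier<T> task, T fallback) {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
        if (!acquired) {
            return fallback; // bulkhead is full
        }
        try {
            return task.get();
        } finally {
            permits.release(); // always free the slot, even if the task throws
        }
    }
}
```

A quick way to see the limit in action without threads is a guard with one permit and a nested call: the inner call is rejected because the outer call still holds the only permit.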
  • FixedThreadPoolBulkhead - uses a bounded queue and a fixed-size thread pool

Configuration

ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
        .maxThreadPoolSize(2)
        .coreThreadPoolSize(1)
        .queueCapacity(1)
        .build();
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
ThreadPoolBulkhead bulkhead = registry.bulkhead("ThreadPoolBulkhead");
ThreadPoolBulkhead bulkhead2 = registry.bulkhead("ThreadPoolBulkhead2");
ThreadPoolBulkhead bulkhead3 = registry.bulkhead("ThreadPoolBulkhead3");
Callable<CompletionStage<String>> call =
        ThreadPoolBulkhead.decorateCallable(bulkhead, () -> CircuitBreakerService.say());

Callable<CompletionStage<String>> call2 =
        ThreadPoolBulkhead.decorateCallable(bulkhead2, () -> CircuitBreakerService.say());

Callable<CompletionStage<String>> call3 =
        ThreadPoolBulkhead.decorateCallable(bulkhead3, () -> CircuitBreakerService.say());

ExecutorService es = Executors.newFixedThreadPool(2);
es.submit(call);
es.submit(call2);
es.submit(call3);
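
The effect of combining a bounded queue with a bounded pool can be reproduced with the JDK's `ThreadPoolExecutor`. The sketch below is not the ThreadPoolBulkhead implementation, and the class `PoolGuard` and its methods are hypothetical names. It shows that with core size 1, maximum size 2 and queue capacity 1, at most three blocked tasks are accepted and further submissions are rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing how a bounded pool plus bounded queue rejects excess work.
class PoolGuard {
    static ThreadPoolExecutor create(int coreSize, int maxSize, int queueCapacity) {
        return new ThreadPoolExecutor(
                coreSize, maxSize,
                20, TimeUnit.MILLISECONDS,                // idle timeout for non-core threads
                new ArrayBlockingQueue<>(queueCapacity),  // bounded queue
                new ThreadPoolExecutor.AbortPolicy());    // reject when pool and queue are full
    }

    // Submits tasks that block until release is counted down; returns how many were rejected.
    static int countRejected(ThreadPoolExecutor pool, int tasks, CountDownLatch release) {
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.submit(() -> {
                    release.await(); // keep the worker busy
                    return "done";
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        return rejected;
    }
}
```

The first task occupies the core thread, the second waits in the queue, the third forces a second thread to start, and anything beyond that is rejected until a slot frees up.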
  • Retry
<!-- Dependency to add -->
<dependency>
     <groupId>io.github.resilience4j</groupId>
     <artifactId>resilience4j-retry</artifactId>
     <version>${resilience4j.version}</version>
</dependency>

Configuration

RetryConfig config = RetryConfig.custom()
        .maxAttempts(2) // maximum number of attempts, including the initial call
        .waitDuration(Duration.ofMillis(100)) // wait time between attempts
        .retryOnException(e -> e instanceof WebServiceException)
        .retryExceptions(IOException.class, TimeoutException.class, RuntimeException.class)
//      .ignoreExceptions(TimeoutException.class)
        .build();
RetryRegistry registry = RetryRegistry.of(config);
Retry retry = registry.retry("Retry");
Function<Void, String> decorated
        = Retry.decorateFunction(retry, (s) -> CircuitBreakerService.say());
decorated.apply(null);
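
What the retry decorator does conceptually can be sketched as a plain loop. This is a simplified illustration, not Resilience4j's implementation. The class `SimpleRetry` is a hypothetical name, it handles only unchecked exceptions, and it treats `maxAttempts` as the total number of calls including the first one.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical minimal retry loop with a fixed wait between attempts.
class SimpleRetry {
    static <T> T call(Supplier<T> task, int maxAttempts, long waitMillis,
                      Predicate<RuntimeException> retryOn) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                // Give up when attempts are exhausted or the exception is not retryable.
                if (attempt >= maxAttempts || !retryOn.test(e)) {
                    throw e;
                }
                try {
                    Thread.sleep(waitMillis); // fixed wait before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

With `maxAttempts = 2`, a task that fails once and then succeeds is called exactly twice, and a task that always fails rethrows its exception after the second call.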

  • Cache
<!-- Dependency to add -->
<dependency>
     <groupId>io.github.resilience4j</groupId>
     <artifactId>resilience4j-cache</artifactId>
     <version>${resilience4j.version}</version>
</dependency>

Note:

Configuring Resilience4J in Spring Cloud

1.1. Starters

  • org.springframework.cloud:spring-cloud-starter-circuitbreaker-resilience4j - for non-reactive applications
  • org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j - for reactive applications

1.2. Disabling auto-configuration

spring.cloud.circuitbreaker.resilience4j.enabled=false

1.3. Default configuration

  • Resilience4JCircuitBreakerFactory
@Bean
public Customizer<Resilience4JCircuitBreakerFactory> defaultCustomizer() {
    return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
            .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(4)).build())
            .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
            .build());
}
  • ReactiveResilience4JCircuitBreakerFactory
@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory> defaultCustomizer() {
    return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
            .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(4)).build())
            .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
            .build());
}

1.4. Configuring specific circuit breakers

@Bean
public Customizer<Resilience4JCircuitBreakerFactory> slowCustomizer() {
    return factory -> factory.configure(builder -> builder.circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
            .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(2)).build()), "slow");
}

@Bean
public Customizer<Resilience4JCircuitBreakerFactory> slowCustomizer() {
    return factory -> factory.addCircuitBreakerCustomizer(circuitBreaker -> circuitBreaker.getEventPublisher()
    .onError(normalFluxErrorConsumer).onSuccess(normalFluxSuccessConsumer), "normalflux");
}

// Reactive variant
@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory> slowCustomizer() {
    return factory -> {
        factory.configure(builder -> builder
        .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(2)).build())
        .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()), "slow", "slowflux");
        factory.addCircuitBreakerCustomizer(circuitBreaker -> circuitBreaker.getEventPublisher()
            .onError(normalFluxErrorConsumer).onSuccess(normalFluxSuccessConsumer), "normalflux");
     };
}

1.5. Collecting metrics

<!-- Spring Cloud dependency -->
org.springframework.cloud:spring-cloud-starter-circuitbreaker-sentinal

Environment:

<!-- Spring Cloud dependency -->
org.springframework.cloud:spring-cloud-starter-circuitbreaker-spring-retry

1.1. Default configuration

@Bean
public Customizer<SpringRetryCircuitBreakerFactory> defaultCustomizer() {
    return factory -> factory.configureDefault(id -> new SpringRetryConfigBuilder(id)
        .retryPolicy(new TimeoutRetryPolicy()).build());
}

1.2. Configuring specific circuit breakers

@Bean
public Customizer<SpringRetryCircuitBreakerFactory> slowCustomizer() {
    return factory -> factory.configure(builder -> builder.retryPolicy(new SimpleRetryPolicy(1)).build(), "slow");
}
@Bean
public Customizer<SpringRetryCircuitBreakerFactory> slowCustomizer() {
    return factory -> factory.addRetryTemplateCustomizers(retryTemplate -> retryTemplate.registerListener(new RetryListener() {

        @Override
        public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
            return false;
        }
        @Override
        public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {

        }
        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {

        }
    }));
}

11-16 20:22