Spring cloud gateway是替代zuul的网关产品,基于Spring 5、Spring boot 2.0以上、Reactor, 提供任意的路由匹配和断言、过滤功能。上一篇文章谈了一下Gateway网关使用不规范,同事加班泪两行~,这篇文章将会侧重于其他的几个需要注意的地方。
网关实现
这里介绍编码方式实现
HystrixObservableCommand.Setter getSetter() { HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("group-accept"); HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter.withGroupKey(groupKey); HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("command-accept"); setter.andCommandKey(commandKey); HystrixCommandProperties.Setter proertiesSetter = HystrixCommandProperties.Setter(); proertiesSetter /* * * 线程策略配置 */ //设置线程模式 缺省 1000ms .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD) //执行是否启用超时时间 缺省 true .withExecutionTimeoutEnabled(true) //使用线程隔离时,是否对命令执行超时的线程调用中断 缺省false .withExecutionIsolationThreadInterruptOnFutureCancel(false) //执行超时的时候是否要它中断 缺省 true .withExecutionIsolationThreadInterruptOnTimeout(true) //执行的超时时间 缺省 1000ms .withExecutionTimeoutInMilliseconds(2000) /* * * 熔断策略 */ //是否开启溶断 缺省 true .withCircuitBreakerEnabled(true) // 是否允许熔断器忽略错误,默认false, 不开启 ; // true,断路器强制进入“关闭”状态,它会接收所有请求。 // 如果forceOpen属性为true,该属性不生效 .withCircuitBreakerForceClosed(false) // 是否强制开启熔断器阻断所有请求, 默认为false // 为true时,所有请求都将被拒绝,直接到fallback. // 如果该属性设置为true,断路器将强制进入“打开”状态, // 它会拒绝所有请求。该属性优于forceClosed属性 .withCircuitBreakerForceOpen(false) // 用来设置当断路器打开之后的休眠时间窗。 // 休眠时间窗结束之后,会将断路器设置为“半开”状态,尝试熔断的请求命令, // 如果依然请求错误就将断路器继续设置为“打开”状态,如果成功,就设置为“关闭”状态 // 熔断器默认工作时间,默认:5000豪秒. // 熔断器中断请求10秒后会进入半打开状态,放部分流量过去重试. .withCircuitBreakerSleepWindowInMilliseconds(5000) // 熔断器在整个统计时间内是否开启的阀值. // 在metricsRollingStatisticalWindowInMilliseconds(默认10s)内默认至少请求10次, // 熔断器才发挥起作用,9次熔断器都不起作用。 .withCircuitBreakerRequestVolumeThreshold(100) // 该属性用来设置断路器打开的错误百分比条件。默认值为50. // 表示在滚动时间窗中,在请求值超过requestVolumeThreshold阈值的前提下, // 如果错误请求数百分比超过50,就把断路器设置为“打开”状态,否则就设置为“关闭”状态 .withCircuitBreakerErrorThresholdPercentage(50); setter.andCommandPropertiesDefaults(proertiesSetter); return setter; } @Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); RouteLocatorBuilder.Builder serviceProvider = routes .route("accept", r -> r.method(HttpMethod.GET) .and() .path("/gateway-accept/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") .filters(f -> { f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}"); f.requestRateLimiter( config -> config.setKeyResolver(new GenericAccessResolver()) .setRateLimiter(redisRateLimiter())); f.hystrix(config -> config.setName("accept") .setFallbackUri("forward:/gateway-fallback") .setSetter(getSetter())); return f; }) .uri("http://localhost:8888") ); return serviceProvider.build(); }
在上面的代码中,主要做了3件事情:限流、熔断策略及降级方法配置
限流
- 配置redis
spring:
redis:
database: 0
host: 127.0.0.1
port: 6379
password:
timeout: 1500
lettuce:
pool:
max-active: 300 #连接池最大连接数(使用负值表示没有限制)
max-idle: 10 #连接池中的最大空闲连接
min-idle: 5 #连接池中的最小空闲连接
max-wait: -1 #连接池最大阻塞等待时间(使用负值表示没有限制)
- 自定义解析
/** * @description: 按照访问地址进行限流(也可以安装其他条件进行限流),具体可以看exchange.getRequest()的方法和属性 **/ public class GenericAccessResolver implements KeyResolver { @Override public Mono<String> resolve(ServerWebExchange exchange) { return Mono.just(exchange.getRequest().getPath().value()); } }
- 自定义限流配置
RedisRateLimiter redisRateLimiter() { //1000,1500对应replenishRate、burstCapacity return new RedisRateLimiter(1000, 1500); }
- 网关使用自定义限流器(网关使用代码实现)
@Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); RouteLocatorBuilder.Builder serviceProvider = routes .route("accept", r -> r.method(HttpMethod.GET) .and() .path("/gateway-accept/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") //.and() //.readBody(String.class, readBody -> true) .filters(f -> { f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}"); f.requestRateLimiter(config -> config.setKeyResolver(new GenericAccessResolver()).setRateLimiter(redisRateLimiter())); return f; }) .uri("http://localhost:8888") ); return serviceProvider.build(); }
测试
jmeter配置
结果
其他
如果有多个路由,使用不同的限流策略,可以自定义KeyResolver和RedisRateLimiter, 在路由定义时加入
//基于ip限流 public class OtherAccessResolver implements KeyResolver { @Override public Mono<String> resolve(ServerWebExchange exchange) { return Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); } } RedisRateLimiter otherRedisRateLimiter() { //1000,1500对应replenishRate、burstCapacity return new RedisRateLimiter(100, 500); } @Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); RouteLocatorBuilder.Builder serviceProvider = routes .route("accept", r -> r.method(HttpMethod.GET) .and() .path("/gateway-accept/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") .filters(f -> { f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}"); f.requestRateLimiter( config -> config.setKeyResolver(new GenericAccessResolver()) .setRateLimiter(redisRateLimiter())); f.hystrix(config -> config.setName("accept") .setFallbackUri("forward:/gateway-fallback") .setSetter(getSetter())); return f; }) .uri("http://localhost:8888")) .route("sign", r -> r.method(HttpMethod.POST) .and() .path("/gateway-sign/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") .filters(f -> { f.rewritePath("/gateway-sign/(?<path>.*)", "/${path}"); f.requestRateLimiter( config -> config.setKeyResolver(new OtherAccessResolver()) .setRateLimiter(otherRedisRateLimiter())); f.hystrix(config -> config.setName("sign") .setFallbackUri("forward:/gateway-fallback") .setSetter(getSetter())); return f; }) .uri("http://localhost:7777") ); return serviceProvider.build(); }
熔断策略
熔断策略主要是线程配置和熔断配置,上面已经说明很清楚了。在上篇文章中,为了解决网关调用后台服务Connection prematurely closed BEFORE response的问题,要设置后台服务线程的空闲时间和网关线程池线程的空闲时间,并让网关线程池线程的空闲时间小于后台服务的空闲时间
配置方法
spring: cloud: gateway: httpclient: pool: max-connections: 500 max-idle-time: 10000
编码实现
翻阅Spring Cloud Gateway英文资料,知道路由提供一个metadata方法,可以设置路由的元数据(https://docs.spring.io/spring-cloud-gateway/docs/2.2.6.RELEASE/reference/html/#route-metadata-configuration),这些元数据在RouteMetadataUtils中定义:
package org.springframework.cloud.gateway.support; public final class RouteMetadataUtils { public static final String RESPONSE_TIMEOUT_ATTR = "response-timeout"; public static final String CONNECT_TIMEOUT_ATTR = "connect-timeout"; private RouteMetadataUtils() { throw new AssertionError("Must not instantiate utility class."); } }
其中没有我要的线程数量(max-connection)和空闲时间(max-idle-time)的设置,没有关系,自己加上去:
@Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); RouteLocatorBuilder.Builder serviceProvider = routes .route("accept", r -> r.method(HttpMethod.GET) .and() .path("/gateway-accept/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") .filters(f -> { f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}"); f.requestRateLimiter( config -> config.setKeyResolver(new GenericAccessResolver()) .setRateLimiter(redisRateLimiter())); f.hystrix(config -> config.setName("accept") .setFallbackUri("forward:/gateway-fallback") .setSetter(getSetter())); return f; }) .uri("http://localhost:8888") .metadata("max-idle-time", 10000) //网关调用后台线程空闲时间设置 .metadata("max-connections", 200) //网关调用后台服务线程数量设置 ); return serviceProvider.build(); }
测试果然和yml配置一样有效果。
降级方法
降级方法本身没有什么特别,有一个问题需要注意,调用降级方法也是使用线程池的,缺省在HystrixThreadPoolProperties中定义:
public abstract class HystrixThreadPoolProperties { /* defaults */ static int default_coreSize = 10; // core size of thread pool static int default_maximumSize = 10; // maximum size of thread pool static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject) // -1 turns it off and makes us use SynchronousQueue
错误
如果上面的限流设置比较大,比如1000,最大突发2000,网关调用后台服务发生熔断降级, 熔断后降级的方法调用太频繁,10个线程不够用,会导致以下500错误:
2021-02-01 14:29:45.076 ERROR 64868 --- [ioEventLoop-5-1] a.w.r.e.AbstractErrorWebExceptionHandler : [a0ed6911-18982] 500 Server Error for HTTP GET "/gateway-accept/test"
com.netflix.hystrix.exception.HystrixRuntimeException: command-accept fallback execution rejected.
at com.netflix.hystrix.AbstractCommand.handleFallbackRejectionByEmittingError(AbstractCommand.java:1043) ~[hystrix-core-1.5.18.jar:1.5.18]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "/gateway-accept/test" [ExceptionHandlingWebHandler]
com.netflix.hystrix.exception.HystrixRuntimeException: command-accept fallback execution rejected.
at com.netflix.hystrix.AbstractCommand.handleFallbackRejectionByEmittingError(AbstractCommand.java:1043) ~[hystrix-core-1.5.18.jar:1.5.18]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "/gateway-accept/test" [ExceptionHandlingWebHandler]
配置方法
所以要在yml中设置合适的调用降级方法的线程池, 合理的配置能够杜绝网关500错误的发生。
hystrix:
threadpool:
group-accept: #代码里面设置的HystrixCommandGroupKey.Factory.asKey("group-accept")
coreSize: 50 #并发执行的最大线程数,默认10
maxQueueSize: 1500 #BlockingQueue的最大队列数
#即使maxQueueSize没有达到,达到queueSizeRejectionThreshold该值后,请求也会被拒绝
queueSizeRejectionThreshold: 1400
网关异常截获
上面的异常后,没有捕获异常直接返回前端500错误,一般情况下需要返回一个统一接口,比如:
@Data @ToString @EqualsAndHashCode @Accessors(chain = true) public class Result<T> implements Serializable { private Integer code; private String message; private T data; private String sign; public static final String SUCCESS = "成功"; public static final String FAILURE = "失败"; public Result(int code, String message) { this.code = code; this.message = message; } public Result(int code, String message, T data) { this.code = code; this.message = message; this.data = data; } public Result(int code, String message, T data, String sign) { this.code = code; this.message = message; this.data = data; this.sign = sign; } public static Result<Object> success() { return new Result<Object>(200, SUCCESS); } public static Result<Object> success(Object data) { return new Result<Object>(200, SUCCESS, data); } public static Result<Object> success(Object data, String sign) { return new Result<Object>(200, SUCCESS, data, sign); } public static Result<Object> failure() { return new Result<Object>(400, FAILURE); } public static Result<Object> failure(Object data) { return new Result<Object>(400, FAILURE, data); } public static Result<Object> failure(Object data, String sign) { return new Result<Object>(400, FAILURE, data, sign); } }
创建GlobalExceptionConfiguration 实现ErrorWebExceptionHandler(这一段是来者网友提供的)
@Slf4j @Order(-1) @Component @RequiredArgsConstructor public class GlobalExceptionConfiguration implements ErrorWebExceptionHandler { private final ObjectMapper objectMapper; @Override public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) { ServerHttpResponse response = exchange.getResponse(); if (response.isCommitted()) { return Mono.error(ex); } response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8); if (ex instanceof ResponseStatusException) { response.setStatusCode(((ResponseStatusException) ex).getStatus()); } return response .writeWith(Mono.fromSupplier(() -> { DataBufferFactory bufferFactory = response.bufferFactory(); try { return bufferFactory.wrap(objectMapper.writeValueAsBytes(Result.failure(ex.getMessage()))); } catch (JsonProcessingException e) { log.warn("Error writing response", ex); return bufferFactory.wrap(new byte[0]); } })); } }
这样,就会把网关异常统一包装在接口中返回:如:
后台日志已经没有之前的错误日志了。
编码实现,没找到
由于Spring Cloud Gateway 中的 Hystrix采用的是HystrixObservableCommand.Setter, 没有采用 HystrixCommand.Setter, 在 HystrixCommand.Setter中是可以编码实现线程池配置的, 但是在HystrixObservableCommand.Setter没有提供:
final public static class Setter { protected final HystrixCommandGroupKey groupKey; protected HystrixCommandKey commandKey; protected HystrixThreadPoolKey threadPoolKey; //有属性但是没有set方法 protected HystrixCommandProperties.Setter commandPropertiesDefaults; protected HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults; //有属性没有set方法 protected Setter(HystrixCommandGroupKey groupKey) { this.groupKey = groupKey; // default to using SEMAPHORE for ObservableCommand commandPropertiesDefaults = setDefaults(HystrixCommandProperties.Setter()); } public static Setter withGroupKey(HystrixCommandGroupKey groupKey) { return new Setter(groupKey); } public Setter andCommandKey(HystrixCommandKey commandKey) { this.commandKey = commandKey; return this; } public Setter andCommandPropertiesDefaults(HystrixCommandProperties.Setter commandPropertiesDefaults) { this.commandPropertiesDefaults = setDefaults(commandPropertiesDefaults); return this; } private HystrixCommandProperties.Setter setDefaults(HystrixCommandProperties.Setter commandPropertiesDefaults) { if (commandPropertiesDefaults.getExecutionIsolationStrategy() == null) { // default to using SEMAPHORE for ObservableCommand if the user didn't set it commandPropertiesDefaults.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE); } return commandPropertiesDefaults; } }
由于本人水平有限,没有找到Setter中设置HystrixThreadPoolKey和HystrixThreadPoolProperties.Setter的方法,所以只能在yml中配置。有知道的同学告诉我一声,不胜感激。
总结
所以在Spring Cloud Gateway网关的配置中,需要综合考虑限流大小、网关调用后台连接池设置大小、后台服务的连接池以及空闲时间,包括网关调用降级方法的线程池配置,都需要在压测中调整到一个合理的配置,才能发挥最大的功效。
本人水平有限,跟深入的研究还在继续,如果文章有表达错误或者不周,请大家指正,谢谢!