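I. Description

Recently the gateway threw an exception during batch file uploads. It later turned out that uploading a single large file fails as well once the request body exceeds 256 KB. The exception is: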
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144 at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
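The 262144 in the message is simply 256 * 1024 bytes, i.e. 256 KB. As a rough illustration of how such a limit trips, here is a minimal sketch in plain Java. It is a simplified stand-in, not Spring's actual LimitedDataBufferList; the class names, the 8 KB chunk size and the `fits` helper are all assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a size-limited buffer list (hypothetical, not Spring code). */
public class BufferLimitSketch {

    /** 256 KB, the value that shows up in the exception message. */
    public static final int DEFAULT_MAX_BYTES = 256 * 1024;

    /** Hypothetical exception playing the role of DataBufferLimitException. */
    public static class LimitExceededException extends RuntimeException {
        public LimitExceededException(String message) {
            super(message);
        }
    }

    /** Collects chunks and fails as soon as the running total exceeds the limit. */
    public static class LimitedChunkList {
        private final List<byte[]> chunks = new ArrayList<>();
        private final int maxBytes;
        private long byteCount;

        public LimitedChunkList(int maxBytes) {
            this.maxBytes = maxBytes;
        }

        public void add(byte[] chunk) {
            byteCount += chunk.length;
            if (byteCount > maxBytes) {
                throw new LimitExceededException(
                        "Exceeded limit on max bytes to buffer : " + maxBytes);
            }
            chunks.add(chunk);
        }
    }

    /** Returns true if a body of bodySize bytes, fed in 8 KB chunks, stays within maxBytes. */
    public static boolean fits(int bodySize, int maxBytes) {
        LimitedChunkList list = new LimitedChunkList(maxBytes);
        try {
            int remaining = bodySize;
            while (remaining > 0) {
                int n = Math.min(8192, remaining);
                list.add(new byte[n]);
                remaining -= n;
            }
            return true;
        } catch (LimitExceededException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(DEFAULT_MAX_BYTES);                 // 262144
        System.out.println(fits(262144, DEFAULT_MAX_BYTES));   // true
        System.out.println(fits(262145, DEFAULT_MAX_BYTES));   // false
    }
}
```

In this model a body of exactly 256 KB still passes and one extra byte fails, which matches the symptom of uploads breaking only above a certain size.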

II. Solution

1. Set max-in-memory-size: 1024MB in the configuration file

spring:
  codec:
    max-in-memory-size: 1024MB

Result: no effect

2. Increase the buffer limit in a configuration class

@Configuration
@EnableWebFlux
public class WebFluxWebConfig implements WebFluxConfigurer {
    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024);
    }
}

Result: no effect

3. Modify the custom gateway route predicate RequestBodyRoutePredicateFactory

Original code:

public class RequestBodyRoutePredicateFactory
        extends AbstractRoutePredicateFactory<RequestBodyRoutePredicateFactory.Config> {
    protected static final Log LOGGER = LogFactory.getLog(RequestBodyRoutePredicateFactory.class);

    private final List<HttpMessageReader<?>> messageReaders;

    public RequestBodyRoutePredicateFactory() {
        super(RequestBodyRoutePredicateFactory.Config.class);
        this.messageReaders = HandlerStrategies.withDefaults().messageReaders();
    }

    public RequestBodyRoutePredicateFactory(List<HttpMessageReader<?>> messageReaders) {
        super(RequestBodyRoutePredicateFactory.Config.class);
        this.messageReaders = messageReaders;
    }

    public static final String REQUEST_BODY_ATTR = "requestBodyAttr";


    @Override
    public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
        return exchange -> {
            if (!"POST".equals(exchange.getRequest().getMethodValue()) && !"PUT".equals(exchange.getRequest().getMethodValue())) {
                return Mono.just(true);
            }
            Object cachedBody = exchange.getAttribute(REQUEST_BODY_ATTR);
            if (cachedBody != null) {
                try {
                    return Mono.just(true);
                } catch (ClassCastException e) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Predicate test failed because class in predicate does not match the cached body object",
                                e);
                    }
                }
                return Mono.just(true);
            } else {
                // this.messageReaders, when built via HandlerStrategies.withDefaults(), still uses the default 256 KB limit
                return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange, (serverHttpRequest) -> ServerRequest.create(exchange.mutate().request(serverHttpRequest).build(),
                        this.messageReaders).bodyToMono(String.class).defaultIfEmpty("").doOnNext((objectValue) -> {
                    if (StringUtils.isBlank(objectValue)) {
                        exchange.getAttributes().put(REQUEST_BODY_ATTR, JSON.toJSONString(exchange.getRequest().getQueryParams()));
                    } else {
                        exchange.getAttributes().put(REQUEST_BODY_ATTR, objectValue);
                    }
                }).map((objectValue) -> true));

            }
        };
    }

  ....

}

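Cause: after reading the body, the original code creates a new ServerRequest with readers obtained from HandlerStrategies.withDefaults(). While the body is being buffered, org.springframework.core.io.buffer.LimitedDataBufferList checks whether the received data exceeds the limit, and the default limit in org.springframework.core.codec.AbstractDataBufferDecoder is 262144 bytes (256 KB). These default readers never see the value configured in steps 1 and 2, which is why those steps had no effect.
For details, see "DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144".

Fix: inject ServerCodecConfigurer and call ServerCodecConfigurer.getReaders() to obtain readers that carry the configured limit.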

public class RequestBodyRoutePredicateFactory
        extends AbstractRoutePredicateFactory<RequestBodyRoutePredicateFactory.Config> {
    protected static final Log LOGGER = LogFactory.getLog(RequestBodyRoutePredicateFactory.class);
    // Inject the ServerCodecConfigurer, which carries the spring.codec.max-in-memory-size setting
    @Autowired
    ServerCodecConfigurer codecConfigurer;
    private final List<HttpMessageReader<?>> messageReaders;

    public RequestBodyRoutePredicateFactory() {
        super(RequestBodyRoutePredicateFactory.Config.class);
        this.messageReaders = HandlerStrategies.withDefaults().messageReaders();
    }

    public RequestBodyRoutePredicateFactory(List<HttpMessageReader<?>> messageReaders) {
        super(RequestBodyRoutePredicateFactory.Config.class);
        this.messageReaders = messageReaders;
    }


    public static final String REQUEST_BODY_ATTR = "requestBodyAttr";


    @Override
    public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
        return exchange -> {
            if (!"POST".equals(exchange.getRequest().getMethodValue()) && !"PUT".equals(exchange.getRequest().getMethodValue())) {
                return Mono.just(true);
            }
            Object cachedBody = exchange.getAttribute(REQUEST_BODY_ATTR);
            if (cachedBody != null) {
                try {
                    return Mono.just(true);
                } catch (ClassCastException e) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Predicate test failed because class in predicate does not match the cached body object",
                                e);
                    }
                }
                return Mono.just(true);
            } else {
                // codecConfigurer.getReaders() returns readers that honor spring.codec.max-in-memory-size
                return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange, (serverHttpRequest) ->
                        ServerRequest.create(exchange.mutate().request(serverHttpRequest).build(), codecConfigurer.getReaders()).bodyToMono(String.class).defaultIfEmpty("").doOnNext((objectValue) -> {
                    if (StringUtils.isBlank(objectValue)) {
                        exchange.getAttributes().put(REQUEST_BODY_ATTR, JSON.toJSONString(exchange.getRequest().getQueryParams()));
                    } else {
                        exchange.getAttributes().put(REQUEST_BODY_ATTR, objectValue);
                    }
                }).map((objectValue) -> true));

            }
        };
    }
.....
}

Result: success
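The difference between the two versions of the predicate can be modeled in a few lines of plain Java. This is only a toy sketch under stated assumptions: `CodecConfig`, `PredicateWithOwnDefaults` and `PredicateWithSharedConfig` are hypothetical names invented here, and none of this is Spring or Spring Cloud Gateway code. It shows why a component that builds its own defaults ignores the configured limit, while one that receives the shared configuration picks it up.

```java
/** Toy model: private defaults versus a shared, configured object (hypothetical classes). */
public class ReaderSourceSketch {

    public static final int DEFAULT_LIMIT = 256 * 1024;

    /** Stand-in for the application-wide codec configuration. */
    public static class CodecConfig {
        private int maxInMemorySize = DEFAULT_LIMIT;

        public void setMaxInMemorySize(int bytes) {
            this.maxInMemorySize = bytes;
        }

        public int readerLimit() {
            return maxInMemorySize;
        }
    }

    /** Like the original code: creates fresh defaults, so it never sees the configured value. */
    public static class PredicateWithOwnDefaults {
        private final int limit = new CodecConfig().readerLimit();

        public boolean accepts(int bodySize) {
            return bodySize <= limit;
        }
    }

    /** Like the fixed code: asks the shared configuration for its limit on every call. */
    public static class PredicateWithSharedConfig {
        private final CodecConfig config;

        public PredicateWithSharedConfig(CodecConfig config) {
            this.config = config;
        }

        public boolean accepts(int bodySize) {
            return bodySize <= config.readerLimit();
        }
    }

    public static void main(String[] args) {
        CodecConfig shared = new CodecConfig();
        shared.setMaxInMemorySize(2 * 1024 * 1024); // raise the shared limit to 2 MB

        int upload = 300 * 1024; // a 300 KB body, above the 256 KB default

        System.out.println(new PredicateWithOwnDefaults().accepts(upload));        // false
        System.out.println(new PredicateWithSharedConfig(shared).accepts(upload)); // true
    }
}
```

The same reasoning suggests checking any other custom filter or predicate in the gateway that builds its readers from defaults rather than from the injected configurer.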

05-09 16:31