问题说明

最近做了关于flink的需求.
现在需要通过HTTP访问FLINK的 RESTAPI, rest 接口的JSON 非常庞大而复杂。
那么怎么去完整的接收数据呢?

方法一就是手写部分需要的JavaBean,嵌套比较麻烦而复杂。照着json schema写,非常慢。
方法二直接通过jsonObject 接收,当作map 使用,虽然没有第一种方法的问题,但是看不见结构,对于java这种强类型语言,非常不友好。
方法三,直接使用FLINK的源码的类。

那么根据官方文档的 jsonschema 找到对应的实体类。以jobDetailInfo为例.

package org.apache.flink.runtime.rest.messages.job;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonRawValue;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;

public class JobDetailsInfo implements ResponseBody {
       public static final String FIELD_NAME_JOB_ID = "jid";

    @JsonProperty(FIELD_NAME_JOB_ID)
    @JsonSerialize(using = JobIDSerializer.class)
    private final JobID jobId;

    @JsonProperty(FIELD_NAME_JOB_NAME)
    private final String name;

//.....
}

问题思考

可以看到这里jobId的属性是 jid.

这里我是通过 spring的httpMessageConverter 接收,也就是需要 json序列化工具来处理。

如果是用fastjson序列化工具,那么fastjson 是无法处理jackson的注解的。

第二点,SpringBoot的框架内是带有 jackson的 消息转换器的,但是通过查看import的信息可以看出,这是无法正确处理这种shade的json。此时可以说和jackson毫无关系。

如果把flink的源码类直接复制出来,修改成正常的非shaded的包名下的jackson 是不是可以接收了呢。 一开始我是这么做的,但是实在是接口比较多,而且源码中依赖的类型比较多,一时半会是复制不完的。

最终解决方案

那么我们提供一个专门针对 shadedJackson的 httpMessageConvert不就可以了吗?

步骤一, 定义shaded jackson 的httpMessageConverter

写一个类 继承 Spring的 抽象类:
org.springframework.http.converter.AbstractGenericHttpMessageConverter

其他内容完全复制AbstractJackson2HttpMessageConverter 即可


public abstract class AbstractShadedJackson2HttpMessageConverter extends AbstractGenericHttpMessageConverter<Object> {
}

然后写一个实现类,其他内容依然是复制MappingJackson2HttpMessageConverter即可。
然后这里最重要的是将所有的import com.fasterxml.jackson 替换为import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.
这样就实现了JVM兼容.

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.http.MediaType;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.lang.Nullable;

import java.io.IOException;

public class ShadedMappingJackson2HttpMessageConverter extends AbstractShadedJackson2HttpMessageConverter {
}

此处省略相关的jackson的类型,处理方式类似,都是替换包名。

最后注册到Spring 内大功告成。

@Configuration
public class FeignSupport {

  @Bean
  public ShadedMappingJackson2HttpMessageConverter httpMessageConverter() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    objectMapper.setTimeZone(TimeZone.getTimeZone("GMT+8"));
    objectMapper.setDateFormat(new SimpleDateFormat());
    return new ShadedMappingJackson2HttpMessageConverter(objectMapper);
  }

}

这里再提供一个FeignClient接口:


    /*
     * @see org.apache.flink.runtime.rest.messages.job.JobDetailsInfo
     */
  @GetMapping(value = "/v1/jobs/{jobid}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  JobDetailsInfo job(@PathVariable("jobid") String jobId, @RequestHeader(APP_ID_HEADER) String appId);

这样便可以接收。

思考

这里有必要条件:就是要保证刚刚提供的HttpMessageConverter 需要比较高的优先级。
为什么说HttpMessageConverter的顺序非常重要_SpringBoot 参考这篇文章

01-22 17:44