本文主要研究一下nacos config的publishConfig

ConfigController

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java

@Controller
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {

    private static final Logger log = LoggerFactory.getLogger(ConfigController.class);

    private static final String NAMESPACE_PUBLIC_KEY = "public";

    public static final String EXPORT_CONFIG_FILE_NAME = "nacos_config_export_";

    public static final String EXPORT_CONFIG_FILE_NAME_EXT = ".zip";

    public static final String EXPORT_CONFIG_FILE_NAME_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    private final transient ConfigServletInner inner;

    private final transient PersistService persistService;

    private final transient ConfigSubService configSubService;

    @Autowired
    public ConfigController(ConfigServletInner configServletInner, PersistService persistService,
                            ConfigSubService configSubService) {
        this.inner = configServletInner;
        this.persistService = persistService;
        this.configSubService = configSubService;
    }

    /**
     * 增加或更新非聚合数据。
     *
     * @throws NacosException
     */
    @RequestMapping(method = RequestMethod.POST)
    @ResponseBody
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
                                 @RequestParam("dataId") String dataId, @RequestParam("group") String group,
                                 @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
                                     String tenant,
                                 @RequestParam("content") String content,
                                 @RequestParam(value = "tag", required = false) String tag,
                                 @RequestParam(value = "appName", required = false) String appName,
                                 @RequestParam(value = "src_user", required = false) String srcUser,
                                 @RequestParam(value = "config_tags", required = false) String configTags,
                                 @RequestParam(value = "desc", required = false) String desc,
                                 @RequestParam(value = "use", required = false) String use,
                                 @RequestParam(value = "effect", required = false) String effect,
                                 @RequestParam(value = "type", required = false) String type,
                                 @RequestParam(value = "schema", required = false) String schema)
        throws NacosException {
        final String srcIp = RequestUtil.getRemoteIp(request);
        String requestIpApp = RequestUtil.getAppName(request);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);

        Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
        if (configTags != null) {
            configAdvanceInfo.put("config_tags", configTags);
        }
        if (desc != null) {
            configAdvanceInfo.put("desc", desc);
        }
        if (use != null) {
            configAdvanceInfo.put("use", use);
        }
        if (effect != null) {
            configAdvanceInfo.put("effect", effect);
        }
        if (type != null) {
            configAdvanceInfo.put("type", type);
        }
        if (schema != null) {
            configAdvanceInfo.put("schema", schema);
        }
        ParamUtils.checkParam(configAdvanceInfo);

        if (AggrWhitelist.isAggrDataId(dataId)) {
            log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}",
                RequestUtil.getRemoteIp(request), dataId, group);
            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
        }

        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else { // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
            EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
            LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);

        return true;
    }

    //......
}
  • publishConfig根据入参构造configAdvanceInfo及configInfo,对于前者会执行ParamUtils.checkParam(configAdvanceInfo)校验
  • 对于有betaIps的则执行persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false),然后发布ConfigDataChangeEvent
  • 对于没有betaIps的则判断tag是否为空,为空则执行persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false),不为空则执行persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);二者都会发布ConfigDataChangeEvent

PersistService

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/PersistService.java

@Repository
public class PersistService {

    @Autowired
    private DynamicDataSource dynamicDataSource;

    private DataSourceService dataSourceService;

    //......

    /**
     * 写入主表,插入或更新
     */
    public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time,
                               Map<String, Object> configAdvanceInfo, boolean notify) {
        try {
            addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);
        } catch (DataIntegrityViolationException ive) { // 唯一性约束冲突
            updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);
        }
    }

    public void insertOrUpdateTag(final ConfigInfo configInfo, final String tag, final String srcIp,
                                  final String srcUser, final Timestamp time, final boolean notify) {
        try {
            addConfigInfo4Tag(configInfo, tag, srcIp, null, time, notify);
        } catch (DataIntegrityViolationException ive) { // 唯一性约束冲突
            updateConfigInfo4Tag(configInfo, tag, srcIp, null, time, notify);
        }
    }

    public void insertOrUpdateBeta(final ConfigInfo configInfo, final String betaIps, final String srcIp,
                                   final String srcUser, final Timestamp time, final boolean notify) {
        try {
            addConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify);
        } catch (DataIntegrityViolationException ive) { // 唯一性约束冲突
            updateConfigInfo4Beta(configInfo, srcIp, null, time, notify);
        }
    }

    //......

    /**
     * 添加普通配置信息,发布数据变更事件
     */
    public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
                              final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
        tjt.execute(new TransactionCallback<Boolean>() {
            @Override
            public Boolean doInTransaction(TransactionStatus status) {
                try {
                    long configId = addConfigInfoAtomic(srcIp, srcUser, configInfo, time, configAdvanceInfo);
                    String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
                    addConfiTagsRelationAtomic(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
                        configInfo.getTenant());
                    insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
                    if (notify) {
                        EventDispatcher.fireEvent(
                            new ConfigDataChangeEvent(false, configInfo.getDataId(), configInfo.getGroup(),
                                configInfo.getTenant(), time.getTime()));
                    }
                } catch (CannotGetJdbcConnectionException e) {
                    fatalLog.error("[db-error] " + e.toString(), e);
                    throw e;
                }
                return Boolean.TRUE;
            }
        });
    }

    /**
     * 更新配置信息
     */
    public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser,
                                 final Timestamp time, final Map<String, Object> configAdvanceInfo,
                                 final boolean notify) {
        tjt.execute(new TransactionCallback<Boolean>() {
            @Override
            public Boolean doInTransaction(TransactionStatus status) {
                try {
                    ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(),
                        configInfo.getTenant());
                    String appNameTmp = oldConfigInfo.getAppName();
                    // 用户传过来的appName不为空,则用持久化用户的appName,否则用db的;清空appName的时候需要传空串
                    if (configInfo.getAppName() == null) {
                        configInfo.setAppName(appNameTmp);
                    }
                    updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);
                    String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
                    if (configTags != null) {
                        // 删除所有tag,然后再重新创建
                        removeTagByIdAtomic(oldConfigInfo.getId());
                        addConfiTagsRelationAtomic(oldConfigInfo.getId(), configTags, configInfo.getDataId(),
                            configInfo.getGroup(), configInfo.getTenant());
                    }
                    insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");
                    if (notify) {
                        EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, configInfo.getDataId(),
                            configInfo.getGroup(), configInfo.getTenant(), time.getTime()));
                    }
                } catch (CannotGetJdbcConnectionException e) {
                    fatalLog.error("[db-error] " + e.toString(), e);
                    throw e;
                }
                return Boolean.TRUE;
            }
        });
    }

    /**
     * 添加普通配置信息,发布数据变更事件
     */
    public void addConfigInfo4Tag(ConfigInfo configInfo, String tag, String srcIp, String srcUser, Timestamp time,
                                  boolean notify) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        String tagTmp = StringUtils.isBlank(tag) ? StringUtils.EMPTY : tag.trim();
        try {
            String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
            jt.update(
                "INSERT INTO config_info_tag(data_id,group_id,tenant_id,tag_id,app_name,content,md5,src_ip,src_user,"
                    + "gmt_create,gmt_modified) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
                configInfo.getDataId(), configInfo.getGroup(), tenantTmp, tagTmp, appNameTmp, configInfo.getContent(),
                md5,
                srcIp, srcUser, time, time);
            if (notify) {
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, configInfo.getDataId(),
                    configInfo.getGroup(), tenantTmp, tagTmp, time.getTime()));
            }
        } catch (CannotGetJdbcConnectionException e) {
            fatalLog.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }

    /**
     * 更新配置信息
     */
    public void updateConfigInfo4Tag(ConfigInfo configInfo, String tag, String srcIp, String srcUser, Timestamp time,
                                     boolean notify) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        String tagTmp = StringUtils.isBlank(tag) ? StringUtils.EMPTY : tag.trim();
        try {
            String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
            jt.update(
                "UPDATE config_info_tag SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=? WHERE "
                    + "data_id=? AND group_id=? AND tenant_id=? AND tag_id=?",
                configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp, configInfo.getDataId(),
                configInfo.getGroup(), tenantTmp, tagTmp);
            if (notify) {
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(),
                    tenantTmp, tagTmp, time.getTime()));
            }

        } catch (CannotGetJdbcConnectionException e) {
            fatalLog.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }

    /**
     * 添加普通配置信息,发布数据变更事件
     */
    public void addConfigInfo4Beta(ConfigInfo configInfo, String betaIps,
                                   String srcIp, String srcUser, Timestamp time, boolean notify) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        try {
            String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
            jt.update(
                "INSERT INTO config_info_beta(data_id,group_id,tenant_id,app_name,content,md5,beta_ips,src_ip,"
                    + "src_user,gmt_create,gmt_modified) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
                configInfo.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(), md5,
                betaIps, srcIp, srcUser, time, time);
            if (notify) {
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(),
                    tenantTmp, time.getTime()));
            }

        } catch (CannotGetJdbcConnectionException e) {
            fatalLog.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }

    /**
     * 更新配置信息
     */
    public void updateConfigInfo4Beta(ConfigInfo configInfo, String srcIp, String srcUser, Timestamp time,
                                      boolean notify) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        try {
            String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
            jt.update(
                "UPDATE config_info_beta SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=? WHERE "
                    + "data_id=? AND group_id=? AND tenant_id=?",
                configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp, configInfo.getDataId(),
                configInfo.getGroup(), tenantTmp);
            if (notify) {
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(),
                    tenantTmp, time.getTime()));
            }

        } catch (CannotGetJdbcConnectionException e) {
            fatalLog.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }

    //......
}
  • insertOrUpdate、insertOrUpdateTag、insertOrUpdateBeta三者的执行逻辑都是先执行insert操作,捕获到DataIntegrityViolationException时执行update操作

小结

  • publishConfig根据入参构造configAdvanceInfo及configInfo,对于前者会执行ParamUtils.checkParam(configAdvanceInfo)校验
  • 对于有betaIps的则执行persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false),然后发布ConfigDataChangeEvent
  • 对于没有betaIps的则判断tag是否为空,为空则执行persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false),不为空则执行persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);二者都会发布ConfigDataChangeEvent

doc

03-05 18:03