序
本文主要研究一下nacos config的publishConfig
ConfigController
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java
@Controller
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {
private static final Logger log = LoggerFactory.getLogger(ConfigController.class);
private static final String NAMESPACE_PUBLIC_KEY = "public";
public static final String EXPORT_CONFIG_FILE_NAME = "nacos_config_export_";
public static final String EXPORT_CONFIG_FILE_NAME_EXT = ".zip";
public static final String EXPORT_CONFIG_FILE_NAME_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
private final transient ConfigServletInner inner;
private final transient PersistService persistService;
private final transient ConfigSubService configSubService;
@Autowired
public ConfigController(ConfigServletInner configServletInner, PersistService persistService,
ConfigSubService configSubService) {
this.inner = configServletInner;
this.persistService = persistService;
this.configSubService = configSubService;
}
/**
* 增加或更新非聚合数据。
*
* @throws NacosException
*/
@RequestMapping(method = RequestMethod.POST)
@ResponseBody
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam("content") String content,
@RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema)
throws NacosException {
final String srcIp = RequestUtil.getRemoteIp(request);
String requestIpApp = RequestUtil.getAppName(request);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
if (configTags != null) {
configAdvanceInfo.put("config_tags", configTags);
}
if (desc != null) {
configAdvanceInfo.put("desc", desc);
}
if (use != null) {
configAdvanceInfo.put("use", use);
}
if (effect != null) {
configAdvanceInfo.put("effect", effect);
}
if (type != null) {
configAdvanceInfo.put("type", type);
}
if (schema != null) {
configAdvanceInfo.put("schema", schema);
}
ParamUtils.checkParam(configAdvanceInfo);
if (AggrWhitelist.isAggrDataId(dataId)) {
log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}",
RequestUtil.getRemoteIp(request), dataId, group);
throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
//......
}
- publishConfig根据入参构造configAdvanceInfo及configInfo,对于前者会执行ParamUtils.checkParam(configAdvanceInfo)校验
- 对于有betaIps的则执行persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false),然后发布ConfigDataChangeEvent
- 对于没有betaIps的则判断tag是否为空,为空则执行persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false),不为空则执行persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);二者都会发布ConfigDataChangeEvent
PersistService
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/PersistService.java
@Repository
public class PersistService {
@Autowired
private DynamicDataSource dynamicDataSource;
private DataSourceService dataSourceService;
//......
/**
* 写入主表,插入或更新
*/
public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time,
Map<String, Object> configAdvanceInfo, boolean notify) {
try {
addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);
} catch (DataIntegrityViolationException ive) { // 唯一性约束冲突
updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);
}
}
public void insertOrUpdateTag(final ConfigInfo configInfo, final String tag, final String srcIp,
final String srcUser, final Timestamp time, final boolean notify) {
try {
addConfigInfo4Tag(configInfo, tag, srcIp, null, time, notify);
} catch (DataIntegrityViolationException ive) { // 唯一性约束冲突
updateConfigInfo4Tag(configInfo, tag, srcIp, null, time, notify);
}
}
public void insertOrUpdateBeta(final ConfigInfo configInfo, final String betaIps, final String srcIp,
final String srcUser, final Timestamp time, final boolean notify) {
try {
addConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify);
} catch (DataIntegrityViolationException ive) { // 唯一性约束冲突
updateConfigInfo4Beta(configInfo, srcIp, null, time, notify);
}
}
//......
/**
* 添加普通配置信息,发布数据变更事件
*/
public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
tjt.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
try {
long configId = addConfigInfoAtomic(srcIp, srcUser, configInfo, time, configAdvanceInfo);
String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
addConfiTagsRelationAtomic(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant());
insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
if (notify) {
EventDispatcher.fireEvent(
new ConfigDataChangeEvent(false, configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant(), time.getTime()));
}
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
throw e;
}
return Boolean.TRUE;
}
});
}
/**
* 更新配置信息
*/
public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser,
final Timestamp time, final Map<String, Object> configAdvanceInfo,
final boolean notify) {
tjt.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
try {
ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant());
String appNameTmp = oldConfigInfo.getAppName();
// 用户传过来的appName不为空,则用持久化用户的appName,否则用db的;清空appName的时候需要传空串
if (configInfo.getAppName() == null) {
configInfo.setAppName(appNameTmp);
}
updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);
String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
if (configTags != null) {
// 删除所有tag,然后再重新创建
removeTagByIdAtomic(oldConfigInfo.getId());
addConfiTagsRelationAtomic(oldConfigInfo.getId(), configTags, configInfo.getDataId(),
configInfo.getGroup(), configInfo.getTenant());
}
insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");
if (notify) {
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, configInfo.getDataId(),
configInfo.getGroup(), configInfo.getTenant(), time.getTime()));
}
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
throw e;
}
return Boolean.TRUE;
}
});
}
/**
* 添加普通配置信息,发布数据变更事件
*/
public void addConfigInfo4Tag(ConfigInfo configInfo, String tag, String srcIp, String srcUser, Timestamp time,
boolean notify) {
String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
String tagTmp = StringUtils.isBlank(tag) ? StringUtils.EMPTY : tag.trim();
try {
String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
jt.update(
"INSERT INTO config_info_tag(data_id,group_id,tenant_id,tag_id,app_name,content,md5,src_ip,src_user,"
+ "gmt_create,gmt_modified) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
configInfo.getDataId(), configInfo.getGroup(), tenantTmp, tagTmp, appNameTmp, configInfo.getContent(),
md5,
srcIp, srcUser, time, time);
if (notify) {
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, configInfo.getDataId(),
configInfo.getGroup(), tenantTmp, tagTmp, time.getTime()));
}
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
throw e;
}
}
/**
* 更新配置信息
*/
public void updateConfigInfo4Tag(ConfigInfo configInfo, String tag, String srcIp, String srcUser, Timestamp time,
boolean notify) {
String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
String tagTmp = StringUtils.isBlank(tag) ? StringUtils.EMPTY : tag.trim();
try {
String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
jt.update(
"UPDATE config_info_tag SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=? WHERE "
+ "data_id=? AND group_id=? AND tenant_id=? AND tag_id=?",
configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp, configInfo.getDataId(),
configInfo.getGroup(), tenantTmp, tagTmp);
if (notify) {
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(),
tenantTmp, tagTmp, time.getTime()));
}
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
throw e;
}
}
/**
* 添加普通配置信息,发布数据变更事件
*/
public void addConfigInfo4Beta(ConfigInfo configInfo, String betaIps,
String srcIp, String srcUser, Timestamp time, boolean notify) {
String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
try {
String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
jt.update(
"INSERT INTO config_info_beta(data_id,group_id,tenant_id,app_name,content,md5,beta_ips,src_ip,"
+ "src_user,gmt_create,gmt_modified) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
configInfo.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(), md5,
betaIps, srcIp, srcUser, time, time);
if (notify) {
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(),
tenantTmp, time.getTime()));
}
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
throw e;
}
}
/**
* 更新配置信息
*/
public void updateConfigInfo4Beta(ConfigInfo configInfo, String srcIp, String srcUser, Timestamp time,
boolean notify) {
String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
try {
String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
jt.update(
"UPDATE config_info_beta SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=? WHERE "
+ "data_id=? AND group_id=? AND tenant_id=?",
configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp, configInfo.getDataId(),
configInfo.getGroup(), tenantTmp);
if (notify) {
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(),
tenantTmp, time.getTime()));
}
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
throw e;
}
}
//......
}
- insertOrUpdate、insertOrUpdateTag、insertOrUpdateBeta三者的执行逻辑都是先执行insert操作,捕获到DataIntegrityViolationException时执行update操作
小结
- publishConfig根据入参构造configAdvanceInfo及configInfo,对于前者会执行ParamUtils.checkParam(configAdvanceInfo)校验
- 对于有betaIps的则执行persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false),然后发布ConfigDataChangeEvent
- 对于没有betaIps的则判断tag是否为空,为空则执行persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false),不为空则执行persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);二者都会发布ConfigDataChangeEvent